# API Reference — `rigger`

Rigger — a declarative agent harness framework.

Public API re-exports. All internal modules use a leading underscore; import from rigger directly for the stable surface.

Usage::

from rigger import Harness, Task, ClaudeCodeBackend

registry = _LazyRegistry() module-attribute

Module-level registry singleton. Pre-populated with all built-in implementations.

ComponentConfig dataclass

Configuration for a single protocol implementation.

Source code in src/rigger/_config.py
@dataclass
class ComponentConfig:
    """Configuration for a single protocol implementation.

    One instance corresponds to one component entry in ``harness.yaml``.
    """

    # Implementation type name — presumably the lookup key used by the
    # module-level registry; confirm against the registry's resolution logic.
    type: str
    # Keyword parameters forwarded to the implementation when it is built.
    params: dict[str, Any] = field(default_factory=dict)

HarnessConfig dataclass

Parsed harness.yaml configuration.

Source code in src/rigger/_config.py
@dataclass
class HarnessConfig:
    """Parsed ``harness.yaml`` configuration.

    Each field mirrors a top-level section of the YAML file; list-valued
    fields default to empty, optional components to ``None``.
    """

    # Required single components.
    backend: ComponentConfig
    task_source: ComponentConfig
    # Optional multi-valued component sections.
    context_sources: list[ComponentConfig] = field(default_factory=list)
    verifiers: list[ComponentConfig] = field(default_factory=list)
    constraints: list[ComponentConfig] = field(default_factory=list)
    # Optional single components (absent when the section is omitted).
    state_store: ComponentConfig | None = None
    entropy_detectors: list[ComponentConfig] = field(default_factory=list)
    workspace: ComponentConfig | None = None
    # Loop control parameters (the ``run:`` section).
    run: RunConfig = field(default_factory=RunConfig)
    # Directory the config was loaded from; defaults to the current
    # working directory when not supplied.
    config_dir: Path = field(default_factory=Path.cwd)

RunConfig dataclass

Loop control parameters from the run: YAML section.

Source code in src/rigger/_config.py
@dataclass
class RunConfig:
    """Loop control parameters from the ``run:`` YAML section."""

    # Maximum number of epochs the canonical loop may execute.
    max_epochs: int = 100
    # Maximum retries per task when verification requests RETRY.
    max_retries: int = 3
    # Name of the stop predicate — presumably resolved to a callable such
    # as ``all_tasks_done`` elsewhere; confirm against the config loader.
    stop_when: str = "all_tasks_done"
    # Whether entropy-scan results are written back as new tasks.
    inject_entropy_tasks: bool = True

Callbacks dataclass

Hook points for the canonical loop.

Each callback returns Action | None. Returning None is equivalent to Action.CONTINUE.

Source code in src/rigger/_harness.py
@dataclass
class Callbacks:
    """Hook points for the canonical loop.

    Each callback returns ``Action | None``. Returning ``None`` is
    equivalent to ``Action.CONTINUE``.
    """

    # Every default factory produces a fresh no-op callback that accepts the
    # documented arguments and returns None (i.e. Action.CONTINUE).
    on_epoch_start: Callable[[int, EpochState], Action | None] = field(
        default_factory=lambda: lambda _epoch, _state: None
    )
    on_task_selected: Callable[[Task], Action | None] = field(
        default_factory=lambda: lambda _task: None
    )
    on_provision_complete: Callable[[ProvisionResult], Action | None] = field(
        default_factory=lambda: lambda _provision: None
    )
    on_task_complete: Callable[[TaskResult], Action | None] = field(
        default_factory=lambda: lambda _task_result: None
    )
    on_verify_complete: Callable[[TaskResult, list[VerifyResult]], Action | None] = (
        field(default_factory=lambda: lambda _task_result, _verify_results: None)
    )
    on_epoch_end: Callable[[int, EpochState], Action | None] = field(
        default_factory=lambda: lambda _epoch, _state: None
    )
    on_entropy_scan: Callable[[list[Task]], Action | None] = field(
        default_factory=lambda: lambda _tasks: None
    )
    # Escalation hook returns nothing — it is a notification, not a control point.
    on_escalation: Callable[[Task, list[VerifyResult]], None] = field(
        default_factory=lambda: lambda _task, _results: None
    )

Harness

Orchestration loop composing dimension plugins + AgentBackend.

Constructor wires the plugins; run() executes the canonical loop; step methods are public for custom loops.

Source code in src/rigger/_harness.py
(line-number gutter emitted by the documentation renderer — the snippet below corresponds to lines 104–728 of `src/rigger/_harness.py`)
class Harness:
    """Orchestration loop composing dimension plugins + AgentBackend.

    Constructor wires the plugins; ``run()`` executes the canonical loop;
    step methods are public for custom loops.
    """

    def __init__(
        self,
        project_root: Path,
        backend: AgentBackend,
        task_source: TaskSource,
        context_sources: list[ContextSource] | None = None,
        verifiers: list[Verifier] | None = None,
        constraints: list[Constraint] | None = None,
        state_store: StateStore | None = None,
        entropy_detectors: list[EntropyDetector] | None = None,
        workspace_manager: WorkspaceManager | None = None,
        inject_entropy_tasks: bool = True,
    ) -> None:
        self.project_root = project_root
        self.backend = backend
        self.task_source = task_source
        # Context sources are wrapped in a provisioner; defensive copies of
        # all list arguments prevent callers mutating our plugin lists.
        self._provisioner = ContextProvisioner(sources=list(context_sources or []))
        self.verifiers: list[Verifier] = list(verifiers or [])
        self.constraints: list[Constraint] = list(constraints or [])
        self.state_store = state_store
        self.entropy_detectors: list[EntropyDetector] = list(entropy_detectors or [])
        self.workspace_manager = workspace_manager
        self.inject_entropy_tasks = inject_entropy_tasks
        # Unique per-instance id (pid + random hex) used to namespace the
        # worktree branch names created by dispatch_parallel().
        self._instance_id = f"{os.getpid()}-{uuid.uuid4().hex[:8]}"

    # ─── Step methods ────────────────────────────────────────

    def load_state(self) -> EpochState:
        """Load state from StateStore if present, else from .harness/state.json."""
        if self.state_store is not None:
            return self.state_store.load(self.project_root)
        return read_state(self.project_root)

    def select_tasks(self, max_count: int = 1) -> list[Task]:
        """Delegate to TaskSource.pending(), sliced to max_count."""
        return self.task_source.pending(self.project_root)[:max_count]

    def provision(self) -> ProvisionResult:
        """Delegate to ContextProvisioner.provision()."""
        return self._provisioner.provision(self.project_root)

    def assign_task(self, task: Task) -> None:
        """Write .harness/current_task.json."""
        write_current_task(self.project_root, task)

    def check_constraints(self) -> list[VerifyResult]:
        """Call each Constraint.check() and return results."""
        return [c.check(self.project_root) for c in self.constraints]

    async def dispatch(self, task: Task) -> TaskResult:
        """Assign task then await backend.execute()."""
        self.assign_task(task)
        return await self.backend.execute(self.project_root)

    def verify(self, result: TaskResult) -> list[VerifyResult]:
        """Call each Verifier.verify() and return results."""
        return [v.verify(self.project_root, result) for v in self.verifiers]

    def persist(self, state: EpochState) -> None:
        """Write .harness/state.json and call StateStore.save() if present."""
        write_state(self.project_root, state)
        if self.state_store is not None:
            self.state_store.save(self.project_root, state)

    def harvest(self, result: TaskResult, project_root: Path) -> TaskResult:
        """Post-process a dispatch result before verification.

        Default implementation is passthrough. Override or replace for
        custom artifact collection, metric computation, or result enrichment.

        Called by ``run()`` and ``run_once()`` between dispatch and verify.

        Args:
            result: TaskResult from dispatch.
            project_root: The project directory.

        Returns:
            Possibly-modified TaskResult.
        """
        return result

    def scan_entropy(self) -> list[Task]:
        """Call each EntropyDetector.scan() and flatten results."""
        tasks: list[Task] = []
        for detector in self.entropy_detectors:
            tasks.extend(detector.scan(self.project_root))
        return tasks

    def write_constraints(
        self,
        project_root: Path | None = None,
        *,
        results: list[VerifyResult] | None = None,
    ) -> None:
        """Merge constraint metadata and write/delete .harness/constraints.json.

        Public step method for custom loops. If *results* are not provided,
        calls ``check_constraints()`` to obtain them.

        Args:
            project_root: Target directory (defaults to ``self.project_root``).
            results: Pre-computed constraint results. When ``None``,
                ``check_constraints()`` is called automatically.
        """
        root = project_root or self.project_root
        if results is not None:
            constraint_results = results
        else:
            constraint_results = self.check_constraints()
        merged = merge_metadata(constraint_results)
        _schema_write_constraints(root, merged)

    @staticmethod
    def _resolve_verify_action(verify_results: list[VerifyResult]) -> VerifyAction:
        """Determine aggregate action from multiple verification results.

        Precedence: BLOCK > ESCALATE > RETRY > ACCEPT.
        No results returns ACCEPT.
        """
        if not verify_results:
            return VerifyAction.ACCEPT
        actions = {vr.action for vr in verify_results}
        if VerifyAction.BLOCK in actions:
            return VerifyAction.BLOCK
        if VerifyAction.ESCALATE in actions:
            return VerifyAction.ESCALATE
        if VerifyAction.RETRY in actions:
            return VerifyAction.RETRY
        return VerifyAction.ACCEPT

    # ─── Parallel dispatch ────────────────────────────────────

    @staticmethod
    def _copy_provisioned_content(
        provision_results: list[ProvisionResult],
        source_root: Path,
        target_root: Path,
    ) -> None:
        """Copy provisioned files from source_root to matching paths in target_root."""
        for pr in provision_results:
            for f in pr.files:
                try:
                    rel = f.relative_to(source_root)
                except ValueError:
                    # File lies outside source_root — cannot be mirrored; skip it.
                    continue
                dest = target_root / rel
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(f, dest)

    async def dispatch_parallel(
        self,
        batch: list[Task],
        epoch_state: EpochState,
        callbacks: Callbacks | None = None,
        provision_results: list[ProvisionResult] | None = None,
    ) -> tuple[list[TaskResult], bool]:
        """Dispatch multiple tasks to parallel agents in isolated workspaces.

        Each task runs in its own workspace created by the WorkspaceManager.
        Uses ``asyncio.gather(return_exceptions=True)`` so all agents are
        independent — a failure in one does not cancel others.

        Args:
            batch: Tasks to execute in parallel.
            epoch_state: Current epoch state (for branch naming).
            callbacks: Optional hook points (on_task_complete checked per task).
            provision_results: Pre-computed provision results to copy into worktrees.

        Returns:
            Tuple of (results, halt_requested). Results are in the same order
            as the input batch. halt_requested is True if any callback returned HALT.

        Raises:
            RuntimeError: If no workspace_manager is configured.
        """
        if self.workspace_manager is None:
            msg = "dispatch_parallel requires a workspace_manager"
            raise RuntimeError(msg)

        cb = callbacks or Callbacks()
        wm = self.workspace_manager
        halt_requested = False
        # `results` accumulates only phase-1 (worktree-creation) failures;
        # `worktrees` is positionally parallel to `batch`.
        results: list[TaskResult] = []
        worktrees: list[Path | None] = [None] * len(batch)

        # Phase 1: Create worktrees (sequential — filesystem setup)
        live_indices: list[int] = []
        for i, task in enumerate(batch):
            branch = f"rigger/{self._instance_id}/{epoch_state.epoch}/{task.id}"
            try:
                wt = await asyncio.to_thread(wm.create, self.project_root, task, branch)
                worktrees[i] = wt
                live_indices.append(i)
            except Exception as exc:
                logger.warning(
                    "Failed to create workspace for task %s: %s", task.id, exc
                )
                results.append(
                    TaskResult(
                        task_id=task.id,
                        status="error",
                        metadata={"error": str(exc)},
                    )
                )

        if not live_indices:
            # NOTE(review): the `if r.task_id` filter looks like a no-op —
            # every result above is built with a task id; confirm whether a
            # falsy task_id is actually possible here.
            return [r for r in results if r.task_id], halt_requested

        # Phase 2: Copy provisioned content into each worktree
        if provision_results:
            for i in live_indices:
                wt = worktrees[i]
                assert wt is not None  # noqa: S101
                self._copy_provisioned_content(provision_results, self.project_root, wt)

        # Phase 3: Write per-worktree .harness/ (task + state)
        for i in live_indices:
            wt = worktrees[i]
            assert wt is not None  # noqa: S101
            write_current_task(wt, batch[i])
            write_state(wt, epoch_state)

        # Phase 4: Dispatch agents via gather
        # NOTE(review): `task` is currently unused — the assignment was
        # already written into the worktree in Phase 3.
        async def _execute_one(task: Task, worktree: Path) -> TaskResult:
            return await self.backend.execute(worktree)

        coros = [_execute_one(batch[i], worktrees[i]) for i in live_indices]  # type: ignore[arg-type]
        gather_results = await asyncio.gather(*coros, return_exceptions=True)

        # Build ordered results: merge with pre-existing error results
        live_results: list[TaskResult] = []
        for i, raw in zip(live_indices, gather_results, strict=True):
            task = batch[i]
            if isinstance(raw, BaseException):
                # gather(return_exceptions=True) surfaced this agent's failure.
                tr = TaskResult(
                    task_id=task.id,
                    status="error",
                    metadata={"error": str(raw)},
                )
            else:
                tr = raw
            live_results.append(tr)

            # Check callback
            action = cb.on_task_complete(tr)
            if action is Action.HALT:
                halt_requested = True

        # Phase 5: Verify per-worktree (on successful results)
        for i, tr in zip(live_indices, live_results, strict=True):
            wt = worktrees[i]
            assert wt is not None  # noqa: S101
            if tr.status != "error" and self.verifiers:
                verify_results = [v.verify(wt, tr) for v in self.verifiers]
                if all(vr.passed for vr in verify_results):
                    tr.status = "verified"
                elif any(not vr.passed for vr in verify_results):
                    tr.status = "verification_failed"

        # Phase 6: Merge sequentially (order matters — priority coupling)
        merge_results: list[MergeResult] = []
        for i, tr in zip(live_indices, live_results, strict=True):
            wt = worktrees[i]
            assert wt is not None  # noqa: S101
            if tr.status != "error":
                mr = await asyncio.to_thread(wm.merge, wt, self.project_root)
                merge_results.append(mr)
                if not mr.success:
                    tr.metadata["merge_conflicts"] = mr.conflicts
            else:
                merge_results.append(MergeResult(success=False, worktree_path=wt))

        # Phase 7: Post-merge constraints
        post_results = self.check_constraints()
        for r in post_results:
            if not r.passed:
                logger.info(
                    "dispatch_parallel: post-merge constraint violation: %s",
                    r.message,
                )

        # Phase 8: Cleanup (parallel with sync fallback)
        async def _cleanup_one(wt: Path) -> None:
            try:
                await asyncio.to_thread(wm.cleanup, wt)
            except asyncio.CancelledError:
                # Loop is shutting down — fall back to a synchronous cleanup
                # so the worktree is still removed.
                wm.cleanup(wt)

        cleanup_coros = [_cleanup_one(wt) for wt in worktrees if wt is not None]
        await asyncio.gather(*cleanup_coros, return_exceptions=True)

        # Combine results: error results from phase 1 + live results
        # Reconstruct in original batch order
        # `results` holds phase-1 errors in ascending batch-index order, so
        # consuming the two iterators in batch order restores alignment.
        final: list[TaskResult] = []
        error_iter = iter(r for r in results)
        live_iter = iter(live_results)
        for i in range(len(batch)):
            if i in live_indices:
                final.append(next(live_iter))
            else:
                final.append(next(error_iter))

        return final, halt_requested

    # ─── Canonical loop ──────────────────────────────────────

    async def run(
        self,
        max_epochs: int = 1,
        max_retries: int = 3,
        stop_when: Callable[[EpochState], bool] = all_tasks_done,
        callbacks: Callbacks | None = None,
        *,
        force_lock: bool = False,
    ) -> EpochState:
        """Execute the canonical epoch loop.

        Single task per epoch with VerifyAction routing. Returns final
        EpochState. Acquires ``.harness/harness.lock`` for the duration
        of the run.

        Args:
            max_epochs: Maximum number of epochs to execute.
            max_retries: Maximum retries per task on RETRY action.
            stop_when: Predicate checked after each epoch; breaks if True.
            callbacks: Optional hook points for loop control.
            force_lock: Override an existing lock file.

        Returns:
            Final EpochState after loop terminates.

        Raises:
            HarnessAlreadyRunning: If another instance holds the lock.
        """
        lock_info = acquire_lock(self.project_root, force=force_lock)
        try:
            return await self._run_inner(
                max_epochs=max_epochs,
                max_retries=max_retries,
                stop_when=stop_when,
                callbacks=callbacks,
            )
        finally:
            release_lock(self.project_root, lock_info)

    async def _run_inner(
        self,
        max_epochs: int,
        max_retries: int,
        stop_when: Callable[[EpochState], bool],
        callbacks: Callbacks | None,
    ) -> EpochState:
        """Inner loop implementation for ``run()``."""
        cb = callbacks or Callbacks()
        epoch_state = EpochState()

        for epoch in range(1, max_epochs + 1):
            # READ_STATE
            epoch_state = self.load_state()
            epoch_state.epoch = epoch

            action = cb.on_epoch_start(epoch, epoch_state)
            if action is Action.HALT:
                break

            # SELECT_TASK
            tasks = self.select_tasks()
            if not tasks:
                # Nothing pending — loop terminates naturally.
                break
            task = tasks[0]

            action = cb.on_task_selected(task)
            if action is Action.HALT:
                break
            if action is Action.SKIP_TASK:
                continue

            # PROVISION
            provision_result = self.provision()

            action = cb.on_provision_complete(provision_result)
            if action is Action.HALT:
                break
            if action is Action.SKIP_TASK:
                continue

            # CHECK_PRE
            pre_results = self.check_constraints()
            if any(not r.passed for r in pre_results):
                logger.warning(
                    "Epoch %d: pre-dispatch constraint check failed — skipping",
                    epoch,
                )
                continue

            # WRITE_CONFIG
            self.write_constraints(results=pre_results)

            # DISPATCH + VERIFY (with retry loop)
            retries = 0
            loop_halted = False

            while True:
                result = await self.dispatch(task)
                result = self.harvest(result, self.project_root)

                action = cb.on_task_complete(result)
                if action is Action.HALT:
                    loop_halted = True
                    break

                # CHECK_POST
                # Post-dispatch violations are logged but do not stop the run.
                post_results = self.check_constraints()
                for r in post_results:
                    if not r.passed:
                        logger.info(
                            "Epoch %d: post-dispatch constraint violation: %s",
                            epoch,
                            r.message,
                        )

                # VERIFY
                verify_results = self.verify(result)
                verify_action = self._resolve_verify_action(verify_results)

                if verify_results and verify_action is VerifyAction.ACCEPT:
                    result.status = "verified"
                elif verify_results and verify_action is not VerifyAction.ACCEPT:
                    result.status = "verification_failed"

                action = cb.on_verify_complete(result, verify_results)
                if action is Action.HALT:
                    loop_halted = True
                    break

                # Route on VerifyAction
                if verify_action is VerifyAction.ACCEPT:
                    self.task_source.mark_complete(task.id, result)
                    epoch_state.completed_tasks.append(task.id)
                    if task.id in epoch_state.pending_tasks:
                        epoch_state.pending_tasks.remove(task.id)
                    break

                if verify_action is VerifyAction.RETRY:
                    retries += 1
                    if retries > max_retries:
                        logger.warning(
                            "Task %s: max retries (%d) exhausted — blocking",
                            task.id,
                            max_retries,
                        )
                        epoch_state.halted = True
                        epoch_state.halt_reason = "max_retries_exhausted"
                        loop_halted = True
                        break
                    logger.info(
                        "Task %s: retrying (%d/%d)",
                        task.id,
                        retries,
                        max_retries,
                    )
                    # Re-dispatch the same task; provisioning and pre-checks
                    # are not repeated within the retry loop.
                    continue

                if verify_action is VerifyAction.BLOCK:
                    epoch_state.halted = True
                    epoch_state.halt_reason = "blocked"
                    loop_halted = True
                    break

                if verify_action is VerifyAction.ESCALATE:
                    epoch_state.halted = True
                    epoch_state.halt_reason = "escalation"
                    cb.on_escalation(task, verify_results)
                    loop_halted = True
                    break

            if loop_halted:
                # Persist the halted state before exiting the epoch loop.
                self.persist(epoch_state)
                break

            # PERSIST
            self.persist(epoch_state)

            action = cb.on_epoch_end(epoch, epoch_state)
            if action is Action.HALT:
                break

            # ENTROPY
            entropy_tasks = self.scan_entropy()

            action = cb.on_entropy_scan(entropy_tasks)
            if action is Action.HALT:
                break

            if self.inject_entropy_tasks and entropy_tasks:
                write_entropy_tasks(self.project_root, entropy_tasks)

            # DECIDE
            epoch_state.pending_tasks = [
                t.id for t in self.task_source.pending(self.project_root)
            ]
            if stop_when(epoch_state):
                break

        return epoch_state

    # ─── Single-task mode ────────────────────────────────────

    async def run_once(
        self,
        task: Task,
        callbacks: Callbacks | None = None,
        *,
        force_lock: bool = False,
    ) -> TaskResult:
        """Single-task event-driven mode. No persist/entropy — caller manages state.

        Acquires ``.harness/harness.lock`` for the duration of the run.

        Args:
            task: The task to execute.
            callbacks: Optional hook points.
            force_lock: Override an existing lock file.

        Returns:
            TaskResult after dispatch and verification.

        Raises:
            HarnessAlreadyRunning: If another instance holds the lock.
        """
        lock_info = acquire_lock(self.project_root, force=force_lock)
        try:
            return await self._run_once_inner(task, callbacks)
        finally:
            release_lock(self.project_root, lock_info)

    async def _run_once_inner(
        self,
        task: Task,
        callbacks: Callbacks | None = None,
    ) -> TaskResult:
        """Inner implementation for ``run_once()``."""
        cb = callbacks or Callbacks()

        # PROVISION
        provision_result = self.provision()

        action = cb.on_provision_complete(provision_result)
        if action is Action.HALT:
            return TaskResult(task_id=task.id, status="halted")

        # CHECK_PRE
        pre_results = self.check_constraints()
        if any(not r.passed for r in pre_results):
            return TaskResult(task_id=task.id, status="constraint_violation")

        # WRITE_CONFIG
        self.write_constraints(results=pre_results)

        # DISPATCH
        result = await self.dispatch(task)

        # HARVEST
        result = self.harvest(result, self.project_root)

        # CHECK_POST
        post_results = self.check_constraints()
        for r in post_results:
            if not r.passed:
                logger.info(
                    "run_once: post-dispatch constraint violation: %s",
                    r.message,
                )

        # VERIFY
        verify_results = self.verify(result)
        if verify_results and all(vr.passed for vr in verify_results):
            result.status = "verified"
        elif verify_results and any(not vr.passed for vr in verify_results):
            result.status = "verification_failed"

        return result

    # ─── Sync convenience ────────────────────────────────────

    def run_sync(
        self,
        max_epochs: int = 1,
        max_retries: int = 3,
        stop_when: Callable[[EpochState], bool] = all_tasks_done,
        callbacks: Callbacks | None = None,
        *,
        force_lock: bool = False,
    ) -> EpochState:
        """Synchronous convenience wrapper around ``run()``.

        Args:
            max_epochs: Maximum number of epochs.
            max_retries: Maximum retries per task on RETRY action.
            stop_when: Stop predicate.
            callbacks: Optional hook points.
            force_lock: Override an existing lock file.

        Returns:
            Final EpochState.

        Raises:
            HarnessAlreadyRunning: If another instance holds the lock.
        """
        return asyncio.run(
            self.run(
                max_epochs=max_epochs,
                max_retries=max_retries,
                stop_when=stop_when,
                callbacks=callbacks,
                force_lock=force_lock,
            )
        )

load_state()

Load state from StateStore if present, else from .harness/state.json.

Source code in src/rigger/_harness.py
def load_state(self) -> EpochState:
    """Load persisted epoch state.

    Prefers the configured StateStore; falls back to reading
    ``.harness/state.json`` via ``read_state``.
    """
    store = self.state_store
    if store is None:
        return read_state(self.project_root)
    return store.load(self.project_root)

select_tasks(max_count=1)

Delegate to TaskSource.pending(), sliced to max_count.

Source code in src/rigger/_harness.py
def select_tasks(self, max_count: int = 1) -> list[Task]:
    """Return up to *max_count* pending tasks from the TaskSource."""
    pending = self.task_source.pending(self.project_root)
    return pending[:max_count]

provision()

Delegate to ContextProvisioner.provision().

Source code in src/rigger/_harness.py
def provision(self) -> ProvisionResult:
    """Run the ContextProvisioner against the project root."""
    provisioner = self._provisioner
    return provisioner.provision(self.project_root)

assign_task(task)

Write .harness/current_task.json.

Source code in src/rigger/_harness.py
def assign_task(self, task: Task) -> None:
    """Record *task* as the current assignment (.harness/current_task.json)."""
    root = self.project_root
    write_current_task(root, task)

check_constraints()

Call each Constraint.check() and return results.

Source code in src/rigger/_harness.py
def check_constraints(self) -> list[VerifyResult]:
    """Evaluate every configured Constraint against the project root."""
    root = self.project_root
    outcomes = []
    for constraint in self.constraints:
        outcomes.append(constraint.check(root))
    return outcomes

dispatch(task) async

Assign task then await backend.execute().

Source code in src/rigger/_harness.py
async def dispatch(self, task: Task) -> TaskResult:
    """Write the task assignment, then run the backend to completion."""
    self.assign_task(task)
    outcome = await self.backend.execute(self.project_root)
    return outcome

verify(result)

Call each Verifier.verify() and return results.

Source code in src/rigger/_harness.py
def verify(self, result: TaskResult) -> list[VerifyResult]:
    """Run every configured Verifier over *result*."""
    root = self.project_root
    outcomes = []
    for verifier in self.verifiers:
        outcomes.append(verifier.verify(root, result))
    return outcomes

persist(state)

Write .harness/state.json and call StateStore.save() if present.

Source code in src/rigger/_harness.py
def persist(self, state: EpochState) -> None:
    """Persist *state* to .harness/state.json, then to the StateStore if any."""
    root = self.project_root
    write_state(root, state)
    store = self.state_store
    if store is not None:
        store.save(root, state)

harvest(result, project_root)

Post-process a dispatch result before verification.

Default implementation is passthrough. Override or replace for custom artifact collection, metric computation, or result enrichment.

Called by run() and run_once() between dispatch and verify.

Parameters:

- `result` (`TaskResult`, required) — TaskResult from dispatch.
- `project_root` (`Path`, required) — The project directory.

Returns:

- `TaskResult` — Possibly-modified TaskResult.

Source code in src/rigger/_harness.py
def harvest(self, result: TaskResult, project_root: Path) -> TaskResult:
    """Hook for enriching a dispatch result before verification.

    The base implementation hands *result* back untouched. Override or
    replace it to collect artifacts, compute metrics, or otherwise
    enrich the result. Invoked by ``run()`` and ``run_once()`` between
    the dispatch and verify phases.

    Args:
        result: TaskResult produced by dispatch.
        project_root: Directory the task ran in.

    Returns:
        The (possibly modified) TaskResult — unmodified here.
    """
    return result

scan_entropy()

Call each EntropyDetector.scan() and flatten results.

Source code in src/rigger/_harness.py
def scan_entropy(self) -> list[Task]:
    """Run every EntropyDetector and flatten their remediation tasks in order."""
    per_detector = [d.scan(self.project_root) for d in self.entropy_detectors]
    return [task for found in per_detector for task in found]

write_constraints(project_root=None, *, results=None)

Merge constraint metadata and write/delete .harness/constraints.json.

Public step method for custom loops. If results are not provided, calls check_constraints() to obtain them.

Parameters:

Name Type Description Default
project_root Path | None

Target directory (defaults to self.project_root).

None
results list[VerifyResult] | None

Pre-computed constraint results. When None, check_constraints() is called automatically.

None
Source code in src/rigger/_harness.py
def write_constraints(
    self,
    project_root: Path | None = None,
    *,
    results: list[VerifyResult] | None = None,
) -> None:
    """Merge constraint metadata and write/delete .harness/constraints.json.

    Step method intended for custom loops. When *results* is omitted,
    the constraints are evaluated on the spot via ``check_constraints()``.

    Args:
        project_root: Directory to write into; falls back to
            ``self.project_root`` when ``None``.
        results: Pre-computed constraint results; computed automatically
            when not supplied.
    """
    target = project_root or self.project_root
    outcomes = results if results is not None else self.check_constraints()
    _schema_write_constraints(target, merge_metadata(outcomes))

dispatch_parallel(batch, epoch_state, callbacks=None, provision_results=None) async

Dispatch multiple tasks to parallel agents in isolated workspaces.

Each task runs in its own workspace created by the WorkspaceManager. Uses asyncio.gather(return_exceptions=True) so all agents are independent — a failure in one does not cancel others.

Parameters:

Name Type Description Default
batch list[Task]

Tasks to execute in parallel.

required
epoch_state EpochState

Current epoch state (for branch naming).

required
callbacks Callbacks | None

Optional hook points (on_task_complete checked per task).

None
provision_results list[ProvisionResult] | None

Pre-computed provision results to copy into worktrees.

None

Returns:

Type Description
tuple[list[TaskResult], bool]

Tuple of (results, halt_requested). Results are in the same order as the input batch. halt_requested is True if any callback returned HALT.

Raises:

Type Description
RuntimeError

If no workspace_manager is configured.

Source code in src/rigger/_harness.py
async def dispatch_parallel(
    self,
    batch: list[Task],
    epoch_state: EpochState,
    callbacks: Callbacks | None = None,
    provision_results: list[ProvisionResult] | None = None,
) -> tuple[list[TaskResult], bool]:
    """Dispatch multiple tasks to parallel agents in isolated workspaces.

    Each task runs in its own workspace created by the WorkspaceManager.
    Uses ``asyncio.gather(return_exceptions=True)`` so all agents are
    independent — a failure in one does not cancel others.

    Args:
        batch: Tasks to execute in parallel.
        epoch_state: Current epoch state (for branch naming).
        callbacks: Optional hook points (on_task_complete checked per task).
        provision_results: Pre-computed provision results to copy into worktrees.

    Returns:
        Tuple of (results, halt_requested). Results are in the same order
        as the input batch. halt_requested is True if any callback returned HALT.

    Raises:
        RuntimeError: If no workspace_manager is configured.
    """
    if self.workspace_manager is None:
        msg = "dispatch_parallel requires a workspace_manager"
        raise RuntimeError(msg)

    cb = callbacks or Callbacks()
    wm = self.workspace_manager
    halt_requested = False
    # ``results`` holds Phase-1 creation-failure results only; live task
    # results are accumulated separately in ``live_results`` below.
    results: list[TaskResult] = []
    # Parallel to ``batch``: entry stays None when worktree creation failed.
    worktrees: list[Path | None] = [None] * len(batch)

    # Phase 1: Create worktrees (sequential — filesystem setup)
    live_indices: list[int] = []
    for i, task in enumerate(batch):
        branch = f"rigger/{self._instance_id}/{epoch_state.epoch}/{task.id}"
        try:
            wt = await asyncio.to_thread(wm.create, self.project_root, task, branch)
            worktrees[i] = wt
            live_indices.append(i)
        except Exception as exc:
            # A creation failure downgrades this task to an error result;
            # the remaining tasks still run.
            logger.warning(
                "Failed to create workspace for task %s: %s", task.id, exc
            )
            results.append(
                TaskResult(
                    task_id=task.id,
                    status="error",
                    metadata={"error": str(exc)},
                )
            )

    if not live_indices:
        # Every creation failed — only error results exist. The truthy
        # task_id filter is effectively an identity copy here (each result
        # above was built with task.id — assumes ids are non-empty).
        return [r for r in results if r.task_id], halt_requested

    # Phase 2: Copy provisioned content into each worktree
    if provision_results:
        for i in live_indices:
            wt = worktrees[i]
            assert wt is not None  # noqa: S101
            self._copy_provisioned_content(provision_results, self.project_root, wt)

    # Phase 3: Write per-worktree .harness/ (task + state)
    for i in live_indices:
        wt = worktrees[i]
        assert wt is not None  # noqa: S101
        write_current_task(wt, batch[i])
        write_state(wt, epoch_state)

    # Phase 4: Dispatch agents via gather
    # ``task`` is unused here — the assignment was already written to the
    # worktree in Phase 3; the parameter is kept for signature symmetry.
    async def _execute_one(task: Task, worktree: Path) -> TaskResult:
        return await self.backend.execute(worktree)

    coros = [_execute_one(batch[i], worktrees[i]) for i in live_indices]  # type: ignore[arg-type]
    # gather preserves input order, so gather_results aligns with live_indices.
    gather_results = await asyncio.gather(*coros, return_exceptions=True)

    # Build ordered results: merge with pre-existing error results
    live_results: list[TaskResult] = []
    for i, raw in zip(live_indices, gather_results, strict=True):
        task = batch[i]
        if isinstance(raw, BaseException):
            # return_exceptions=True surfaces failures as exception objects.
            tr = TaskResult(
                task_id=task.id,
                status="error",
                metadata={"error": str(raw)},
            )
        else:
            tr = raw
        live_results.append(tr)

        # Check callback
        action = cb.on_task_complete(tr)
        if action is Action.HALT:
            # Halt is recorded but not acted on until after merge/cleanup.
            halt_requested = True

    # Phase 5: Verify per-worktree (on successful results)
    for i, tr in zip(live_indices, live_results, strict=True):
        wt = worktrees[i]
        assert wt is not None  # noqa: S101
        if tr.status != "error" and self.verifiers:
            verify_results = [v.verify(wt, tr) for v in self.verifiers]
            # ``any(not vr.passed)`` is the exact negation of the ``all``
            # above (verify_results is non-empty), so exactly one branch runs.
            if all(vr.passed for vr in verify_results):
                tr.status = "verified"
            elif any(not vr.passed for vr in verify_results):
                tr.status = "verification_failed"

    # Phase 6: Merge sequentially (order matters — priority coupling)
    merge_results: list[MergeResult] = []
    for i, tr in zip(live_indices, live_results, strict=True):
        wt = worktrees[i]
        assert wt is not None  # noqa: S101
        if tr.status != "error":
            mr = await asyncio.to_thread(wm.merge, wt, self.project_root)
            merge_results.append(mr)
            if not mr.success:
                tr.metadata["merge_conflicts"] = mr.conflicts
        else:
            # Worktree existed but execution errored — record a failed
            # merge without touching the main root.
            merge_results.append(MergeResult(success=False, worktree_path=wt))

    # Phase 7: Post-merge constraints
    post_results = self.check_constraints()
    for r in post_results:
        if not r.passed:
            logger.info(
                "dispatch_parallel: post-merge constraint violation: %s",
                r.message,
            )

    # Phase 8: Cleanup (parallel with sync fallback)
    # If the thread offload is cancelled mid-await, fall back to a
    # synchronous cleanup so the worktree is still removed.
    async def _cleanup_one(wt: Path) -> None:
        try:
            await asyncio.to_thread(wm.cleanup, wt)
        except asyncio.CancelledError:
            wm.cleanup(wt)

    cleanup_coros = [_cleanup_one(wt) for wt in worktrees if wt is not None]
    await asyncio.gather(*cleanup_coros, return_exceptions=True)

    # Combine results: error results from phase 1 + live results
    # Reconstruct in original batch order
    final: list[TaskResult] = []
    # NOTE(review): the generator wrapper is redundant — iter(results) is
    # equivalent. Error results were appended in batch order in Phase 1,
    # so sequential consumption realigns them with their original indices.
    error_iter = iter(r for r in results)
    live_iter = iter(live_results)
    for i in range(len(batch)):
        # NOTE(review): linear membership scan per index; a set would be
        # O(1) — fine for typical small batches.
        if i in live_indices:
            final.append(next(live_iter))
        else:
            final.append(next(error_iter))

    return final, halt_requested

run(max_epochs=1, max_retries=3, stop_when=all_tasks_done, callbacks=None, *, force_lock=False) async

Execute the canonical epoch loop.

Single task per epoch with VerifyAction routing. Returns final EpochState. Acquires .harness/harness.lock for the duration of the run.

Parameters:

Name Type Description Default
max_epochs int

Maximum number of epochs to execute.

1
max_retries int

Maximum retries per task on RETRY action.

3
stop_when Callable[[EpochState], bool]

Predicate checked after each epoch; breaks if True.

all_tasks_done
callbacks Callbacks | None

Optional hook points for loop control.

None
force_lock bool

Override an existing lock file.

False

Returns:

Type Description
EpochState

Final EpochState after loop terminates.

Raises:

Type Description
HarnessAlreadyRunning

If another instance holds the lock.

Source code in src/rigger/_harness.py
async def run(
    self,
    max_epochs: int = 1,
    max_retries: int = 3,
    stop_when: Callable[[EpochState], bool] = all_tasks_done,
    callbacks: Callbacks | None = None,
    *,
    force_lock: bool = False,
) -> EpochState:
    """Drive the canonical epoch loop under the harness lock.

    One task per epoch, routed by VerifyAction. The
    ``.harness/harness.lock`` file is held for the whole run and
    released on exit, even on error.

    Args:
        max_epochs: Upper bound on epochs executed.
        max_retries: Retries allowed per task on a RETRY action.
        stop_when: Predicate evaluated after each epoch; loop exits when True.
        callbacks: Optional loop-control hook points.
        force_lock: Take over a pre-existing lock file.

    Returns:
        The final EpochState once the loop terminates.

    Raises:
        HarnessAlreadyRunning: If the lock is held by another instance.
    """
    lock = acquire_lock(self.project_root, force=force_lock)
    try:
        outcome = await self._run_inner(
            max_epochs=max_epochs,
            max_retries=max_retries,
            stop_when=stop_when,
            callbacks=callbacks,
        )
    finally:
        release_lock(self.project_root, lock)
    return outcome

run_once(task, callbacks=None, *, force_lock=False) async

Single-task event-driven mode. No persist/entropy — caller manages state.

Acquires .harness/harness.lock for the duration of the run.

Parameters:

Name Type Description Default
task Task

The task to execute.

required
callbacks Callbacks | None

Optional hook points.

None
force_lock bool

Override an existing lock file.

False

Returns:

Type Description
TaskResult

TaskResult after dispatch and verification.

Raises:

Type Description
HarnessAlreadyRunning

If another instance holds the lock.

Source code in src/rigger/_harness.py
async def run_once(
    self,
    task: Task,
    callbacks: Callbacks | None = None,
    *,
    force_lock: bool = False,
) -> TaskResult:
    """Execute one task under the harness lock (event-driven mode).

    No state persistence or entropy scanning happens here — the caller
    owns state management. ``.harness/harness.lock`` is held for the
    duration and always released.

    Args:
        task: The task to execute.
        callbacks: Optional hook points.
        force_lock: Take over a pre-existing lock file.

    Returns:
        The TaskResult after dispatch and verification.

    Raises:
        HarnessAlreadyRunning: If the lock is held by another instance.
    """
    lock = acquire_lock(self.project_root, force=force_lock)
    try:
        return await self._run_once_inner(task, callbacks)
    finally:
        release_lock(self.project_root, lock)

run_sync(max_epochs=1, max_retries=3, stop_when=all_tasks_done, callbacks=None, *, force_lock=False)

Synchronous convenience wrapper around run().

Parameters:

Name Type Description Default
max_epochs int

Maximum number of epochs.

1
max_retries int

Maximum retries per task on RETRY action.

3
stop_when Callable[[EpochState], bool]

Stop predicate.

all_tasks_done
callbacks Callbacks | None

Optional hook points.

None
force_lock bool

Override an existing lock file.

False

Returns:

Type Description
EpochState

Final EpochState.

Raises:

Type Description
HarnessAlreadyRunning

If another instance holds the lock.

Source code in src/rigger/_harness.py
def run_sync(
    self,
    max_epochs: int = 1,
    max_retries: int = 3,
    stop_when: Callable[[EpochState], bool] = all_tasks_done,
    callbacks: Callbacks | None = None,
    *,
    force_lock: bool = False,
) -> EpochState:
    """Synchronous convenience wrapper around ``run()``.

    Args:
        max_epochs: Maximum number of epochs.
        max_retries: Maximum retries per task on RETRY action.
        stop_when: Stop predicate.
        callbacks: Optional hook points.
        force_lock: Override an existing lock file.

    Returns:
        Final EpochState.

    Raises:
        HarnessAlreadyRunning: If another instance holds the lock.
    """
    return asyncio.run(
        self.run(
            max_epochs=max_epochs,
            max_retries=max_retries,
            stop_when=stop_when,
            callbacks=callbacks,
            force_lock=force_lock,
        )
    )

HarnessAlreadyRunning

Bases: RuntimeError

Raised when another harness instance holds the lock.

Source code in src/rigger/_lock.py
class HarnessAlreadyRunning(RuntimeError):
    """Signals that the harness lock is held by another running instance."""

LockInfo dataclass

Metadata written to the lock file.

Source code in src/rigger/_lock.py
@dataclass(frozen=True)
class LockInfo:
    """Metadata written to the lock file.

    Frozen so a lock record cannot be mutated after acquisition.
    """

    # Process id of the lock holder.
    pid: int
    # Acquisition time — presumably epoch seconds; confirm in acquire_lock().
    timestamp: float
    # Unique id of the harness instance that took the lock.
    instance_id: str
    # Machine on which the lock was acquired.
    hostname: str

AgentBackend

Bases: Protocol

The opaque coding agent — the BLACK BOX.

execute() takes ONLY project_root (Task 1.4, F5). The agent navigates the filesystem using its own tools. Task assignment and context files are already written to disk by TaskSource and ContextSource.

SDK CONFIGURATION (Task 1.17): - Backend constructor owns static configuration (model, MCP servers, permissions, system prompt, setting_sources). - REQUIRED: setting_sources=["project"] must be set for filesystem- provisioned context (AGENTS.md, CLAUDE.md) to be loaded by the SDK. - Dynamic per-epoch configuration (tool restrictions, iteration limits) flows through .harness/constraints.json, which the backend reads inside execute().

LIFECYCLE (Task 5.1): - MCP server lifecycle is owned by the backend constructor. - Per-harness infrastructure uses Python context managers. - Per-worktree infrastructure uses WorkspaceManager decorators.

Source code in src/rigger/_protocols.py
class AgentBackend(Protocol):
    """The opaque coding agent — the BLACK BOX.

    execute() takes ONLY project_root (Task 1.4, F5). The agent navigates
    the filesystem using its own tools. Task assignment and context files
    are already written to disk by TaskSource and ContextSource.

    SDK CONFIGURATION (Task 1.17):
    - Backend constructor owns static configuration (model, MCP servers,
      permissions, system prompt, setting_sources).
    - REQUIRED: setting_sources=["project"] must be set for filesystem-
      provisioned context (AGENTS.md, CLAUDE.md) to be loaded by the SDK.
    - Dynamic per-epoch configuration (tool restrictions, iteration limits)
      flows through .harness/constraints.json, which the backend reads
      inside execute().

    LIFECYCLE (Task 5.1):
    - MCP server lifecycle is owned by the backend constructor.
    - Per-harness infrastructure uses Python context managers.
    - Per-worktree infrastructure uses WorkspaceManager decorators.
    """

    async def execute(self, project_root: Path) -> TaskResult:
        """Execute in a fresh context. The agent navigates the environment.

        ASYNC (Task 1.15): The Claude Agents SDK is natively async.
        This call may take minutes to hours. The async contract frees
        the event loop for parallel dispatch, health checks, and other
        concurrent work.

        The backend creates a fresh SDK client per call, reading the
        latest .harness/constraints.json for dynamic configuration.

        Args:
            project_root: Directory the agent operates in — the main
                project root or an isolated worktree.

        Returns:
            TaskResult describing the outcome of this execution.
        """
        ...

execute(project_root) async

Execute in a fresh context. The agent navigates the environment.

ASYNC (Task 1.15): The Claude Agents SDK is natively async. This call may take minutes to hours. The async contract frees the event loop for parallel dispatch, health checks, and other concurrent work.

The backend creates a fresh SDK client per call, reading the latest .harness/constraints.json for dynamic configuration.

Source code in src/rigger/_protocols.py
async def execute(self, project_root: Path) -> TaskResult:
    """Execute in a fresh context. The agent navigates the environment.

    ASYNC (Task 1.15): The Claude Agents SDK is natively async.
    This call may take minutes to hours. The async contract frees
    the event loop for parallel dispatch, health checks, and other
    concurrent work.

    The backend creates a fresh SDK client per call, reading the
    latest .harness/constraints.json for dynamic configuration.
    """
    ...

Constraint

Bases: Protocol

AC: Enforces architectural invariants.

Constraints run PRE-dispatch (CHECK_PRE) and POST-dispatch (CHECK_POST). Pre-dispatch constraint results with metadata are merged and written to .harness/constraints.json for the backend to read (Task 1.17).

Only Constraints produce metadata for .harness/constraints.json. Verifiers do NOT — they influence the NEXT epoch via StateStore, not the current epoch's backend configuration (Task 1.26 temporal model).

Source code in src/rigger/_protocols.py
class Constraint(Protocol):
    """AC: Enforces architectural invariants.

    Constraints run PRE-dispatch (CHECK_PRE) and POST-dispatch (CHECK_POST).
    Pre-dispatch constraint results with metadata are merged and written to
    .harness/constraints.json for the backend to read (Task 1.17).

    Only Constraints produce metadata for .harness/constraints.json. Verifiers
    do NOT — they influence the NEXT epoch via StateStore, not the current
    epoch's backend configuration (Task 1.26 temporal model).
    """

    def check(self, project_root: Path) -> VerifyResult:
        """Check whether constraints are satisfied.

        Args:
            project_root: The project directory to inspect.

        Returns:
            VerifyResult; metadata on pre-dispatch results is merged
            into .harness/constraints.json for the backend (see class
            docstring).
        """
        ...

check(project_root)

Check whether constraints are satisfied.

Source code in src/rigger/_protocols.py
def check(self, project_root: Path) -> VerifyResult:
    """Check whether constraints are satisfied."""
    ...

ContextSource

Bases: Protocol

CP: Prepares one aspect of the agent's information environment.

A ContextSource discovers, verifies, or creates filesystem artifacts that provide context to the coding agent. The agent finds and reads these artifacts during execution; the ContextSource ensures they exist.

DELIVERY BOUNDARY (Task 1.18, formalized in Task 5.11): ContextSource.gather() provisions DISCOVERABLE content — artifacts that the agent MAY find and read through its own tools during execution. It does NOT provision GUARANTEED-IN-PROMPT content.

The guarantee of prompt inclusion is a delivery concern owned by: - AgentBackend: for always-loaded files (CLAUDE.md, AGENTS.md) via backend-specific mechanisms (e.g., setting_sources=["project"]) - TaskSource: for task specifications that may include embedded domain context (Task.description dual-use, Task 1.18 Section 5)

FILESYSTEM SCOPE (Task 5.11): gather() writes to project_root/ (or subdirectories thereof). It MUST NOT write to .harness/ — that directory is Rigger-managed and follows a versioned bilateral protocol (Task 5.3).

LIFECYCLE: This protocol has NO lifecycle hooks (Task 5.1 decision). Setup and teardown of infrastructure is handled by: - Python context managers for per-harness scope - WorkspaceManager decorators for per-worktree scope

Single-method protocol validated in Task 1.1.

Source code in src/rigger/_protocols.py
class ContextSource(Protocol):
    """CP: Prepares one aspect of the agent's information environment.

    A ContextSource discovers, verifies, or creates filesystem artifacts
    that provide context to the coding agent. The agent finds and reads
    these artifacts during execution; the ContextSource ensures they exist.

    DELIVERY BOUNDARY (Task 1.18, formalized in Task 5.11):
    ContextSource.gather() provisions DISCOVERABLE content — artifacts
    that the agent MAY find and read through its own tools during
    execution. It does NOT provision GUARANTEED-IN-PROMPT content.

    The guarantee of prompt inclusion is a delivery concern owned by:
    - AgentBackend: for always-loaded files (CLAUDE.md, AGENTS.md)
      via backend-specific mechanisms (e.g., setting_sources=["project"])
    - TaskSource: for task specifications that may include embedded
      domain context (Task.description dual-use, Task 1.18 Section 5)

    FILESYSTEM SCOPE (Task 5.11):
    gather() writes to project_root/ (or subdirectories thereof).
    It MUST NOT write to .harness/ — that directory is Rigger-managed
    and follows a versioned bilateral protocol (Task 5.3).

    LIFECYCLE: This protocol has NO lifecycle hooks (Task 5.1 decision).
    Setup and teardown of infrastructure is handled by:
    - Python context managers for per-harness scope
    - WorkspaceManager decorators for per-worktree scope

    Single-method protocol validated in Task 1.1.
    """

    def gather(self, project_root: Path) -> ProvisionResult:
        """Ensure source files are ready and return what was prepared.

        The returned files are DISCOVERABLE — the agent may find and read
        them via its own tools. There is no guarantee that any specific
        file will appear in the agent's prompt.

        Implementations MUST NOT write into ``.harness/`` — see
        FILESYSTEM SCOPE in the class docstring.

        Args:
            project_root: The project directory (main root or worktree).

        Returns:
            ProvisionResult with discovered files and capability metadata.
        """
        ...

gather(project_root)

Ensure source files are ready and return what was prepared.

The returned files are DISCOVERABLE — the agent may find and read them via its own tools. There is no guarantee that any specific file will appear in the agent's prompt.

Parameters:

Name Type Description Default
project_root Path

The project directory (main root or worktree).

required

Returns:

Type Description
ProvisionResult

ProvisionResult with discovered files and capability metadata.

Source code in src/rigger/_protocols.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Ensure source files are ready and return what was prepared.

    The returned files are DISCOVERABLE — the agent may find and read
    them via its own tools. There is no guarantee that any specific
    file will appear in the agent's prompt.

    Args:
        project_root: The project directory (main root or worktree).

    Returns:
        ProvisionResult with discovered files and capability metadata.
    """
    ...

EntropyDetector

Bases: Protocol

EM: Detects drift, degradation, and entropy.

Source code in src/rigger/_protocols.py
class EntropyDetector(Protocol):
    """EM: Detects drift, degradation, and entropy.

    Implementations inspect the project and translate any decay they
    find into concrete remediation work.
    """

    def scan(self, project_root: Path) -> list[Task]:
        """Inspect *project_root* and return remediation tasks for any
        entropy issues found."""
        ...

scan(project_root)

Scan for entropy issues and return remediation tasks.

Source code in src/rigger/_protocols.py
def scan(self, project_root: Path) -> list[Task]:
    """Scan for entropy issues and return remediation tasks."""
    ...

StateStore

Bases: Protocol

SC: Persists and restores state across epochs.

Source code in src/rigger/_protocols.py
class StateStore(Protocol):
    """SC: Persists and restores state across epochs.

    A pair of symmetric hooks: ``load()`` runs at epoch start and
    ``save()`` at epoch end.
    """

    def load(self, project_root: Path) -> EpochState:
        """Restore the externalized epoch state at epoch start."""
        ...

    def save(self, project_root: Path, state: EpochState) -> None:
        """Persist *state* at epoch end."""
        ...

load(project_root)

Load externalized state at epoch start.

Source code in src/rigger/_protocols.py
def load(self, project_root: Path) -> EpochState:
    """Load externalized state at epoch start."""
    ...

save(project_root, state)

Persist state at epoch end.

Source code in src/rigger/_protocols.py
def save(self, project_root: Path, state: EpochState) -> None:
    """Persist state at epoch end."""
    ...

TaskSource

Bases: Protocol

TD: Provides the next task(s) to execute.

Source code in src/rigger/_protocols.py
class TaskSource(Protocol):
    """TD: Provides the next task(s) to execute.

    Two hooks: ``pending()`` feeds the loop with work, and
    ``mark_complete()`` records outcomes back into the source.
    """

    def pending(self, project_root: Path) -> list[Task]:
        """Return the tasks still to be executed, in priority order."""
        ...

    def mark_complete(self, task_id: str, result: TaskResult) -> None:
        """Record *result* for *task_id* after execution."""
        ...

pending(project_root)

Return pending tasks in priority order.

Source code in src/rigger/_protocols.py
def pending(self, project_root: Path) -> list[Task]:
    """Return pending tasks in priority order."""
    ...

mark_complete(task_id, result)

Update task status after execution.

Source code in src/rigger/_protocols.py
def mark_complete(self, task_id: str, result: TaskResult) -> None:
    """Update task status after execution."""
    ...

Verifier

Bases: Protocol

FL: Checks agent output against quality criteria.

Source code in src/rigger/_protocols.py
class Verifier(Protocol):
    """FL: Checks agent output against quality criteria.

    Runs post-dispatch. Unlike Constraints, Verifier results do not feed
    .harness/constraints.json.
    """

    def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
        """Judge whether *result* meets the quality criteria for
        *project_root*."""
        ...

verify(project_root, result)

Check whether the task result meets quality criteria.

Source code in src/rigger/_protocols.py
def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
    """Check whether the task result meets quality criteria."""
    ...

WorkspaceManager

Bases: Protocol

Creates and manages isolated execution environments for parallel agents.

Infrastructure protocol — NOT one of the 6 harness dimensions. Provides filesystem isolation so each parallel agent operates on its own project_root without interfering with other agents.

Implementations: - IndependentBranchManager: branch-per-task, push as PR (DEFAULT) - GitWorktreeManager: git worktrees with sequential merge back - IndependentDirManager: directory copies (non-git)

Source: Task 1.13 (parallel dispatch design), Task 5.6 (validation).

Source code in src/rigger/_protocols.py
class WorkspaceManager(Protocol):
    """Creates and manages isolated execution environments for parallel agents.

    Infrastructure protocol — NOT one of the 6 harness dimensions.
    Provides filesystem isolation so each parallel agent operates on
    its own project_root without interfering with other agents.

    Implementations:
    - IndependentBranchManager: branch-per-task, push as PR (DEFAULT)
    - GitWorktreeManager: git worktrees with sequential merge back
    - IndependentDirManager: directory copies (non-git)

    Source: Task 1.13 (parallel dispatch design), Task 5.6 (validation).
    """

    def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
        """Create an isolated workspace for one agent.

        Args:
            main_root: The main project root directory.
            task: The task to be executed in this workspace.
            branch_name: Branch name for git-based managers (may be
                ignored by non-git implementations).

        Returns:
            Path to the isolated workspace — becomes the agent's project_root.
        """
        ...

    def merge(self, worktree: Path, main_root: Path) -> MergeResult:
        """Merge or publish changes from a workspace.

        CONSTRAINT: merge() MUST NOT modify main_root beyond the
        merge/push operation itself.

        Args:
            worktree: Path to the isolated workspace.
            main_root: The main project root directory.

        Returns:
            MergeResult with success status and conflict information.
        """
        ...

    def cleanup(self, worktree: Path) -> None:
        """Remove an isolated workspace after use. Idempotent.

        Args:
            worktree: Path to the workspace to remove; because the
                operation is idempotent, calling it again on an
                already-removed workspace is safe.
        """
        ...

create(main_root, task, branch_name)

Create an isolated workspace for one agent.

Parameters:

Name Type Description Default
main_root Path

The main project root directory.

required
task Task

The task to be executed in this workspace.

required
branch_name str

Branch name for git-based managers (may be ignored by non-git implementations).

required

Returns:

Type Description
Path

Path to the isolated workspace — becomes the agent's project_root.

Source code in src/rigger/_protocols.py
def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
    """Create an isolated workspace for one agent.

    Args:
        main_root: The main project root directory.
        task: The task to be executed in this workspace.
        branch_name: Branch name for git-based managers (may be
            ignored by non-git implementations).

    Returns:
        Path to the isolated workspace — becomes the agent's project_root.
    """
    ...

merge(worktree, main_root)

Merge or publish changes from a workspace.

CONSTRAINT: merge() MUST NOT modify main_root beyond the merge/push operation itself.

Parameters:

Name Type Description Default
worktree Path

Path to the isolated workspace.

required
main_root Path

The main project root directory.

required

Returns:

Type Description
MergeResult

MergeResult with success status and conflict information.

Source code in src/rigger/_protocols.py
def merge(self, worktree: Path, main_root: Path) -> MergeResult:
    """Merge or publish changes from a workspace.

    CONSTRAINT: merge() MUST NOT modify main_root beyond the
    merge/push operation itself.

    Args:
        worktree: Path to the isolated workspace.
        main_root: The main project root directory.

    Returns:
        MergeResult with success status and conflict information.
    """
    ...

cleanup(worktree)

Remove an isolated workspace after use. Idempotent.

Source code in src/rigger/_protocols.py
def cleanup(self, worktree: Path) -> None:
    """Remove an isolated workspace after use. Idempotent."""
    ...

ContextProvisioner dataclass

Aggregates multiple ContextSource instances into a single ProvisionResult.

Holds a flat list of ContextSource objects — no subclasses, no type hierarchy, no participation-mode distinction at the type level. Active provisioners and registration stubs are treated uniformly: call gather(), merge results.

Design decisions (Task 1.7): 1. Files are deduplicated by resolved path (preserve first-seen order). 2. Capabilities are concatenated in source order (no dedup). 3. Failing sources are logged and skipped (fail-open default). Use CriticalSource wrapper for fail-stop override. 4. Sources are called sequentially in list order.

Source code in src/rigger/_provisioner.py
@dataclass
class ContextProvisioner:
    """Aggregates multiple ContextSource instances into a single ProvisionResult.

    Holds a flat list of ContextSource objects — no subclasses, no type
    hierarchy, no participation-mode distinction at the type level.
    Every source is treated the same way: call gather(), fold the
    results together.

    Design decisions (Task 1.7):
        1. Files are deduplicated by resolved path (first occurrence wins).
        2. Capabilities are concatenated in source order (no dedup).
        3. Failing sources are logged and skipped (fail-open default).
           Wrap a source in CriticalSource for fail-stop behavior.
        4. Sources are called sequentially in list order.
    """

    sources: list[ContextSource] = field(default_factory=list)

    def provision(self, project_root: Path) -> ProvisionResult:
        """Run every source's gather() and merge the results.

        Args:
            project_root: The project directory handed to each source.

        Returns:
            One ProvisionResult: files deduplicated by resolved path
            (first occurrence wins), capabilities concatenated in order.
        """
        merged_files: list[Path] = []
        first_owner: dict[Path, str] = {}  # resolved path -> contributing source class name
        merged_caps: list[str] = []

        for src in self.sources:
            src_name = type(src).__name__
            try:
                gathered = src.gather(project_root)
            except Exception as exc:
                # Fail-stop only for CriticalSource wrappers; every other
                # source is fail-open: log the failure and move on.
                if isinstance(src, CriticalSource):
                    raise
                logger.warning(
                    "ContextSource %s.gather() raised %s: %s — skipping",
                    src_name,
                    type(exc).__name__,
                    exc,
                )
                continue

            for candidate in gathered.files:
                key = candidate.resolve()
                if key in first_owner:
                    logger.warning(
                        "Duplicate file path %s from %s (first seen from %s) — skipped",
                        candidate,
                        src_name,
                        first_owner[key],
                    )
                else:
                    first_owner[key] = src_name
                    merged_files.append(candidate)

            merged_caps.extend(gathered.capabilities)

        return ProvisionResult(files=merged_files, capabilities=merged_caps)

provision(project_root)

Call gather() on each source, merge results into one ProvisionResult.

Parameters:

Name Type Description Default
project_root Path

The project directory.

required

Returns:

Type Description
ProvisionResult

Aggregated ProvisionResult with deduplicated files and

ProvisionResult

concatenated capabilities.

Source code in src/rigger/_provisioner.py
def provision(self, project_root: Path) -> ProvisionResult:
    """Run every source's gather() and merge the results.

    Args:
        project_root: The project directory handed to each source.

    Returns:
        One ProvisionResult: files deduplicated by resolved path
        (first occurrence wins), capabilities concatenated in order.
    """
    merged_files: list[Path] = []
    first_owner: dict[Path, str] = {}  # resolved path -> contributing source class name
    merged_caps: list[str] = []

    for src in self.sources:
        src_name = type(src).__name__
        try:
            gathered = src.gather(project_root)
        except Exception as exc:
            # Fail-stop only for CriticalSource wrappers; every other
            # source is fail-open: log the failure and move on.
            if isinstance(src, CriticalSource):
                raise
            logger.warning(
                "ContextSource %s.gather() raised %s: %s — skipping",
                src_name,
                type(exc).__name__,
                exc,
            )
            continue

        for candidate in gathered.files:
            key = candidate.resolve()
            if key in first_owner:
                logger.warning(
                    "Duplicate file path %s from %s (first seen from %s) — skipped",
                    candidate,
                    src_name,
                    first_owner[key],
                )
            else:
                first_owner[key] = src_name
                merged_files.append(candidate)

        merged_caps.extend(gathered.capabilities)

    return ProvisionResult(files=merged_files, capabilities=merged_caps)

CriticalSource

Wrapper that makes a ContextSource's failure abort provision().

When a CriticalSource's gather() fails, the exception propagates instead of being swallowed. Use this when a source's absence makes the agent's context dangerously incomplete.

The provisioner checks isinstance(source, CriticalSource) before catching exceptions. This is a type-level check, not a protocol method — the ContextSource protocol remains single-method.

Usage::

provisioner = ContextProvisioner(sources=[
    CriticalSource(InitializerProgressSource()),  # fail-stop
    ExternalIssueTrackerSource("PROJ-42"),        # fail-open
])
Source code in src/rigger/_provisioner.py
class CriticalSource:
    """Wrapper marking a ContextSource as fail-stop during provision().

    If the wrapped source's gather() raises, the exception escapes the
    provisioner instead of being swallowed. Reach for this when missing
    context from a source would leave the agent dangerously uninformed.

    The provisioner detects the wrapper with
    ``isinstance(source, CriticalSource)`` before catching exceptions —
    a type-level check, so the ContextSource protocol stays
    single-method.

    Usage::

        provisioner = ContextProvisioner(sources=[
            CriticalSource(InitializerProgressSource()),  # fail-stop
            ExternalIssueTrackerSource("PROJ-42"),        # fail-open
        ])
    """

    def __init__(self, inner: ContextSource) -> None:
        # Keep a reference to the wrapped source; all work is delegated.
        self._inner = inner

    def gather(self, project_root: Path) -> ProvisionResult:
        """Forward to the wrapped source; any exception is allowed to escape."""
        return self._inner.gather(project_root)

gather(project_root)

Delegate to wrapped source. Exceptions propagate uncaught.

Source code in src/rigger/_provisioner.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Forward to the wrapped source; any exception is allowed to escape."""
    return self._inner.gather(project_root)

Registry

Maps (protocol, name) pairs to implementation classes.

Built-in implementations are registered at construction time. Third-party plugins are discovered lazily from importlib.metadata.entry_points(group="rigger.{protocol}").

Source code in src/rigger/_registry.py
class Registry:
    """Maps ``(protocol, name)`` pairs to implementation classes.

    Built-ins are registered eagerly at construction time; third-party
    plugins are discovered lazily from
    ``importlib.metadata.entry_points(group="rigger.{protocol}")``
    the first time a protocol is queried.
    """

    def __init__(self) -> None:
        # Both tables map protocol -> {config name -> class}.
        self._builtins: dict[str, dict[str, type]] = {}
        self._plugins: dict[str, dict[str, type]] = {}
        # Protocols whose entry points have already been scanned.
        self._plugins_loaded: set[str] = set()

    def register(self, protocol: str, name: str, cls: type) -> None:
        """Register a class under a protocol/name pair.

        Args:
            protocol: Protocol group name (e.g. ``"task_source"``).
            name: Config name (e.g. ``"file_list"``).
            cls: The implementation class.
        """
        bucket = self._builtins.setdefault(protocol, {})
        bucket[name] = cls

    def get(self, protocol: str, name: str) -> type:
        """Look up a class by protocol and config name.

        Plugins override built-ins (with a warning logged on first load).

        Args:
            protocol: Protocol group name.
            name: Config name.

        Returns:
            The implementation class.

        Raises:
            KeyError: If no implementation is registered for the given
                protocol/name combination.
        """
        self._ensure_plugins_loaded(protocol)

        # Plugins take priority over built-ins, hence the scan order.
        for table in (self._plugins, self._builtins):
            cls = table.get(protocol, {}).get(name)
            if cls is not None:
                return cls

        available = sorted(self._available_names(protocol))
        msg = (
            f"Unknown {protocol} type {name!r}. "
            f"Available: {', '.join(available) if available else '(none)'}"
        )
        raise KeyError(msg)

    def create(self, protocol: str, name: str, **kwargs: Any) -> Any:
        """Instantiate an implementation from config parameters.

        Args:
            protocol: Protocol group name.
            name: Config name.
            **kwargs: Constructor arguments forwarded to the class.

        Returns:
            An instance of the looked-up class.

        Raises:
            TypeError: If the class rejects the given constructor args,
                re-raised with protocol/name context attached.
        """
        target = self.get(protocol, name)
        try:
            return target(**kwargs)
        except TypeError as exc:
            raise TypeError(
                f"Failed to create {protocol}/{name} ({target.__name__}): {exc}"
            ) from exc

    def available(self, protocol: str) -> list[str]:
        """Return sorted list of available config names for a protocol.

        Args:
            protocol: Protocol group name.

        Returns:
            Sorted list of registered config names (built-ins and plugins).
        """
        self._ensure_plugins_loaded(protocol)
        return sorted(self._available_names(protocol))

    def _available_names(self, protocol: str) -> set[str]:
        # Union of built-in and plugin names for the protocol.
        return set(self._builtins.get(protocol, {})) | set(self._plugins.get(protocol, {}))

    def _ensure_plugins_loaded(self, protocol: str) -> None:
        # Scan entry points at most once per protocol.
        if protocol not in self._plugins_loaded:
            self._plugins_loaded.add(protocol)
            self._load_entry_points(protocol)

    def _load_entry_points(self, protocol: str) -> None:
        group = f"rigger.{protocol}"
        try:
            eps = importlib.metadata.entry_points(group=group)
        except TypeError:
            # Python <3.12 compat (shouldn't hit on 3.13+, but defensive).
            eps = importlib.metadata.entry_points().get(group, [])  # type: ignore[assignment]

        for ep in eps:
            try:
                cls = ep.load()
            except Exception:
                # A broken plugin must not take the whole registry down.
                logger.warning(
                    "Failed to load entry point %s from group %s",
                    ep.name,
                    group,
                    exc_info=True,
                )
                continue

            if ep.name in self._builtins.get(protocol, {}):
                logger.warning(
                    "Plugin %r overrides built-in %s/%s",
                    ep.value,
                    protocol,
                    ep.name,
                )
            self._plugins.setdefault(protocol, {})[ep.name] = cls

register(protocol, name, cls)

Register a class under a protocol/name pair.

Parameters:

Name Type Description Default
protocol str

Protocol group name (e.g. "task_source").

required
name str

Config name (e.g. "file_list").

required
cls type

The implementation class.

required
Source code in src/rigger/_registry.py
def register(self, protocol: str, name: str, cls: type) -> None:
    """Register a class under a protocol/name pair.

    Args:
        protocol: Protocol group name (e.g. ``"task_source"``).
        name: Config name (e.g. ``"file_list"``).
        cls: The implementation class.
    """
    bucket = self._builtins.setdefault(protocol, {})
    bucket[name] = cls

get(protocol, name)

Look up a class by protocol and config name.

Plugins override built-ins (with a warning logged on first load).

Parameters:

Name Type Description Default
protocol str

Protocol group name.

required
name str

Config name.

required

Returns:

Type Description
type

The implementation class.

Raises:

Type Description
KeyError

If no implementation is registered for the given protocol/name combination.

Source code in src/rigger/_registry.py
def get(self, protocol: str, name: str) -> type:
    """Look up a class by protocol and config name.

    Plugins override built-ins (with a warning logged on first load).

    Args:
        protocol: Protocol group name.
        name: Config name.

    Returns:
        The implementation class.

    Raises:
        KeyError: If no implementation is registered for the given
            protocol/name combination.
    """
    self._ensure_plugins_loaded(protocol)

    # Plugins take priority over built-ins, hence the scan order.
    for table in (self._plugins, self._builtins):
        cls = table.get(protocol, {}).get(name)
        if cls is not None:
            return cls

    available = sorted(self._available_names(protocol))
    msg = (
        f"Unknown {protocol} type {name!r}. "
        f"Available: {', '.join(available) if available else '(none)'}"
    )
    raise KeyError(msg)

create(protocol, name, **kwargs)

Instantiate an implementation from config parameters.

Parameters:

Name Type Description Default
protocol str

Protocol group name.

required
name str

Config name.

required
**kwargs Any

Constructor arguments forwarded to the class.

{}

Returns:

Type Description
Any

An instance of the looked-up class.

Source code in src/rigger/_registry.py
def create(self, protocol: str, name: str, **kwargs: Any) -> Any:
    """Instantiate an implementation from config parameters.

    Args:
        protocol: Protocol group name.
        name: Config name.
        **kwargs: Constructor arguments forwarded to the class.

    Returns:
        An instance of the looked-up class.

    Raises:
        TypeError: If the class rejects the given constructor args,
            re-raised with protocol/name context attached.
    """
    target = self.get(protocol, name)
    try:
        return target(**kwargs)
    except TypeError as exc:
        raise TypeError(
            f"Failed to create {protocol}/{name} ({target.__name__}): {exc}"
        ) from exc

available(protocol)

Return sorted list of available config names for a protocol.

Parameters:

Name Type Description Default
protocol str

Protocol group name.

required

Returns:

Type Description
list[str]

Sorted list of registered config names.

Source code in src/rigger/_registry.py
def available(self, protocol: str) -> list[str]:
    """Return sorted list of available config names for a protocol.

    Args:
        protocol: Protocol group name.

    Returns:
        Sorted list of registered config names (built-ins and plugins).
    """
    self._ensure_plugins_loaded(protocol)
    names = self._available_names(protocol)
    return sorted(names)

FilesystemEntropyTaskSource

Reads entropy tasks from partitioned files in .harness/entropy/.

On initialization, migrates legacy pending_tasks.json to the partitioned format and cleans up orphaned temp files.

Partitions are read in sorted order (oldest first by filename timestamp). mark_complete() removes tasks from their partition, deleting empty partition files.

Source code in src/rigger/_schema.py
class FilesystemEntropyTaskSource:
    """Reads entropy tasks from partitioned files in .harness/entropy/.

    On initialization, migrates legacy ``pending_tasks.json`` to
    the partitioned format and cleans up orphaned temp files.

    Partitions are read in sorted order (oldest first by filename
    timestamp). ``mark_complete()`` removes tasks from their
    partition, deleting empty partition files.
    """

    def __init__(self, project_root: Path) -> None:
        """Bind to a project and normalize the on-disk entropy store."""
        self._project_root = project_root
        # All partition files live under <project_root>/HARNESS_DIR/ENTROPY_DIR.
        self._entropy_dir = project_root / HARNESS_DIR / ENTROPY_DIR
        # Order matters: migrate the legacy single-file store into
        # partitions first, then sweep temp files left by interrupted writes.
        self._migrate_legacy()
        self._cleanup_orphans()

    def _migrate_legacy(self) -> None:
        """Migrate legacy pending_tasks.json to partitioned format."""
        legacy_path = self._project_root / HARNESS_DIR / _LEGACY_PENDING_FILE
        if not legacy_path.exists():
            return  # nothing to migrate

        try:
            data = json.loads(legacy_path.read_text())
        except (json.JSONDecodeError, OSError):
            # Unreadable legacy file is deliberately discarded rather than
            # migrated — its contents cannot be recovered here.
            legacy_path.unlink(missing_ok=True)
            return

        tasks: list[Task] = []
        # Legacy file may be either a bare list of task dicts or a
        # {"tasks": [...]} wrapper; accept both shapes.
        items = data if isinstance(data, list) else data.get("tasks", [])
        for item in items:
            # Only well-formed entries (id + description present) survive
            # migration; malformed ones are silently dropped.
            if isinstance(item, dict) and "id" in item and "description" in item:
                tasks.append(
                    Task(
                        id=item["id"],
                        description=item["description"],
                        metadata=item.get("metadata", {}),
                    )
                )

        if tasks:
            write_entropy_tasks(self._project_root, tasks)

        # Remove the legacy file in all cases so migration runs only once.
        legacy_path.unlink(missing_ok=True)
        logger.info("Migrated %d tasks from legacy pending_tasks.json", len(tasks))

    def _cleanup_orphans(self) -> None:
        """Remove orphaned .tmp_* files in the entropy directory."""
        if not self._entropy_dir.exists():
            return
        for tmp in self._entropy_dir.glob(".tmp_*"):
            # Best-effort: a tmp file vanishing or being locked is not fatal.
            with contextlib.suppress(OSError):
                tmp.unlink()

    def _partition_paths(self) -> list[Path]:
        """Return sorted partition file paths (oldest first)."""
        if not self._entropy_dir.exists():
            return []
        # Lexicographic sort is treated as chronological — assumes
        # tasks_<timestamp>.json names with sortable timestamps (per class
        # docstring); TODO confirm against the partition writer.
        paths = sorted(self._entropy_dir.glob("tasks_*.json"))
        return paths

    def _read_partition(self, path: Path) -> list[Task]:
        """Read tasks from a single partition file."""
        try:
            data = json.loads(path.read_text())
        except (json.JSONDecodeError, OSError):
            # A corrupt partition is skipped, not deleted, so a human can
            # still inspect it.
            logger.warning("Malformed entropy partition %s — skipping", path.name)
            return []

        # Partitions use the {"tasks": [...]} shape only (unlike the legacy
        # file, which could also be a bare list).
        items = data.get("tasks", []) if isinstance(data, dict) else []
        tasks: list[Task] = []
        for item in items:
            if isinstance(item, dict) and "id" in item and "description" in item:
                tasks.append(
                    Task(
                        id=item["id"],
                        description=item["description"],
                        metadata=item.get("metadata", {}),
                    )
                )
        return tasks

    def pending(self, project_root: Path) -> list[Task]:
        """Return all pending entropy tasks across partitions, oldest first."""
        # NOTE(review): project_root is unused — this reads the directory
        # chosen at construction; presumably the parameter exists for
        # protocol compatibility. Confirm against the task-source protocol.
        tasks: list[Task] = []
        for path in self._partition_paths():
            tasks.extend(self._read_partition(path))
        return tasks

    def mark_complete(self, task_id: str, result: object = None) -> None:
        """Remove a completed task from its partition.

        Scans partitions for the task_id. If removal leaves the
        partition empty, the file is deleted. Otherwise, the
        partition is atomically rewritten.
        """
        for path in self._partition_paths():
            tasks = self._read_partition(path)
            remaining = [t for t in tasks if t.id != task_id]
            if len(remaining) == len(tasks):
                continue  # task_id not in this partition; keep scanning

            if not remaining:
                # Last task removed: drop the whole partition file.
                with contextlib.suppress(OSError):
                    path.unlink()
            else:
                data: list[dict[str, Any]] = [
                    {
                        "_schema_version": SCHEMA_VERSION,
                        "id": t.id,
                        "description": t.description,
                        "metadata": t.metadata,
                    }
                    for t in remaining
                ]
                _atomic_write(path, {"tasks": data})
            # Only the first partition containing task_id is modified.
            return

pending(project_root)

Return all pending entropy tasks across partitions, oldest first.

Source code in src/rigger/_schema.py
def pending(self, project_root: Path) -> list[Task]:
    """Return all pending entropy tasks across partitions, oldest first."""
    collected: list[Task] = []
    for partition in self._partition_paths():
        collected.extend(self._read_partition(partition))
    return collected

mark_complete(task_id, result=None)

Remove a completed task from its partition.

Scans partitions for the task_id. If removal leaves the partition empty, the file is deleted. Otherwise, the partition is atomically rewritten.

Source code in src/rigger/_schema.py
def mark_complete(self, task_id: str, result: object = None) -> None:
    """Remove a completed task from its partition.

    Scans partitions in order for ``task_id``. A partition left empty
    by the removal is deleted; otherwise it is atomically rewritten.
    Only the first partition containing the task is touched.
    """
    for partition in self._partition_paths():
        existing = self._read_partition(partition)
        kept = [task for task in existing if task.id != task_id]
        if len(kept) == len(existing):
            continue  # task not in this partition; keep scanning

        if kept:
            payload: list[dict[str, Any]] = [
                {
                    "_schema_version": SCHEMA_VERSION,
                    "id": task.id,
                    "description": task.description,
                    "metadata": task.metadata,
                }
                for task in kept
            ]
            _atomic_write(partition, {"tasks": payload})
        else:
            # Removal emptied the partition: delete the file outright.
            with contextlib.suppress(OSError):
                partition.unlink()
        return

Action

Bases: Enum

Control signal returned by callbacks to influence loop behavior.

Source code in src/rigger/_types.py
class Action(Enum):
    """Control signal returned by callbacks to influence loop behavior.

    A callback returning ``None`` is treated the same as CONTINUE
    (see Callbacks).
    """

    CONTINUE = "continue"  # proceed with the loop normally
    SKIP_TASK = "skip"  # skip the current task — confirm exact semantics in the loop
    HALT = "halt"  # stop the loop

EpochState dataclass

Externalized state readable by the loop controller.

Serialized to .harness/state.json by the harness loop. epoch=0 is the initial state, semantically equivalent to a missing state.json file.

Source code in src/rigger/_types.py
@dataclass
class EpochState:
    """Externalized state readable by the loop controller.

    Serialized to .harness/state.json by the harness loop.
    epoch=0 is the initial state, semantically equivalent to a missing
    state.json file.
    """

    epoch: int = 0  # current epoch counter; 0 == fresh/initial state
    completed_tasks: list[str] = field(default_factory=list)  # identifiers of finished tasks
    pending_tasks: list[str] = field(default_factory=list)  # identifiers still queued
    halted: bool = False  # True once the loop has been stopped
    halt_reason: str = ""  # human-readable explanation; meaningful when halted is True
    metadata: dict[str, Any] = field(default_factory=dict)  # free-form extension point

MergeResult dataclass

Outcome of merging/publishing worktree changes.

Used by WorkspaceManager.merge(). Captures success status, merged commits, conflict information, and diagnostic metadata.

Source code in src/rigger/_types.py
@dataclass
class MergeResult:
    """Outcome of merging/publishing worktree changes.

    Used by WorkspaceManager.merge(). Captures success status,
    merged commits, conflict information, and diagnostic metadata.
    """

    success: bool  # True when the merge/publish completed cleanly
    merged_commits: list[str] = field(default_factory=list)  # commits that landed in main
    conflicts: list[str] = field(default_factory=list)  # conflict descriptions — format set by the manager; TODO confirm
    worktree_path: Path | None = None  # workspace the result came from, if known
    metadata: dict[str, Any] = field(default_factory=dict)  # free-form diagnostics

ProvisionResult dataclass

What a ContextSource prepared for the agent.

Two field categories:

- files: Filesystem paths written or verified on disk.
- capabilities: Human-readable descriptions of non-filesystem provisions.

Source code in src/rigger/_types.py
@dataclass
class ProvisionResult:
    """What a ContextSource prepared for the agent.

    Two field categories:
        files: Filesystem paths written or verified on disk.
        capabilities: Human-readable descriptions of non-filesystem provisions.
    """

    files: list[Path] = field(default_factory=list)  # on-disk paths made available to the agent
    capabilities: list[str] = field(default_factory=list)  # prose descriptions of non-file provisions

Task dataclass

A unit of work for an agent to execute.

Serialized to .harness/current_task.json by the harness loop. The backend reads this from disk — the harness does NOT pass it via function args.

Source code in src/rigger/_types.py
@dataclass
class Task:
    """A unit of work for an agent to execute.

    Serialized to .harness/current_task.json by the harness loop.
    The backend reads this from disk — the harness does NOT pass it
    via function args.
    """

    id: str  # task identifier (used e.g. by mark_complete to match tasks)
    description: str  # human-readable statement of the work
    metadata: dict[str, Any] = field(default_factory=dict)  # free-form extension point

TaskResult dataclass

Observable side effects of an agent's execution.

Constructed by AgentBackend.execute() or by the harness loop for error/halt scenarios. The status field is mutable — the canonical loop updates it after verification.

Source code in src/rigger/_types.py
@dataclass
class TaskResult:
    """Observable side effects of an agent's execution.

    Constructed by AgentBackend.execute() or by the harness loop for
    error/halt scenarios. The status field is mutable — the canonical
    loop updates it after verification.
    """

    task_id: str  # id of the Task this result belongs to
    status: str  # "success", "failure", "partial", "error", etc.
    artifacts: list[Path] = field(default_factory=list)  # files produced by the run
    commits: list[str] = field(default_factory=list)  # commits made during the run
    metadata: dict[str, Any] = field(default_factory=dict)  # free-form diagnostics

VerifyAction

Bases: Enum

What the harness should do after verification.

Used by VerifyResult.action to communicate routing intent:

- ACCEPT: Checks passed; proceed to next epoch.
- RETRY: Checks failed; send feedback and re-dispatch.
- BLOCK: Hard gate; cannot proceed (constraint violation).
- ESCALATE: Retry budget exhausted; notify human operator.

Source code in src/rigger/_types.py
class VerifyAction(Enum):
    """What the harness should do after verification.

    Used by VerifyResult.action to communicate routing intent:
        ACCEPT: Checks passed; proceed to next epoch.
        RETRY: Checks failed; send feedback and re-dispatch.
        BLOCK: Hard gate; cannot proceed (constraint violation).
        ESCALATE: Retry budget exhausted; notify human operator.
    """

    ACCEPT = "accept"  # checks passed; move on
    RETRY = "retry"  # checks failed; feed back and re-dispatch
    BLOCK = "block"  # hard gate (constraint violation)
    ESCALATE = "escalate"  # retry budget exhausted; involve a human

VerifyResult dataclass

Outcome of verification or constraint check.

Used by Verifier.verify() and Constraint.check(). The ``__post_init__`` hook enforces the invariant: passed=True → ACCEPT; passed=False → a non-ACCEPT action.

Source code in src/rigger/_types.py
@dataclass
class VerifyResult:
    """Outcome of a verification or constraint check.

    Produced by Verifier.verify() and Constraint.check(). The
    ``__post_init__`` hook keeps ``passed`` and ``action`` consistent:
    a passing result always carries ACCEPT, and a failing result
    never does.
    """

    passed: bool  # did the check succeed?
    action: VerifyAction = VerifyAction.ACCEPT  # routing intent; coerced by __post_init__
    message: str = ""  # human-readable summary
    details: dict[str, Any] = field(default_factory=dict)  # structured findings
    metadata: dict[str, Any] = field(default_factory=dict)  # free-form extras

    def __post_init__(self) -> None:
        """Coerce ``action`` so it always agrees with ``passed``."""
        consistent = bool(self.passed) == (self.action == VerifyAction.ACCEPT)
        if not consistent:
            self.action = VerifyAction.ACCEPT if self.passed else VerifyAction.RETRY

__post_init__()

Enforce passed/action consistency invariant.

Source code in src/rigger/_types.py
def __post_init__(self) -> None:
    """Coerce ``action`` so it always agrees with ``passed``."""
    consistent = bool(self.passed) == (self.action == VerifyAction.ACCEPT)
    if not consistent:
        self.action = VerifyAction.ACCEPT if self.passed else VerifyAction.RETRY

ClaudeCodeBackend

AgentBackend implementation for the Claude Agent SDK.

Wraps the Claude Agent SDK to execute a coding agent in a fresh context per epoch. Prompt construction, SDK configuration, and result parsing are entirely internal to this class.

The agent is a BLACK BOX. This backend:

  • READS .harness/ files written by the Rigger loop
  • CONSTRUCTS prompt templates from .harness/ data
  • CONFIGURES the SDK client (setting_sources, tools, permissions)
  • EXECUTES the agent via the SDK
  • PARSES observable side effects into TaskResult

It MUST NOT:

  • Write to .harness/ (Rigger-owned)
  • Model agent-internal decision-making
  • Pass context via any channel other than filesystem + SDK config

Required convention: setting_sources=["project"] is hardcoded for filesystem-provisioned context (AGENTS.md, CLAUDE.md) to be loaded by the agent. (Task 5.4 D3, Task 5.9).

Source: Task 5.4, Task 5.9.

Source code in src/rigger/backends/claude_code.py
class ClaudeCodeBackend:
    """AgentBackend implementation for the Claude Agent SDK.

    Wraps the Claude Agent SDK to execute a coding agent in a fresh context
    per epoch. Prompt construction, SDK configuration, and result parsing
    are entirely internal to this class.

    The agent is a BLACK BOX. This backend:

    - READS ``.harness/`` files written by the Rigger loop
    - CONSTRUCTS prompt templates from ``.harness/`` data
    - CONFIGURES the SDK client (setting_sources, tools, permissions)
    - EXECUTES the agent via the SDK
    - PARSES observable side effects into TaskResult

    It MUST NOT:

    - Write to ``.harness/`` (Rigger-owned)
    - Model agent-internal decision-making
    - Pass context via any channel other than filesystem + SDK config

    Required convention: ``setting_sources=["project"]`` is hardcoded for
    filesystem-provisioned context (AGENTS.md, CLAUDE.md) to be loaded
    by the agent. (Task 5.4 D3, Task 5.9).

    Source: Task 5.4, Task 5.9.
    """

    def __init__(
        self,
        model: str = "claude-sonnet-4-6",
        *,
        system_prompt: str | None = None,
        permission_mode: str = "bypassPermissions",
        mcp_servers: dict[str, Any] | None = None,
        allowed_tools: list[str] | None = None,
        disallowed_tools: list[str] | None = None,
        max_turns: int | None = None,
        hooks: dict[str, Any] | None = None,
        sandbox: dict[str, Any] | None = None,
        env: dict[str, str] | None = None,
        add_dirs: list[str | Path] | None = None,
        prompt_templates: PromptTemplateSet | None = None,
    ) -> None:
        """Initialize the ClaudeCodeBackend.

        All parameters are STATIC backend configuration (Task 1.17 §4.1).
        Dynamic per-epoch configuration flows through
        ``.harness/constraints.json``.

        Args:
            model: Claude model to use.
            system_prompt: Optional system prompt override. Most harnesses
                should leave this None and use CLAUDE.md/AGENTS.md via
                setting_sources instead.
            permission_mode: Tool permission mode for automated execution.
            mcp_servers: MCP server configurations. Static for MVP.
            allowed_tools: Constructor-level tool allowlist (the FLOOR).
            disallowed_tools: Constructor-level tool denylist.
            max_turns: Constructor-level turn limit.
            hooks: SDK lifecycle hooks.
            sandbox: Sandbox configuration.
            env: Environment variables for the agent subprocess.
            add_dirs: Additional directories the agent can access.
            prompt_templates: Custom prompt template set. Defaults to
                the built-in templates shipped with this backend.
        """
        self.model = model
        self.system_prompt = system_prompt
        self.permission_mode = permission_mode
        self.mcp_servers = mcp_servers or {}
        self.allowed_tools = list(allowed_tools or [])
        self.disallowed_tools = list(disallowed_tools or [])
        self.max_turns = max_turns
        self.hooks = hooks
        self.sandbox = sandbox
        self.env = env or {}
        self.add_dirs = [str(d) for d in (add_dirs or [])]
        self.templates = prompt_templates or DefaultPromptTemplates()

    async def execute(self, project_root: Path) -> TaskResult:
        """Execute a coding agent in a fresh context.

        Implements the ``AgentBackend`` protocol. Sequence (Task 5.4 §1.2):

        1. Read .harness/ files (task, state, constraints)
        2. Determine prompt phase (initializer / coding / error_recovery)
        3. Build SDK options (merge constructor + constraints)
        4. Snapshot pre-execution git state
        5. Build prompt and execute agent
        6. Diff git state to detect changes
        7. Parse and return TaskResult

        Args:
            project_root: The project directory.

        Returns:
            TaskResult with observable side effects.
        """
        start_time = time.monotonic()

        # ── Step 1: Read .harness/ files ──
        task = read_current_task(project_root)
        state = read_state(project_root)
        constraints = read_constraints(project_root)

        # Handle missing MUST file: current_task.json (Task 5.4 §5)
        if task is None:
            logger.error(
                "Cannot execute: .harness/current_task.json is missing or malformed"
            )
            return TaskResult(
                task_id="unknown",
                status="error",
                metadata={
                    "error": "missing_current_task",
                    "error_type": "MissingTaskFile",
                    "execution_time_s": time.monotonic() - start_time,
                },
            )

        # ── Step 2: Determine prompt phase ──
        phase = self._determine_phase(state)

        # ── Step 3: Build SDK options ──
        sdk_options = self._build_sdk_options(project_root, constraints)

        # ── Step 4: Snapshot pre-execution state ──
        pre_snapshot = _snapshot_repo_state(project_root)

        # ── Step 5: Build prompt and execute ──
        prompt = self.templates.render(phase=phase, task=task, state=state)

        try:
            agent_output = await self._run_agent(sdk_options, prompt)
        except Exception as exc:
            logger.error("Agent execution failed: %s", exc)
            post_snapshot = _snapshot_repo_state(project_root)
            changes = _detect_changes(pre_snapshot, post_snapshot, project_root)
            return TaskResult(
                task_id=task.id,
                status="error",
                artifacts=[Path(f) for f in changes["modified_files"]],
                commits=changes["new_commits"],
                metadata={
                    "error": str(exc),
                    "error_type": type(exc).__name__,
                    "execution_time_s": time.monotonic() - start_time,
                    # Any observable change (commits OR modified files) counts
                    # as partial progress, consistent with the artifacts/commits
                    # fields above. Previously only commits were considered.
                    "partial_changes": bool(
                        changes["new_commits"] or changes["modified_files"]
                    ),
                },
            )

        # ── Step 6: Detect changes ──
        post_snapshot = _snapshot_repo_state(project_root)
        changes = _detect_changes(pre_snapshot, post_snapshot, project_root)

        # ── Step 7: Parse and return TaskResult ──
        return self._parse_result(
            task_id=task.id,
            agent_output=agent_output,
            changes=changes,
            execution_time_s=time.monotonic() - start_time,
        )

    def _determine_phase(self, state: EpochState) -> str:
        """Determine which prompt template to use.

        Two-Phase Startup (P2) + error recovery (Task 5.4 D7, D12):

        - epoch <= 1 -> "initializer"
        - epoch > 1 with prior failure/error -> "error_recovery"
        - epoch > 1 -> "coding"

        Args:
            state: Current epoch state.

        Returns:
            Phase name: "initializer", "coding", or "error_recovery".
        """
        if state.epoch <= 1:
            return "initializer"

        last_status = state.metadata.get("last_result_status")
        if last_status in ("error", "failure"):
            return "error_recovery"

        return "coding"

    def _build_sdk_options(
        self,
        project_root: Path,
        constraints: dict[str, Any],
    ) -> dict[str, Any]:
        """Build SDK client options from constructor + constraints.

        Defense-in-depth merge (Task 5.4 §3.2, Task 1.17 §2.2):
        Constructor restrictions are the FLOOR. ``.harness/constraints.json``
        can only tighten, never loosen.

        Args:
            project_root: Project directory (becomes ``cwd``).
            constraints: Parsed constraints from ``.harness/constraints.json``.

        Returns:
            Dict of options for ``ClaudeAgentOptions`` construction.
        """
        options: dict[str, Any] = {
            "cwd": str(project_root),
            "model": self.model,
            "setting_sources": ["project"],  # REQUIRED (Task 5.4 D3)
            "permission_mode": self.permission_mode,
        }

        # Optional constructor parameters
        if self.system_prompt:
            options["system_prompt"] = self.system_prompt
        if self.mcp_servers:
            options["mcp_servers"] = self.mcp_servers
        if self.hooks:
            options["hooks"] = self.hooks
        if self.sandbox:
            options["sandbox"] = self.sandbox
        if self.env:
            options["env"] = self.env
        if self.add_dirs:
            options["add_dirs"] = self.add_dirs

        # ── Defense-in-depth merge ──
        effective_allowed = set(self.allowed_tools) if self.allowed_tools else None
        effective_disallowed = set(self.disallowed_tools)
        effective_max_turns = self.max_turns

        if constraints:
            # disallowed_tools: ADDITIVE (union) — can only grow
            file_disallowed = constraints.get("disallowed_tools", [])
            effective_disallowed.update(file_disallowed)

            # allowed_tools: RESTRICTIVE (intersection) — can only shrink
            file_allowed = constraints.get("allowed_tools", [])
            if file_allowed:
                file_set = set(file_allowed)
                if effective_allowed is not None:
                    effective_allowed = effective_allowed & file_set
                else:
                    effective_allowed = file_set

            # max_iterations -> max_turns: SCALAR-MIN — can only decrease
            file_max = constraints.get("max_iterations")
            if file_max is not None:
                if effective_max_turns is not None:
                    effective_max_turns = min(effective_max_turns, file_max)
                else:
                    effective_max_turns = file_max

        # Apply merged restrictions
        if effective_allowed is not None:
            options["allowed_tools"] = sorted(effective_allowed)
        if effective_disallowed:
            options["disallowed_tools"] = sorted(effective_disallowed)
        if effective_max_turns is not None:
            options["max_turns"] = effective_max_turns

        return options

    async def _run_agent(
        self,
        sdk_options: dict[str, Any],
        prompt: str,
    ) -> str:
        """Create a fresh SDK client and execute the prompt.

        Single SDK coupling point (Task 5.4 D9). Per-epoch fresh client
        creation (C1, C9 patterns). Uses the standalone ``query()`` function
        for one-shot execution.

        Args:
            sdk_options: Options dict for ``ClaudeAgentOptions``.
            prompt: Rendered prompt string.

        Returns:
            Accumulated agent text output.

        Raises:
            Exception: If the SDK client fails to create or execute.
        """
        from claude_agent_sdk import (
            AssistantMessage,
            ClaudeAgentOptions,
            query,
        )

        options = ClaudeAgentOptions(**sdk_options)
        response_text = ""
        async for msg in query(prompt=prompt, options=options):
            if isinstance(msg, AssistantMessage):
                for block in msg.content:
                    text = getattr(block, "text", None)
                    if isinstance(text, str):
                        response_text += text
        return response_text

    def _parse_result(
        self,
        task_id: str,
        agent_output: str,
        changes: dict[str, Any],
        execution_time_s: float,
    ) -> TaskResult:
        """Parse observable side effects into a TaskResult.

        Status from OBSERVABLE ARTIFACTS only (Task 5.4 §4):

        - "success": New commits
        - "partial": File changes without commits
        - "failure": No observable changes

        Source: Task 5.4 D5 (git commits as primary success signal).

        Args:
            task_id: Current task identifier.
            agent_output: Accumulated agent text output.
            changes: Change detection results from ``_detect_changes()``.
            execution_time_s: Wall-clock execution time.

        Returns:
            TaskResult with status and metadata.
        """
        new_commits = changes.get("new_commits", [])
        modified_files = changes.get("modified_files", [])
        has_uncommitted = changes.get("has_uncommitted_changes", False)

        if new_commits:
            status = "success"
        elif modified_files or has_uncommitted:
            status = "partial"
        else:
            status = "failure"

        return TaskResult(
            task_id=task_id,
            status=status,
            artifacts=[Path(f) for f in modified_files],
            commits=new_commits,
            metadata={
                "execution_time_s": execution_time_s,
                "agent_output_length": len(agent_output),
                "commit_count": len(new_commits),
                "files_changed": len(modified_files),
                "has_uncommitted_changes": has_uncommitted,
            },
        )

__init__(model='claude-sonnet-4-6', *, system_prompt=None, permission_mode='bypassPermissions', mcp_servers=None, allowed_tools=None, disallowed_tools=None, max_turns=None, hooks=None, sandbox=None, env=None, add_dirs=None, prompt_templates=None)

Initialize the ClaudeCodeBackend.

All parameters are STATIC backend configuration (Task 1.17 §4.1). Dynamic per-epoch configuration flows through .harness/constraints.json.

Parameters:

Name Type Description Default
model str

Claude model to use.

'claude-sonnet-4-6'
system_prompt str | None

Optional system prompt override. Most harnesses should leave this None and use CLAUDE.md/AGENTS.md via setting_sources instead.

None
permission_mode str

Tool permission mode for automated execution.

'bypassPermissions'
mcp_servers dict[str, Any] | None

MCP server configurations. Static for MVP.

None
allowed_tools list[str] | None

Constructor-level tool allowlist (the FLOOR).

None
disallowed_tools list[str] | None

Constructor-level tool denylist.

None
max_turns int | None

Constructor-level turn limit.

None
hooks dict[str, Any] | None

SDK lifecycle hooks.

None
sandbox dict[str, Any] | None

Sandbox configuration.

None
env dict[str, str] | None

Environment variables for the agent subprocess.

None
add_dirs list[str | Path] | None

Additional directories the agent can access.

None
prompt_templates PromptTemplateSet | None

Custom prompt template set. Defaults to the built-in templates shipped with this backend.

None
Source code in src/rigger/backends/claude_code.py
def __init__(
    self,
    model: str = "claude-sonnet-4-6",
    *,
    system_prompt: str | None = None,
    permission_mode: str = "bypassPermissions",
    mcp_servers: dict[str, Any] | None = None,
    allowed_tools: list[str] | None = None,
    disallowed_tools: list[str] | None = None,
    max_turns: int | None = None,
    hooks: dict[str, Any] | None = None,
    sandbox: dict[str, Any] | None = None,
    env: dict[str, str] | None = None,
    add_dirs: list[str | Path] | None = None,
    prompt_templates: PromptTemplateSet | None = None,
) -> None:
    """Initialize the ClaudeCodeBackend.

    Every argument is STATIC backend configuration (Task 1.17 §4.1);
    anything that varies per epoch reaches the backend through
    ``.harness/constraints.json`` instead.

    Args:
        model: Claude model to use.
        system_prompt: Optional system prompt override. Usually left None
            in favor of CLAUDE.md/AGENTS.md via setting_sources.
        permission_mode: Tool permission mode for automated execution.
        mcp_servers: MCP server configurations. Static for MVP.
        allowed_tools: Constructor-level tool allowlist (the FLOOR).
        disallowed_tools: Constructor-level tool denylist.
        max_turns: Constructor-level turn limit.
        hooks: SDK lifecycle hooks.
        sandbox: Sandbox configuration.
        env: Environment variables for the agent subprocess.
        add_dirs: Additional directories the agent can access.
        prompt_templates: Custom prompt template set; falls back to the
            built-in templates shipped with this backend.
    """
    # Pass-through scalars first.
    self.model = model
    self.system_prompt = system_prompt
    self.permission_mode = permission_mode
    self.max_turns = max_turns
    self.hooks = hooks
    self.sandbox = sandbox
    # Normalize optional collections so attribute types are stable.
    self.mcp_servers = mcp_servers or {}
    self.env = env or {}
    self.allowed_tools = list(allowed_tools) if allowed_tools else []
    self.disallowed_tools = list(disallowed_tools) if disallowed_tools else []
    self.add_dirs = [str(d) for d in add_dirs] if add_dirs else []
    self.templates = prompt_templates or DefaultPromptTemplates()

execute(project_root) async

Execute a coding agent in a fresh context.

Implements the AgentBackend protocol. Sequence (Task 5.4 §1.2):

  1. Read .harness/ files (task, state, constraints)
  2. Determine prompt phase (initializer / coding / error_recovery)
  3. Build SDK options (merge constructor + constraints)
  4. Snapshot pre-execution git state
  5. Build prompt and execute agent
  6. Diff git state to detect changes
  7. Parse and return TaskResult

Parameters:

Name Type Description Default
project_root Path

The project directory.

required

Returns:

Type Description
TaskResult

TaskResult with observable side effects.

Source code in src/rigger/backends/claude_code.py
async def execute(self, project_root: Path) -> TaskResult:
    """Execute a coding agent in a fresh context.

    Implements the ``AgentBackend`` protocol. Sequence (Task 5.4 §1.2):

    1. Read .harness/ files (task, state, constraints)
    2. Determine prompt phase (initializer / coding / error_recovery)
    3. Build SDK options (merge constructor + constraints)
    4. Snapshot pre-execution git state
    5. Build prompt and execute agent
    6. Diff git state to detect changes
    7. Parse and return TaskResult

    Args:
        project_root: The project directory.

    Returns:
        TaskResult with observable side effects.
    """
    start_time = time.monotonic()

    # ── Step 1: Read .harness/ files ──
    task = read_current_task(project_root)
    state = read_state(project_root)
    constraints = read_constraints(project_root)

    # Handle missing MUST file: current_task.json (Task 5.4 §5)
    if task is None:
        logger.error(
            "Cannot execute: .harness/current_task.json is missing or malformed"
        )
        return TaskResult(
            task_id="unknown",
            status="error",
            metadata={
                "error": "missing_current_task",
                "error_type": "MissingTaskFile",
                "execution_time_s": time.monotonic() - start_time,
            },
        )

    # ── Step 2: Determine prompt phase ──
    phase = self._determine_phase(state)

    # ── Step 3: Build SDK options ──
    sdk_options = self._build_sdk_options(project_root, constraints)

    # ── Step 4: Snapshot pre-execution state ──
    pre_snapshot = _snapshot_repo_state(project_root)

    # ── Step 5: Build prompt and execute ──
    prompt = self.templates.render(phase=phase, task=task, state=state)

    try:
        agent_output = await self._run_agent(sdk_options, prompt)
    except Exception as exc:
        logger.error("Agent execution failed: %s", exc)
        post_snapshot = _snapshot_repo_state(project_root)
        changes = _detect_changes(pre_snapshot, post_snapshot, project_root)
        return TaskResult(
            task_id=task.id,
            status="error",
            artifacts=[Path(f) for f in changes["modified_files"]],
            commits=changes["new_commits"],
            metadata={
                "error": str(exc),
                "error_type": type(exc).__name__,
                "execution_time_s": time.monotonic() - start_time,
                # Any observable change (commits OR modified files) counts
                # as partial progress, consistent with the artifacts/commits
                # fields above. Previously only commits were considered.
                "partial_changes": bool(
                    changes["new_commits"] or changes["modified_files"]
                ),
            },
        )

    # ── Step 6: Detect changes ──
    post_snapshot = _snapshot_repo_state(project_root)
    changes = _detect_changes(pre_snapshot, post_snapshot, project_root)

    # ── Step 7: Parse and return TaskResult ──
    return self._parse_result(
        task_id=task.id,
        agent_output=agent_output,
        changes=changes,
        execution_time_s=time.monotonic() - start_time,
    )

BranchPolicyConstraint

Validates branch naming convention and blocks commits to protected branches.

Returns BLOCK if the current branch is protected, or if a required_prefix is configured and the branch doesn't match.

Parameters:

Name Type Description Default
protected_branches list[str] | None

Branch names that must not receive direct commits (default: ["main", "master"]).

None
required_prefix str | None

If set, non-protected branches must start with this prefix (e.g. "task/" or "feature/").

None
Source code in src/rigger/constraints/branch_policy.py
class BranchPolicyConstraint:
    """Enforces branch policy: naming convention plus protected branches.

    ``check`` returns ``BLOCK`` when the current branch is one of the
    protected names, or when ``required_prefix`` is configured and the
    branch name does not start with it.

    Args:
        protected_branches: Branch names that must not receive direct commits
            (default: ``["main", "master"]``).
        required_prefix: If set, non-protected branches must start with this
            prefix (e.g. ``"task/"`` or ``"feature/"``).
    """

    def __init__(
        self,
        protected_branches: list[str] | None = None,
        required_prefix: str | None = None,
    ) -> None:
        if protected_branches is None:
            self._protected = list(_DEFAULT_PROTECTED)
        else:
            self._protected = list(protected_branches)
        self._required_prefix = required_prefix

    def check(self, project_root: Path) -> VerifyResult:
        """Check the current git branch against the policy."""
        branch = self._get_current_branch(project_root)

        # Unknown branch state is treated as a hard failure.
        if branch is None:
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message="Could not determine current git branch.",
                details={"project_root": str(project_root)},
            )

        if branch in self._protected:
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message=(
                    f"Direct commits to protected branch '{branch}' are not allowed."
                ),
                details={
                    "branch": branch,
                    "protected_branches": self._protected,
                },
            )

        prefix = self._required_prefix
        if prefix and not branch.startswith(prefix):
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message=(
                    f"Branch '{branch}' does not match required prefix "
                    f"'{prefix}'."
                ),
                details={
                    "branch": branch,
                    "required_prefix": prefix,
                },
            )

        return VerifyResult(
            passed=True,
            message=f"Branch '{branch}' satisfies policy.",
            details={"branch": branch},
        )

    @staticmethod
    def _get_current_branch(project_root: Path) -> str | None:
        """Return the current branch name, or None when it cannot be read.

        NOTE(review): on a detached HEAD, ``git rev-parse --abbrev-ref HEAD``
        prints the literal string "HEAD" — confirm whether that should be
        treated as a branch name here.
        """
        cmd = ["git", "rev-parse", "--abbrev-ref", "HEAD"]
        try:
            proc = subprocess.run(
                cmd,
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return None
        if proc.returncode != 0:
            return None
        name = proc.stdout.strip()
        return name if name else None

check(project_root)

Check the current git branch against the policy.

Source code in src/rigger/constraints/branch_policy.py
def check(self, project_root: Path) -> VerifyResult:
    """Check the current git branch against the policy."""
    branch = self._get_current_branch(project_root)

    # Unknown branch state is treated as a hard failure.
    if branch is None:
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message="Could not determine current git branch.",
            details={"project_root": str(project_root)},
        )

    if branch in self._protected:
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message=(
                f"Direct commits to protected branch '{branch}' are not allowed."
            ),
            details={
                "branch": branch,
                "protected_branches": self._protected,
            },
        )

    prefix = self._required_prefix
    if prefix and not branch.startswith(prefix):
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message=(
                f"Branch '{branch}' does not match required prefix "
                f"'{prefix}'."
            ),
            details={
                "branch": branch,
                "required_prefix": prefix,
            },
        )

    return VerifyResult(
        passed=True,
        message=f"Branch '{branch}' satisfies policy.",
        details={"branch": branch},
    )

ToolAllowlistConstraint

Emits allowed_tools and/or disallowed_tools metadata.

This constraint always passes — it is metadata-emitting, not blocking. The emitted metadata flows through the merge algorithm (Task 1.26) into .harness/constraints.json for the backend to read.

Parameters:

Name Type Description Default
allowed list[str] | None

Tool names/patterns the agent MAY use. Empty means no opinion.

None
disallowed list[str] | None

Tool names/patterns the agent MUST NOT use. Empty means no opinion.

None
Source code in src/rigger/constraints/tool_allowlist.py
class ToolAllowlistConstraint:
    """Constraint that emits tool allow/deny lists as metadata.

    Always passes — it is metadata-emitting, not blocking. The emitted
    ``allowed_tools`` / ``disallowed_tools`` metadata flows through the
    merge algorithm (Task 1.26) into ``.harness/constraints.json`` for
    the backend to read.

    Args:
        allowed: Tool names/patterns the agent MAY use. Empty means no opinion.
        disallowed: Tool names/patterns the agent MUST NOT use. Empty means
            no opinion.
    """

    def __init__(
        self,
        allowed: list[str] | None = None,
        disallowed: list[str] | None = None,
    ) -> None:
        # Copy the inputs so later caller-side mutation has no effect.
        self._allowed = [] if not allowed else list(allowed)
        self._disallowed = [] if not disallowed else list(disallowed)

    def check(self, project_root: Path) -> VerifyResult:
        """Return tool restriction metadata. Always passes."""
        metadata: dict[str, list[str]] = {}
        for key, values in (
            ("allowed_tools", self._allowed),
            ("disallowed_tools", self._disallowed),
        ):
            if values:
                metadata[key] = values
        return VerifyResult(passed=True, metadata=metadata)

check(project_root)

Return tool restriction metadata. Always passes.

Source code in src/rigger/constraints/tool_allowlist.py
def check(self, project_root: Path) -> VerifyResult:
    """Return tool restriction metadata. Always passes."""
    metadata: dict[str, list[str]] = {}
    for key, values in (
        ("allowed_tools", self._allowed),
        ("disallowed_tools", self._disallowed),
    ):
        if values:
            metadata[key] = values
    return VerifyResult(passed=True, metadata=metadata)

AgentsMdContextSource

Generates or updates an AGENTS.md file from a template.

The template supports variable substitution with {task}, {constraints}, and {context} placeholders. Unresolved placeholders are left as-is.

The template argument may be either an inline string or a path to a template file. If it starts with / or ends with .md / .txt, it is treated as a file path; otherwise as an inline template string.

Parameters:

Name Type Description Default
template str

Inline template string or path to a template file.

required
output str

Output filename relative to project root (default: "AGENTS.md").

'AGENTS.md'
variables dict[str, str] | None

Optional dict of variable substitutions beyond the standard ones.

None
Source code in src/rigger/context_sources/agents_md.py
class AgentsMdContextSource:
    """Renders an AGENTS.md file into the project from a template.

    Variable substitution supports the standard ``{task}``,
    ``{constraints}``, and ``{context}`` placeholders (defaulting to
    empty strings) plus any extra ``variables``; placeholders with no
    value are left in the text as-is.

    The ``template`` argument is treated as a file path when it starts
    with ``/`` or ends with ``.md`` / ``.txt``, and as an inline
    template string otherwise.

    Args:
        template: Inline template string or path to a template file.
        output: Output filename relative to project root (default: ``"AGENTS.md"``).
        variables: Optional dict of variable substitutions beyond the standard ones.
    """

    def __init__(  # noqa: D107
        self,
        template: str,
        *,
        output: str = "AGENTS.md",
        variables: dict[str, str] | None = None,
    ) -> None:
        self._template = template
        self._output = output
        self._variables = variables or {}

    def gather(self, project_root: Path) -> ProvisionResult:
        """Render the template into ``project_root`` and return its path.

        Loads the template (file or inline), applies variable
        substitution, writes the output file, and reports the written
        path via ``ProvisionResult.files``. Returns an empty result when
        a template file path does not exist.
        """
        content = self._load_template(project_root)
        if content is None:
            return ProvisionResult()

        target = project_root / self._output
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(self._render(content), encoding="utf-8")

        return ProvisionResult(files=[target])

    def _load_template(self, project_root: Path) -> str | None:
        """Return template text, resolving relative file paths against project_root."""
        if not self._is_file_path(self._template):
            return self._template

        path = Path(self._template)
        if not path.is_absolute():
            path = project_root / path
        if not path.exists():
            logger.warning("Template file %s does not exist", path)
            return None
        return path.read_text(encoding="utf-8")

    def _render(self, content: str) -> str:
        """Substitute ``{name}`` placeholders in the template text.

        The standard names (task, constraints, context) default to empty
        strings; constructor-supplied variables override or extend them.
        """
        substitutions = dict.fromkeys(("task", "constraints", "context"), "")
        substitutions.update(self._variables)
        for name, value in substitutions.items():
            content = content.replace("{" + name + "}", value)
        return content

    @staticmethod
    def _is_file_path(template: str) -> bool:
        """Heuristic: an absolute path or a .md/.txt suffix means a file path."""
        return template.startswith("/") or template.endswith((".md", ".txt"))

gather(project_root)

Write AGENTS.md to project root from template, return its path.

Reads the template (from file or inline), applies variable substitution, and writes the output file. Returns the path in ProvisionResult.files.

Source code in src/rigger/context_sources/agents_md.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Render the AGENTS.md template and write it into the project.

    Loads the template (inline or from a file), substitutes variables,
    and writes the result to the configured output path. The written
    path is reported via ``ProvisionResult.files``.
    """
    content = self._load_template(project_root)
    if content is None:
        return ProvisionResult()

    target = project_root / self._output
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(self._render(content), encoding="utf-8")
    return ProvisionResult(files=[target])

FileTreeContextSource

Verifies a directory tree exists and reports its file paths as context.

Does NOT copy files — they are already in the project. This source validates presence and returns discovered paths as a ProvisionResult.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `root` | `str` | Relative path from project root to the directory tree. | *required* |
| `required` | `bool` | If True (default), log a warning when the directory is missing. | `True` |
Source code in src/rigger/context_sources/file_tree.py
class FileTreeContextSource:
    """Reports the files of an existing directory tree as context.

    No files are copied — the tree already lives in the project. This
    source only validates that the directory is present and returns the
    discovered file paths in a ``ProvisionResult``.

    Args:
        root: Relative path from project root to the directory tree.
        required: If True (default), log a warning when the directory is missing.
    """

    def __init__(self, root: str, *, required: bool = True) -> None:
        """Record the tree location and whether its absence is noteworthy."""
        self._root = root
        self._required = required

    def gather(self, project_root: Path) -> ProvisionResult:
        """Collect every file path beneath the configured root.

        A missing directory yields an empty result, with a warning only
        when the source is marked ``required``. A path that exists but is
        not a directory always logs a warning and yields an empty result.
        """
        target = project_root / self._root

        # Happy path first: a real directory — report its files.
        if target.is_dir():
            discovered = sorted(p for p in target.rglob("*") if p.is_file())
            return ProvisionResult(files=discovered)

        if target.exists():
            logger.warning(
                "Path %s exists but is not a directory in %s",
                self._root,
                project_root,
            )
        elif self._required:
            logger.warning(
                "Required directory %s does not exist in %s",
                self._root,
                project_root,
            )
        return ProvisionResult()

gather(project_root)

Return paths to all files under the configured root directory.

If the directory does not exist and required is True, logs a warning and returns an empty result. Non-required missing directories return silently.

Source code in src/rigger/context_sources/file_tree.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Return paths to every file below the configured root directory.

    A missing directory produces an empty result; a warning is emitted
    only when ``required`` is set. A non-directory path at the configured
    location also logs a warning and yields an empty result.
    """
    root = project_root / self._root

    if not root.is_dir():
        if root.exists():
            logger.warning(
                "Path %s exists but is not a directory in %s",
                self._root,
                project_root,
            )
        elif self._required:
            logger.warning(
                "Required directory %s does not exist in %s",
                self._root,
                project_root,
            )
        return ProvisionResult()

    return ProvisionResult(
        files=sorted(p for p in root.rglob("*") if p.is_file())
    )

McpCapabilityContextSource

Reports MCP tool/server availability as a capability description.

This is a registration stub — the MCP server is configured at SDK client creation time. gather() writes a capabilities file and returns a capability string for observability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | MCP server name (e.g. "linear", "puppeteer"). | *required* |
| `description` | `str` | Human-readable capability description. | `''` |
| `output` | `str` | Output path relative to project root. | `'.harness/capabilities.md'` |
Source code in src/rigger/context_sources/mcp_capability.py
class McpCapabilityContextSource:
    """Surfaces MCP tool/server availability as a capability description.

    This is a registration stub — the MCP server itself is configured at
    SDK client creation time. ``gather()`` writes a capabilities file and
    returns a capability string for observability.

    Args:
        name: MCP server name (e.g. ``"linear"``, ``"puppeteer"``).
        description: Human-readable capability description.
        output: Output path relative to project root
            (default: ``".harness/capabilities.md"``).
    """

    def __init__(
        self,
        name: str,
        *,
        description: str = "",
        output: str = ".harness/capabilities.md",
    ) -> None:
        """Store the server name, falling back to a generated description."""
        self._name = name
        self._description = description if description else f"MCP server: {name}"
        self._output = output

    def gather(self, project_root: Path) -> ProvisionResult:
        """Record this server in the capabilities file and report it.

        The file is created with a header on first use; subsequent calls
        append the entry only when it is not already present.
        """
        target = project_root / self._output
        target.parent.mkdir(parents=True, exist_ok=True)

        line = f"- **{self._name}**: {self._description}\n"

        if not target.exists():
            target.write_text("# MCP Capabilities\n\n" + line, encoding="utf-8")
        else:
            current = target.read_text(encoding="utf-8")
            if line not in current:
                target.write_text(current + line, encoding="utf-8")

        return ProvisionResult(
            files=[target],
            capabilities=[self._description],
        )

gather(project_root)

Write capabilities file and return path + capability string.

Creates or appends to the capabilities file at the configured output path. Returns the file path and a capability description for observability.

Source code in src/rigger/context_sources/mcp_capability.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Write capabilities file and return path + capability string.

    The file is created with a header on first use; later calls append
    the entry only if it has not already been recorded.
    """
    target = project_root / self._output
    target.parent.mkdir(parents=True, exist_ok=True)

    line = f"- **{self._name}**: {self._description}\n"

    if target.exists():
        current = target.read_text(encoding="utf-8")
        if line not in current:
            target.write_text(current + line, encoding="utf-8")
    else:
        target.write_text("# MCP Capabilities\n\n" + line, encoding="utf-8")

    return ProvisionResult(
        files=[target],
        capabilities=[self._description],
    )

StaticFilesContextSource

Copies pre-computed files from a configured path into the project.

The source may be a single file or a directory. When the source is a directory, all files within it are copied recursively to the destination, preserving the relative directory structure.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str` | Absolute or relative path to the source file or directory. | *required* |
| `destination` | `str` | Relative path within the project root for output. | *required* |
Source code in src/rigger/context_sources/static_files.py
class StaticFilesContextSource:
    """Copies pre-computed files from a configured path into the project.

    The source may be a single file or a directory. A directory source is
    copied recursively into the destination, preserving the relative
    directory structure.

    Args:
        source: Absolute or relative path to the source file or directory.
        destination: Relative path within the project root for output.
    """

    def __init__(self, source: str, destination: str) -> None:
        """Record the copy endpoints; no filesystem work happens here."""
        self._source = source
        self._destination = destination

    def gather(self, project_root: Path) -> ProvisionResult:
        """Copy source files to destination under project root, return paths.

        A missing source logs a warning and produces an empty result.
        """
        src = Path(self._source)
        if not src.is_absolute():
            src = project_root / src

        if not src.exists():
            logger.warning("Static files source %s does not exist", src)
            return ProvisionResult()

        dst = project_root / self._destination
        dst.parent.mkdir(parents=True, exist_ok=True)

        if src.is_file():
            return self._copy_file(src, dst)
        return self._copy_dir(src, dst)

    @staticmethod
    def _copy_file(source: Path, dest: Path) -> ProvisionResult:
        """Copy a single file, reporting where it actually landed."""
        shutil.copy2(source, dest)
        # copy2 into an existing directory lands the file inside it.
        final = dest / source.name if dest.is_dir() else dest
        return ProvisionResult(files=[final])

    @staticmethod
    def _copy_dir(source: Path, dest: Path) -> ProvisionResult:
        """Recursively copy every file under source into dest."""
        dest.mkdir(parents=True, exist_ok=True)
        copied: list[Path] = []
        for entry in sorted(source.rglob("*")):
            if entry.is_file():
                out = dest / entry.relative_to(source)
                out.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(entry, out)
                copied.append(out)
        return ProvisionResult(files=copied)

gather(project_root)

Copy source files to destination under project root, return paths.

If the source does not exist, logs a warning and returns an empty result.

Source code in src/rigger/context_sources/static_files.py
def gather(self, project_root: Path) -> ProvisionResult:
    """Copy source files to destination under project root, return paths.

    A missing source logs a warning and produces an empty result.
    """
    src = Path(self._source)
    if not src.is_absolute():
        src = project_root / src

    if not src.exists():
        logger.warning("Static files source %s does not exist", src)
        return ProvisionResult()

    dst = project_root / self._destination
    dst.parent.mkdir(parents=True, exist_ok=True)

    if src.is_file():
        return self._copy_file(src, dst)
    return self._copy_dir(src, dst)

DocStalenessEntropyDetector

Checks file modification dates against an age threshold.

Scans files matching the configured glob patterns and returns a remediation Task for each file that hasn't been modified within max_age_days.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `patterns` | `list[str]` | Glob patterns to match (e.g. `["docs/**/*.md"]`). | *required* |
| `max_age_days` | `int` | Maximum age in days before a file is considered stale. | `30` |
Source code in src/rigger/entropy_detectors/doc_staleness.py
class DocStalenessEntropyDetector:
    """Checks file modification dates against an age threshold.

    Scans files matching the configured glob patterns and returns a
    remediation ``Task`` for each file that hasn't been modified within
    ``max_age_days``.

    Args:
        patterns: Glob patterns to match (e.g. ``["docs/**/*.md"]``).
        max_age_days: Maximum age in days before a file is considered stale
            (default 30).
    """

    def __init__(
        self,
        patterns: list[str],
        max_age_days: int = 30,
    ) -> None:
        self._patterns = list(patterns)
        self._max_age_days = max_age_days

    def scan(self, project_root: Path) -> list[Task]:
        """Find files exceeding the staleness threshold."""
        threshold = time.time() - (self._max_age_days * _SECONDS_PER_DAY)
        tasks: list[Task] = []

        for pattern in self._patterns:
            for file_path in sorted(project_root.glob(pattern)):
                if not file_path.is_file():
                    continue
                try:
                    mtime = file_path.stat().st_mtime
                except OSError as exc:
                    logger.warning("Cannot stat %s: %s", file_path, exc)
                    continue

                if mtime < threshold:
                    age_days = int((time.time() - mtime) / _SECONDS_PER_DAY)
                    last_modified = datetime.fromtimestamp(mtime, tz=UTC).strftime(
                        "%Y-%m-%d"
                    )
                    relative = file_path.relative_to(project_root)

                    tasks.append(
                        Task(
                            id=f"stale-{relative}",
                            description=(
                                f"Update {relative} "
                                f"(last modified {last_modified}, "
                                f"{age_days} days ago)"
                            ),
                            metadata={
                                "source": "entropy_scan",
                                "file": str(relative),
                                "last_modified": last_modified,
                                "age_days": age_days,
                            },
                        )
                    )

        return tasks

scan(project_root)

Find files exceeding the staleness threshold.

Source code in src/rigger/entropy_detectors/doc_staleness.py
def scan(self, project_root: Path) -> list[Task]:
    """Find files exceeding the staleness threshold.

    The current time is sampled once up front so the threshold test and
    the reported ``age_days`` always agree. Files matched by more than
    one pattern are reported only once (previously overlapping patterns
    produced duplicate tasks with duplicate ids).
    """
    now = time.time()
    threshold = now - (self._max_age_days * _SECONDS_PER_DAY)
    tasks: list[Task] = []
    seen: set[Path] = set()

    for pattern in self._patterns:
        for file_path in sorted(project_root.glob(pattern)):
            if not file_path.is_file() or file_path in seen:
                continue
            seen.add(file_path)
            try:
                mtime = file_path.stat().st_mtime
            except OSError as exc:
                # Unreadable entries are skipped, not fatal.
                logger.warning("Cannot stat %s: %s", file_path, exc)
                continue

            if mtime >= threshold:
                continue

            age_days = int((now - mtime) / _SECONDS_PER_DAY)
            last_modified = datetime.fromtimestamp(mtime, tz=UTC).strftime(
                "%Y-%m-%d"
            )
            relative = file_path.relative_to(project_root)

            tasks.append(
                Task(
                    id=f"stale-{relative}",
                    description=(
                        f"Update {relative} "
                        f"(last modified {last_modified}, "
                        f"{age_days} days ago)"
                    ),
                    metadata={
                        "source": "entropy_scan",
                        "file": str(relative),
                        "last_modified": last_modified,
                        "age_days": age_days,
                    },
                )
            )

    return tasks

ShellCommandEntropyDetector

Runs an arbitrary shell command and parses JSON stdout as tasks.

The command should output a JSON array of objects, each with at least a description field. An optional priority field is stored in task metadata.

Expected JSON format::

[{"description": "Fix stale docs", "priority": "high"}, ...]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `command` | `str` | Shell command to execute. | *required* |
| `timeout` | `int` | Maximum seconds to wait for the command. | `120` |
Source code in src/rigger/entropy_detectors/shell_command.py
class ShellCommandEntropyDetector:
    """Runs an arbitrary shell command and parses JSON stdout as tasks.

    The command should output a JSON array of objects, each with at least
    a ``description`` field. An optional ``priority`` field is stored in
    task metadata.

    Expected JSON format::

        [{"description": "Fix stale docs", "priority": "high"}, ...]

    Args:
        command: Shell command to execute.
        timeout: Maximum seconds to wait for the command (default 120).
    """

    def __init__(self, command: str, timeout: int = 120) -> None:
        """Store the command line and its execution timeout."""
        self._command = command
        self._timeout = timeout

    def scan(self, project_root: Path) -> list[Task]:
        """Execute the configured command and convert its JSON output to tasks."""
        try:
            # The command is user-configured by design; shell=True is intentional.
            proc = subprocess.run(
                self._command,
                shell=True,
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=self._timeout,
            )
        except subprocess.TimeoutExpired:
            logger.warning(
                "Entropy command timed out after %ds: %s",
                self._timeout,
                self._command,
            )
            return []

        if proc.returncode != 0:
            logger.warning(
                "Entropy command exited with code %d: %s\nstderr: %s",
                proc.returncode,
                self._command,
                proc.stderr.strip(),
            )
            return []

        return self._parse_output(proc.stdout)

    def _parse_output(self, stdout: str) -> list[Task]:
        """Turn a JSON array on stdout into Task objects, skipping bad items."""
        text = stdout.strip()
        if not text:
            return []

        try:
            payload = json.loads(text)
        except json.JSONDecodeError as exc:
            logger.warning("Malformed JSON from entropy command: %s", exc)
            return []

        if not isinstance(payload, list):
            logger.warning(
                "Entropy command output is %s, expected JSON array",
                type(payload).__name__,
            )
            return []

        tasks: list[Task] = []
        for i, item in enumerate(payload):
            if not isinstance(item, dict):
                logger.warning("Skipping non-object item at index %d", i)
                continue
            description = item.get("description")
            if not description:
                logger.warning("Skipping item at index %d: missing description", i)
                continue

            metadata: dict[str, str] = {"source": "entropy_scan"}
            if "priority" in item:
                metadata["priority"] = str(item["priority"])

            tasks.append(
                Task(
                    id=f"entropy-{i}",
                    description=str(description),
                    metadata=metadata,
                )
            )

        return tasks

scan(project_root)

Run the command and parse JSON output into tasks.

Source code in src/rigger/entropy_detectors/shell_command.py
def scan(self, project_root: Path) -> list[Task]:
    """Execute the configured command and convert its JSON output to tasks.

    Timeouts and non-zero exits are logged and produce an empty list.
    """
    try:
        proc = subprocess.run(
            self._command,
            shell=True,
            cwd=project_root,
            capture_output=True,
            text=True,
            timeout=self._timeout,
        )
    except subprocess.TimeoutExpired:
        logger.warning(
            "Entropy command timed out after %ds: %s",
            self._timeout,
            self._command,
        )
        return []

    if proc.returncode != 0:
        logger.warning(
            "Entropy command exited with code %d: %s\nstderr: %s",
            proc.returncode,
            self._command,
            proc.stderr.strip(),
        )
        return []

    return self._parse_output(proc.stdout)

HarnessDirStateStore

Persists EpochState via .harness/state.json.

Thin wrapper around rigger._schema.read_state() and write_state(). This is functionally equivalent to the default behavior when no StateStore is configured, but exists as an explicit class for the declarative registry.

Source code in src/rigger/state_stores/harness_dir.py
class HarnessDirStateStore:
    """State store backed by ``.harness/state.json``.

    A thin delegate to ``rigger._schema.read_state()`` and
    ``write_state()``. Behaviorally it matches the default used when no
    StateStore is configured; it exists so the declarative registry can
    name the behavior explicitly.
    """

    def load(self, project_root: Path) -> EpochState:
        """Read the persisted state from ``.harness/state.json``."""
        return read_state(project_root)

    def save(self, project_root: Path, state: EpochState) -> None:
        """Persist ``state`` to ``.harness/state.json``."""
        write_state(project_root, state)

load(project_root)

Load state from .harness/state.json.

Source code in src/rigger/state_stores/harness_dir.py
def load(self, project_root: Path) -> EpochState:
    """Read the persisted state from ``.harness/state.json``."""
    return read_state(project_root)

save(project_root, state)

Write state to .harness/state.json.

Source code in src/rigger/state_stores/harness_dir.py
def save(self, project_root: Path, state: EpochState) -> None:
    """Persist ``state`` to ``.harness/state.json``."""
    write_state(project_root, state)

JsonFileStateStore

Persists EpochState to a user-specified JSON file.

The file path can be absolute or relative. Relative paths are resolved against project_root. Writes are atomic (temp file + os.replace).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path to the JSON state file. | *required* |
Source code in src/rigger/state_stores/json_file.py
class JsonFileStateStore:
    """Persists ``EpochState`` to a user-specified JSON file.

    The file path can be absolute or relative. Relative paths are resolved
    against ``project_root``. Writes are atomic (temp file + ``os.replace``).

    Args:
        path: Path to the JSON state file.
    """

    def __init__(self, path: str) -> None:
        """Store the configured (possibly relative) state-file path."""
        self._path = path

    def _resolve(self, project_root: Path) -> Path:
        """Return the effective state-file path for this project."""
        configured = Path(self._path)
        return configured if configured.is_absolute() else project_root / configured

    def load(self, project_root: Path) -> EpochState:
        """Load state from the JSON file, returning defaults if missing.

        Unreadable files, invalid JSON, and non-object payloads all fall
        back to a default ``EpochState`` with a warning.
        """
        target = self._resolve(project_root)
        if not target.exists():
            return EpochState()

        try:
            raw = json.loads(target.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError, UnicodeDecodeError) as exc:
            logger.warning("Failed to read state from %s: %s", target, exc)
            return EpochState()

        if isinstance(raw, dict):
            return self._parse_state(raw)

        logger.warning(
            "State file %s has unexpected type %s -- using defaults",
            target,
            type(raw).__name__,
        )
        return EpochState()

    def save(self, project_root: Path, state: EpochState) -> None:
        """Atomically write state to the JSON file."""
        target = self._resolve(project_root)
        target.parent.mkdir(parents=True, exist_ok=True)

        payload: dict[str, Any] = {
            "epoch": state.epoch,
            "completed_tasks": state.completed_tasks,
            "pending_tasks": state.pending_tasks,
            "halted": state.halted,
            "halt_reason": state.halt_reason,
            "metadata": state.metadata,
        }
        self._atomic_write(target, payload)

    @staticmethod
    def _parse_state(data: dict[str, Any]) -> EpochState:
        """Build an EpochState from a dict, defaulting on malformed input."""
        try:
            return EpochState(
                epoch=data["epoch"],
                completed_tasks=data.get("completed_tasks", []),
                pending_tasks=data.get("pending_tasks", []),
                halted=data.get("halted", False),
                halt_reason=data.get("halt_reason", ""),
                metadata=data.get("metadata", {}),
            )
        except (KeyError, TypeError) as exc:
            logger.warning("Malformed state data: %s -- using defaults", exc)
            return EpochState()

    @staticmethod
    def _atomic_write(file_path: Path, data: dict[str, Any]) -> None:
        """Write JSON atomically via temp file + os.replace()."""
        serialized = json.dumps(data, indent=2).encode()
        fd, tmp_path = tempfile.mkstemp(
            dir=file_path.parent, prefix=".tmp_", suffix=".json"
        )
        try:
            os.write(fd, serialized)
            os.fsync(fd)
            os.close(fd)
            # Sentinel so the except-path knows the descriptor is closed.
            fd = -1
            os.replace(tmp_path, file_path)
        except BaseException:
            if fd >= 0:
                os.close(fd)
            with contextlib.suppress(OSError):
                os.unlink(tmp_path)
            raise

load(project_root)

Load state from the JSON file, returning defaults if missing.

Source code in src/rigger/state_stores/json_file.py
def load(self, project_root: Path) -> EpochState:
    """Load state from the JSON file, returning defaults if missing.

    Unreadable files, invalid JSON, and non-object payloads all fall
    back to a default ``EpochState`` with a warning.
    """
    target = self._resolve(project_root)
    if not target.exists():
        return EpochState()

    try:
        raw = json.loads(target.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError, UnicodeDecodeError) as exc:
        logger.warning("Failed to read state from %s: %s", target, exc)
        return EpochState()

    if isinstance(raw, dict):
        return self._parse_state(raw)

    logger.warning(
        "State file %s has unexpected type %s -- using defaults",
        target,
        type(raw).__name__,
    )
    return EpochState()

save(project_root, state)

Atomically write state to the JSON file.

Source code in src/rigger/state_stores/json_file.py
def save(self, project_root: Path, state: EpochState) -> None:
    """Atomically write state to the JSON file."""
    target = self._resolve(project_root)
    target.parent.mkdir(parents=True, exist_ok=True)

    payload: dict[str, Any] = {
        "epoch": state.epoch,
        "completed_tasks": state.completed_tasks,
        "pending_tasks": state.pending_tasks,
        "halted": state.halted,
        "halt_reason": state.halt_reason,
        "metadata": state.metadata,
    }
    self._atomic_write(target, payload)

AtomicIssueTaskSource

One issue = one task, no decomposition.

Fetches a single issue from GitHub or Linear and surfaces it as the sole pending task. mark_complete closes or transitions the issue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `provider` | `str` | `"github"` or `"linear"`. | *required* |
| `issue_id` | `str` | Issue identifier. For GitHub: `"owner/repo#123"` or just `"123"` (requires `repo` param). For Linear: the issue UUID. | *required* |
| `api_key_env` | `str` | Environment variable containing the API token. | `''` |
| `repo` | `str` | GitHub repo in `"owner/repo"` format (optional, used when `issue_id` is a bare number). | `''` |
Source code in src/rigger/task_sources/atomic_issue.py
class AtomicIssueTaskSource:
    """One issue = one task, no decomposition.

    Fetches a single issue from GitHub or Linear and surfaces it as the
    sole pending task. ``mark_complete`` closes or transitions the issue.

    Args:
        provider: ``"github"`` or ``"linear"``.
        issue_id: Issue identifier. For GitHub: ``"owner/repo#123"`` or
            just ``"123"`` (requires ``repo`` param). For Linear: the
            issue UUID.
        api_key_env: Environment variable containing the API token.
        repo: GitHub repo in ``"owner/repo"`` format (optional, used
            when ``issue_id`` is a bare number).
    """

    def __init__(  # noqa: D107
        self,
        provider: str,
        issue_id: str,
        api_key_env: str = "",
        repo: str = "",
    ) -> None:
        if provider not in ("github", "linear"):
            msg = f"Unsupported provider: {provider!r}. Use 'github' or 'linear'."
            raise ValueError(msg)
        self._provider = provider
        self._issue_id = issue_id
        # Default token env var differs per provider.
        self._api_key_env = api_key_env or (
            "GITHUB_TOKEN" if provider == "github" else "LINEAR_API_KEY"
        )
        self._repo = repo
        # Local completion flag; set when the issue is closed remotely
        # or after mark_complete() runs.
        self._done = False

    def pending(self, project_root: Path) -> list[Task]:
        """Return the single issue as a Task, or empty if already done."""
        if self._done:
            return []

        api_key = os.environ.get(self._api_key_env, "")
        if not api_key:
            logger.warning("API key not found in env var %s", self._api_key_env)
            return []

        try:
            if self._provider == "github":
                return self._fetch_github(api_key)
            return self._fetch_linear(api_key)
        except (httpx.HTTPError, KeyError, ValueError) as exc:
            logger.warning("Failed to fetch issue %s: %s", self._issue_id, exc)
            return []

    def mark_complete(self, task_id: str, result: TaskResult) -> None:
        """Close the issue on the remote provider."""
        api_key = os.environ.get(self._api_key_env, "")
        if not api_key:
            logger.warning("API key not found in env var %s", self._api_key_env)
            self._done = True
            return

        try:
            if self._provider == "github":
                self._close_github(api_key)
            else:
                self._close_linear(api_key, task_id)
        except (httpx.HTTPError, KeyError, ValueError) as exc:
            logger.warning("Failed to close issue %s: %s", self._issue_id, exc)

        # Always mark done locally — even when the remote close failed —
        # so the harness loop can advance past this task.
        self._done = True

    # ── GitHub ──────────────────────────────────────────────────

    def _parse_github_ref(self) -> tuple[str, int]:
        """Parse owner/repo and issue number from issue_id."""
        if "#" in self._issue_id:
            repo_part, num_part = self._issue_id.rsplit("#", 1)
            return repo_part, int(num_part)
        if self._repo:
            return self._repo, int(self._issue_id)
        msg = (
            "GitHub issue_id must be 'owner/repo#123' or provide repo param. "
            f"Got: {self._issue_id!r}"
        )
        raise ValueError(msg)

    def _fetch_github(self, api_key: str) -> list[Task]:
        """Fetch the GitHub issue and wrap it in a single ``Task``.

        A closed issue marks this source done and returns no task.
        """
        repo, number = self._parse_github_ref()
        with httpx.Client(timeout=30) as client:
            resp = client.get(
                f"https://api.github.com/repos/{repo}/issues/{number}",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Accept": "application/vnd.github+json",
                    "X-GitHub-Api-Version": "2022-11-28",
                },
            )
            resp.raise_for_status()
            data: dict[str, Any] = resp.json()

        if data.get("state") == "closed":
            self._done = True
            return []

        return [
            Task(
                id=str(data["id"]),
                description=data.get("title", ""),
                metadata={
                    "provider": "github",
                    "repo": repo,
                    "number": number,
                    "body": data.get("body", ""),
                    "labels": [lbl["name"] for lbl in data.get("labels", [])],
                    "url": data.get("html_url", ""),
                },
            )
        ]

    def _close_github(self, api_key: str) -> None:
        """Close the GitHub issue via a PATCH to the issues API."""
        repo, number = self._parse_github_ref()
        with httpx.Client(timeout=30) as client:
            resp = client.patch(
                f"https://api.github.com/repos/{repo}/issues/{number}",
                json={"state": "closed"},
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Accept": "application/vnd.github+json",
                    "X-GitHub-Api-Version": "2022-11-28",
                },
            )
            resp.raise_for_status()

    # ── Linear ──────────────────────────────────────────────────

    def _fetch_linear(self, api_key: str) -> list[Task]:
        """Fetch the Linear issue via GraphQL and wrap it in a single ``Task``.

        A missing issue yields no task; a completed or canceled issue
        additionally marks this source done.
        """
        query = """
        query($id: String!) {
          issue(id: $id) {
            id identifier title description priority
            state { name type }
          }
        }
        """
        data = self._linear_graphql(api_key, query, {"id": self._issue_id})
        issue = data.get("data", {}).get("issue")
        if not issue:
            return []

        state_type = issue.get("state", {}).get("type", "")
        if state_type in ("completed", "canceled"):
            self._done = True
            return []

        return [
            Task(
                id=issue["id"],
                description=issue.get("title", ""),
                metadata={
                    "provider": "linear",
                    "identifier": issue.get("identifier", ""),
                    "priority": issue.get("priority", 4),
                    "description_body": issue.get("description", ""),
                    "state": issue.get("state", {}).get("name", ""),
                },
            )
        ]

    def _close_linear(self, api_key: str, task_id: str) -> None:
        """Transition the Linear issue to its team's "completed" workflow state.

        Looks up the team's completed states first, then issues an
        ``issueUpdate`` mutation with the first one found. If none is
        found, logs a warning and leaves the issue untouched.
        """
        # Find the team's completed state
        # We use the issue's team info from the issue query
        state_query = """
        query($issueId: String!) {
          issue(id: $issueId) {
            team {
              states(filter: { type: { eq: "completed" } }) {
                nodes { id name }
              }
            }
          }
        }
        """
        data = self._linear_graphql(api_key, state_query, {"issueId": task_id})
        states = (
            data.get("data", {})
            .get("issue", {})
            .get("team", {})
            .get("states", {})
            .get("nodes", [])
        )
        if not states:
            logger.warning("No completed state found for Linear issue %s", task_id)
            return

        mutation = """
        mutation($issueId: String!, $stateId: String!) {
          issueUpdate(id: $issueId, input: { stateId: $stateId }) {
            success
          }
        }
        """
        self._linear_graphql(
            api_key, mutation, {"issueId": task_id, "stateId": states[0]["id"]}
        )

    @staticmethod
    def _linear_graphql(
        api_key: str, query: str, variables: dict[str, Any]
    ) -> dict[str, Any]:
        """POST a GraphQL request to Linear and return the decoded body.

        Raises:
            ValueError: If the response body contains GraphQL errors.
            httpx.HTTPError: On transport failures or HTTP error statuses.
        """
        with httpx.Client(timeout=30) as client:
            resp = client.post(
                "https://api.linear.app/graphql",
                json={"query": query, "variables": variables},
                headers={
                    "Authorization": api_key,
                    "Content-Type": "application/json",
                },
            )
            resp.raise_for_status()
            body: dict[str, Any] = resp.json()
            if "errors" in body:
                msg = body["errors"][0].get("message", "Unknown GraphQL error")
                raise ValueError(msg)
            return body

pending(project_root)

Return the single issue as a Task, or empty if already done.

Source code in src/rigger/task_sources/atomic_issue.py
def pending(self, project_root: Path) -> list[Task]:
    """Return the single tracked issue as a ``Task``, or ``[]`` once done."""
    # Nothing to surface after the issue has been closed out.
    if self._done:
        return []

    api_key = os.environ.get(self._api_key_env, "")
    if not api_key:
        logger.warning("API key not found in env var %s", self._api_key_env)
        return []

    # Pick the provider-specific fetcher, then run it under one handler.
    fetch = self._fetch_github if self._provider == "github" else self._fetch_linear
    try:
        return fetch(api_key)
    except (httpx.HTTPError, KeyError, ValueError) as exc:
        logger.warning("Failed to fetch issue %s: %s", self._issue_id, exc)
        return []

mark_complete(task_id, result)

Close the issue on the remote provider.

Source code in src/rigger/task_sources/atomic_issue.py
def mark_complete(self, task_id: str, result: TaskResult) -> None:
    """Close the issue on the remote provider.

    Best-effort: the source is marked done even when the remote close
    fails or no API key is configured, so the harness never loops on it.
    """
    api_key = os.environ.get(self._api_key_env, "")
    if api_key:
        try:
            if self._provider == "github":
                self._close_github(api_key)
            else:
                self._close_linear(api_key, task_id)
        except (httpx.HTTPError, KeyError, ValueError) as exc:
            logger.warning("Failed to close issue %s: %s", self._issue_id, exc)
    else:
        logger.warning("API key not found in env var %s", self._api_key_env)

    self._done = True

FileListTaskSource

Reads a JSON file with task objects; marks complete by updating status.

The JSON file must contain an array of objects, each with at least id and description fields. An optional status field tracks completion (defaults to "pending").

Example JSON::

[
    {"id": "t1", "description": "Add login page", "status": "pending"},
    {"id": "t2", "description": "Add tests", "status": "done"}
]

Parameters:

Name Type Description Default
path str

Path to the JSON file (relative to project_root or absolute).

required
Source code in src/rigger/task_sources/file_list.py
class FileListTaskSource:
    """Reads a JSON file with task objects; marks complete by updating status.

    The JSON file must contain an array of objects, each with at least
    ``id`` and ``description`` fields. An optional ``status`` field tracks
    completion (defaults to ``"pending"``).

    Example JSON::

        [
            {"id": "t1", "description": "Add login page", "status": "pending"},
            {"id": "t2", "description": "Add tests", "status": "done"}
        ]

    Args:
        path: Path to the JSON file (relative to project_root or absolute).
    """

    def __init__(self, path: str) -> None:  # noqa: D107
        self._path = path

    def pending(self, project_root: Path) -> list[Task]:
        """Return tasks with status != ``"done"``, in array order."""
        file_path = self._resolve(project_root)
        if not file_path.exists():
            return []

        try:
            items = json.loads(file_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to read task file %s: %s", file_path, exc)
            return []

        if not isinstance(items, list):
            logger.warning("Task file %s does not contain a JSON array", file_path)
            return []

        tasks: list[Task] = []
        for item in items:
            # Skip malformed entries, finished tasks, and entries with no id.
            if not isinstance(item, dict):
                continue
            if item.get("status", "pending") == "done":
                continue
            task_id = item.get("id", "")
            if not task_id:
                continue
            tasks.append(
                Task(
                    id=str(task_id),
                    description=item.get("description", ""),
                    # Carry all extra fields along as task metadata.
                    metadata={
                        k: v
                        for k, v in item.items()
                        if k not in ("id", "description", "status")
                    },
                )
            )
        return tasks

    def mark_complete(self, task_id: str, result: TaskResult) -> None:
        """Update the task's status to ``"done"`` in the JSON file.

        Uses atomic write (write to temp file, then rename) to avoid
        partial writes on crash. Unknown ``task_id`` values are a no-op
        (the file is rewritten unchanged).
        """
        file_path = self._resolve_for_write()
        if not file_path.exists():
            logger.warning("Task file %s not found for mark_complete", file_path)
            return

        try:
            items = json.loads(file_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to read task file %s: %s", file_path, exc)
            return

        if not isinstance(items, list):
            return

        for item in items:
            if isinstance(item, dict) and str(item.get("id", "")) == task_id:
                item["status"] = "done"
                break

        self._atomic_write(file_path, items)

    def _resolve(self, project_root: Path) -> Path:
        """Resolve the configured path against ``project_root`` unless absolute."""
        p = Path(self._path)
        if p.is_absolute():
            return p
        return project_root / p

    def _resolve_for_write(self) -> Path:
        """Resolve path for write operations.

        mark_complete doesn't receive project_root, so the path must be
        absolute or we fall back to CWD. In normal harness usage, the path
        is resolved during pending() and tasks are marked complete in the
        same working directory.
        """
        p = Path(self._path)
        if p.is_absolute():
            return p
        return Path.cwd() / p

    @staticmethod
    def _atomic_write(file_path: Path, data: list[dict[str, object]]) -> None:
        """Write JSON data atomically via temp file + rename.

        The temp file lives in the destination directory so the final
        ``os.replace`` is a same-filesystem atomic rename.
        """
        content = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
        fd, tmp_path = tempfile.mkstemp(
            dir=file_path.parent, suffix=".tmp", prefix=".task_"
        )
        try:
            # os.fdopen takes ownership of fd: the context manager flushes
            # the whole payload and closes the descriptor exactly once.
            # (A bare os.write may perform a partial write, and the old
            # error path could close fd a second time after success.)
            with os.fdopen(fd, "wb") as tmp_file:
                tmp_file.write(content.encode("utf-8"))
            os.replace(tmp_path, file_path)
        except BaseException:
            Path(tmp_path).unlink(missing_ok=True)
            raise

pending(project_root)

Return tasks with status != "done", in array order.

Source code in src/rigger/task_sources/file_list.py
def pending(self, project_root: Path) -> list[Task]:
    """Return tasks with status != ``"done"``, in array order."""
    file_path = self._resolve(project_root)
    if not file_path.exists():
        return []

    try:
        raw = json.loads(file_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Failed to read task file %s: %s", file_path, exc)
        return []

    if not isinstance(raw, list):
        logger.warning("Task file %s does not contain a JSON array", file_path)
        return []

    pending_tasks: list[Task] = []
    for entry in raw:
        # Skip malformed entries, finished tasks, and entries with no id.
        if not isinstance(entry, dict):
            continue
        if entry.get("status", "pending") == "done":
            continue
        entry_id = entry.get("id", "")
        if not entry_id:
            continue
        # Everything beyond the core fields rides along as metadata.
        extras = {
            key: value
            for key, value in entry.items()
            if key not in ("id", "description", "status")
        }
        pending_tasks.append(
            Task(
                id=str(entry_id),
                description=entry.get("description", ""),
                metadata=extras,
            )
        )
    return pending_tasks

mark_complete(task_id, result)

Update the task's status to "done" in the JSON file.

Uses atomic write (write to temp file, then rename) to avoid partial writes on crash.

Source code in src/rigger/task_sources/file_list.py
def mark_complete(self, task_id: str, result: TaskResult) -> None:
    """Update the task's status to ``"done"`` in the JSON file.

    Uses atomic write (write to temp file, then rename) to avoid
    partial writes on crash.
    """
    file_path = self._resolve_for_write()
    if not file_path.exists():
        logger.warning("Task file %s not found for mark_complete", file_path)
        return

    try:
        entries = json.loads(file_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Failed to read task file %s: %s", file_path, exc)
        return

    # Only a top-level array is editable; anything else is left untouched.
    if isinstance(entries, list):
        for entry in entries:
            if isinstance(entry, dict) and str(entry.get("id", "")) == task_id:
                entry["status"] = "done"
                break
        self._atomic_write(file_path, entries)

JsonStoriesTaskSource

Reads a PRD-style JSON file with stories, one per cycle.

The JSON file must contain an object with a key (default "stories") that maps to an array of story objects. Each story needs at least id and description fields. An optional status field tracks completion (defaults to "pending").

Example JSON::

{
    "stories": [
        {"id": "s1", "description": "User can sign up", "status": "pending"},
        {"id": "s2", "description": "User can log in"}
    ]
}

Parameters:

Name Type Description Default
path str

Path to the JSON file (relative to project_root or absolute).

required
key str

JSON key containing the stories array.

'stories'
Source code in src/rigger/task_sources/json_stories.py
class JsonStoriesTaskSource:
    """Reads a PRD-style JSON file with stories, one per cycle.

    The JSON file must contain an object with a key (default ``"stories"``)
    that maps to an array of story objects. Each story needs at least
    ``id`` and ``description`` fields. An optional ``status`` field
    tracks completion (defaults to ``"pending"``).

    Example JSON::

        {
            "stories": [
                {"id": "s1", "description": "User can sign up", "status": "pending"},
                {"id": "s2", "description": "User can log in"}
            ]
        }

    Args:
        path: Path to the JSON file (relative to project_root or absolute).
        key: JSON key containing the stories array.
    """

    def __init__(self, path: str, key: str = "stories") -> None:  # noqa: D107
        self._path = path
        self._key = key

    def pending(self, project_root: Path) -> list[Task]:
        """Return incomplete stories in order, one per cycle."""
        file_path = self._resolve(project_root)
        if not file_path.exists():
            return []

        try:
            data = json.loads(file_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to read stories file %s: %s", file_path, exc)
            return []

        if not isinstance(data, dict):
            logger.warning("Stories file %s does not contain a JSON object", file_path)
            return []

        stories = data.get(self._key, [])
        if not isinstance(stories, list):
            logger.warning(
                "Key %r in %s does not contain an array", self._key, file_path
            )
            return []

        for story in stories:
            # Skip malformed entries, finished stories, and missing ids.
            if not isinstance(story, dict):
                continue
            status = story.get("status", "pending")
            if status == "done":
                continue
            story_id = story.get("id", "")
            if not story_id:
                continue
            # One per cycle: return only the first actionable story.
            return [
                Task(
                    id=str(story_id),
                    # Legacy files may use "story" instead of "description".
                    description=story.get("description", story.get("story", "")),
                    metadata={
                        k: v
                        for k, v in story.items()
                        if k not in ("id", "description", "story", "status")
                    },
                )
            ]

        return []

    def mark_complete(self, task_id: str, result: TaskResult) -> None:
        """Mark the story as completed in the JSON file.

        Uses atomic write (write to temp file, then rename). Unknown
        ``task_id`` values are a no-op (the file is rewritten unchanged).
        """
        file_path = self._resolve_for_write()
        if not file_path.exists():
            logger.warning("Stories file %s not found for mark_complete", file_path)
            return

        try:
            data = json.loads(file_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to read stories file %s: %s", file_path, exc)
            return

        if not isinstance(data, dict):
            return

        stories = data.get(self._key, [])
        if not isinstance(stories, list):
            return

        for story in stories:
            if isinstance(story, dict) and str(story.get("id", "")) == task_id:
                story["status"] = "done"
                break

        self._atomic_write(file_path, data)

    def _resolve(self, project_root: Path) -> Path:
        """Resolve the configured path against ``project_root`` unless absolute."""
        p = Path(self._path)
        if p.is_absolute():
            return p
        return project_root / p

    def _resolve_for_write(self) -> Path:
        """Resolve for writes; falls back to CWD for relative paths."""
        p = Path(self._path)
        if p.is_absolute():
            return p
        return Path.cwd() / p

    @staticmethod
    def _atomic_write(file_path: Path, data: dict[str, object]) -> None:
        """Write JSON data atomically via temp file + rename.

        The temp file lives in the destination directory so the final
        ``os.replace`` is a same-filesystem atomic rename.
        """
        content = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
        fd, tmp_path = tempfile.mkstemp(
            dir=file_path.parent, suffix=".tmp", prefix=".story_"
        )
        try:
            # os.fdopen takes ownership of fd: the context manager flushes
            # the whole payload and closes the descriptor exactly once.
            # (A bare os.write may perform a partial write, and the old
            # error path could close fd a second time after success.)
            with os.fdopen(fd, "wb") as tmp_file:
                tmp_file.write(content.encode("utf-8"))
            os.replace(tmp_path, file_path)
        except BaseException:
            Path(tmp_path).unlink(missing_ok=True)
            raise

pending(project_root)

Return incomplete stories in order, one per cycle.

Source code in src/rigger/task_sources/json_stories.py
def pending(self, project_root: Path) -> list[Task]:
    """Return incomplete stories in order, one per cycle."""
    file_path = self._resolve(project_root)
    if not file_path.exists():
        return []

    try:
        payload = json.loads(file_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Failed to read stories file %s: %s", file_path, exc)
        return []

    if not isinstance(payload, dict):
        logger.warning("Stories file %s does not contain a JSON object", file_path)
        return []

    story_list = payload.get(self._key, [])
    if not isinstance(story_list, list):
        logger.warning(
            "Key %r in %s does not contain an array", self._key, file_path
        )
        return []

    # Surface only the first actionable story: one per cycle.
    for entry in story_list:
        if not isinstance(entry, dict):
            continue
        if entry.get("status", "pending") == "done":
            continue
        entry_id = entry.get("id", "")
        if not entry_id:
            continue
        extras = {
            key: value
            for key, value in entry.items()
            if key not in ("id", "description", "story", "status")
        }
        return [
            Task(
                id=str(entry_id),
                # Legacy files may use "story" instead of "description".
                description=entry.get("description", entry.get("story", "")),
                metadata=extras,
            )
        ]

    return []

mark_complete(task_id, result)

Mark the story as completed in the JSON file.

Uses atomic write (write to temp file, then rename).

Source code in src/rigger/task_sources/json_stories.py
def mark_complete(self, task_id: str, result: TaskResult) -> None:
    """Mark the story as completed in the JSON file.

    Uses atomic write (write to temp file, then rename).
    """
    file_path = self._resolve_for_write()
    if not file_path.exists():
        logger.warning("Stories file %s not found for mark_complete", file_path)
        return

    try:
        payload = json.loads(file_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Failed to read stories file %s: %s", file_path, exc)
        return

    # Require the expected object-with-array shape before touching anything.
    if not isinstance(payload, dict):
        return

    story_list = payload.get(self._key, [])
    if not isinstance(story_list, list):
        return

    for entry in story_list:
        if isinstance(entry, dict) and str(entry.get("id", "")) == task_id:
            entry["status"] = "done"
            break

    self._atomic_write(file_path, payload)

LinearTaskSource

Queries Linear API for issues matching filters, ordered by priority.

Uses httpx for API calls (no Linear SDK dependency). The API key is read from an environment variable at call time.

Parameters:

Name Type Description Default
team str

Linear team key (e.g., "ENG").

required
project str | None

Optional project name filter.

None
labels list[str] | None

Optional label name filter.

None
api_key_env str

Environment variable containing the Linear API key.

'LINEAR_API_KEY'
Source code in src/rigger/task_sources/linear.py
class LinearTaskSource:
    """Queries Linear API for issues matching filters, ordered by priority.

    Uses httpx for API calls (no Linear SDK dependency). The API key is
    read from an environment variable at call time.

    Args:
        team: Linear team key (e.g., ``"ENG"``).
        project: Optional project name filter.
        labels: Optional label name filter.
        api_key_env: Environment variable containing the Linear API key.
    """

    def __init__(  # noqa: D107
        self,
        team: str,
        project: str | None = None,
        labels: list[str] | None = None,
        api_key_env: str = "LINEAR_API_KEY",
    ) -> None:
        self._team = team
        self._project = project
        self._labels = labels
        self._api_key_env = api_key_env

    def pending(self, project_root: Path) -> list[Task]:
        """Return Linear issues ordered by priority (urgent first).

        Returns ``[]`` (with a warning logged) when the API key is missing
        or the request/response handling fails; never raises.
        """
        api_key = os.environ.get(self._api_key_env, "")
        if not api_key:
            logger.warning("Linear API key not found in env var %s", self._api_key_env)
            return []

        variables: dict[str, Any] = {"teamKey": self._team}
        if self._project:
            variables["projectName"] = self._project
        if self._labels:
            variables["labels"] = self._labels

        try:
            data = self._graphql(api_key, _ISSUES_QUERY, variables)
            nodes = (
                data.get("data", {})
                .get("team", {})
                .get("issues", {})
                .get("nodes", [])
            )
            # Build tasks inside the try block: a malformed issue payload
            # (e.g. missing "id") raises KeyError, which previously escaped
            # the handler because the comprehension ran after the except.
            return [self._issue_to_task(issue) for issue in nodes]
        except (httpx.HTTPError, KeyError, ValueError) as exc:
            logger.warning("Linear API request failed: %s", exc)
            return []

    @staticmethod
    def _issue_to_task(issue: dict[str, Any]) -> Task:
        """Convert one Linear issue node into a harness ``Task``."""
        return Task(
            id=issue["id"],
            description=issue.get("title", ""),
            metadata={
                "identifier": issue.get("identifier", ""),
                "priority": issue.get("priority", 4),
                "description_body": issue.get("description", ""),
                "state": issue.get("state", {}).get("name", ""),
                "labels": [
                    lbl["name"] for lbl in issue.get("labels", {}).get("nodes", [])
                ],
            },
        )

    def mark_complete(self, task_id: str, result: TaskResult) -> None:
        """Transition issue to the team's 'Done' state."""
        api_key = os.environ.get(self._api_key_env, "")
        if not api_key:
            logger.warning("Linear API key not found in env var %s", self._api_key_env)
            return

        try:
            done_state_id = self._get_done_state_id(api_key)
            if not done_state_id:
                logger.warning("No completed state found for team %s", self._team)
                return
            self._graphql(
                api_key,
                _TRANSITION_QUERY,
                {"issueId": task_id, "stateId": done_state_id},
            )
        except (httpx.HTTPError, KeyError, ValueError) as exc:
            logger.warning("Failed to mark Linear issue %s as done: %s", task_id, exc)

    def _get_done_state_id(self, api_key: str) -> str | None:
        """Fetch the first completed-type state ID for the team."""
        data = self._graphql(api_key, _DONE_STATE_QUERY, {"teamKey": self._team})
        states = data.get("data", {}).get("team", {}).get("states", {}).get("nodes", [])
        if states:
            return states[0]["id"]
        return None

    @staticmethod
    def _graphql(api_key: str, query: str, variables: dict[str, Any]) -> dict[str, Any]:
        """Execute a GraphQL request against the Linear API.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP response.
            ValueError: when the body carries GraphQL-level errors.
        """
        with httpx.Client(timeout=30) as client:
            resp = client.post(
                _GRAPHQL_ENDPOINT,
                json={"query": query, "variables": variables},
                headers={
                    "Authorization": api_key,
                    "Content-Type": "application/json",
                },
            )
            resp.raise_for_status()
            body: dict[str, Any] = resp.json()
            # GraphQL reports errors in-band with HTTP 200; surface them.
            if "errors" in body:
                msg = body["errors"][0].get("message", "Unknown GraphQL error")
                raise ValueError(msg)
            return body

pending(project_root)

Return Linear issues ordered by priority (urgent first).

Source code in src/rigger/task_sources/linear.py
def pending(self, project_root: Path) -> list[Task]:
    """Return Linear issues ordered by priority (urgent first).

    Returns ``[]`` (with a warning logged) when the API key is missing
    or the request/response handling fails; never raises.
    """
    api_key = os.environ.get(self._api_key_env, "")
    if not api_key:
        logger.warning("Linear API key not found in env var %s", self._api_key_env)
        return []

    variables: dict[str, Any] = {"teamKey": self._team}
    if self._project:
        variables["projectName"] = self._project
    if self._labels:
        variables["labels"] = self._labels

    try:
        data = self._graphql(api_key, _ISSUES_QUERY, variables)
        nodes = (
            data.get("data", {}).get("team", {}).get("issues", {}).get("nodes", [])
        )
        # Build tasks inside the try block: a malformed issue payload
        # (e.g. missing "id") raises KeyError, which previously escaped
        # the handler because the comprehension ran after the except.
        return [
            Task(
                id=issue["id"],
                description=issue.get("title", ""),
                metadata={
                    "identifier": issue.get("identifier", ""),
                    "priority": issue.get("priority", 4),
                    "description_body": issue.get("description", ""),
                    "state": issue.get("state", {}).get("name", ""),
                    "labels": [
                        lbl["name"] for lbl in issue.get("labels", {}).get("nodes", [])
                    ],
                },
            )
            for issue in nodes
        ]
    except (httpx.HTTPError, KeyError, ValueError) as exc:
        logger.warning("Linear API request failed: %s", exc)
        return []

mark_complete(task_id, result)

Transition issue to the team's 'Done' state.

Source code in src/rigger/task_sources/linear.py
def mark_complete(self, task_id: str, result: TaskResult) -> None:
    """Transition issue to the team's 'Done' state."""
    api_key = os.environ.get(self._api_key_env, "")
    if not api_key:
        logger.warning("Linear API key not found in env var %s", self._api_key_env)
        return

    try:
        # Two round trips: resolve the completed state, then transition.
        state_id = self._get_done_state_id(api_key)
        if not state_id:
            logger.warning("No completed state found for team %s", self._team)
            return
        transition_vars = {"issueId": task_id, "stateId": state_id}
        self._graphql(api_key, _TRANSITION_QUERY, transition_vars)
    except (httpx.HTTPError, KeyError, ValueError) as exc:
        logger.warning("Failed to mark Linear issue %s as done: %s", task_id, exc)

CiStatusVerifier

Checks GitHub Actions / CI pipeline status via gh CLI.

Polls CI status for the current (or configured) branch. All checks passed maps to ACCEPT, failed maps to action_on_fail, pending triggers a polling loop with a configurable, fixed poll interval.

Parameters:

Name Type Description Default
branch str | None

Branch to check (default: infer from current HEAD).

None
timeout int

Maximum seconds to poll (default 600).

600
poll_interval int

Seconds between polls (default 30).

30
action_on_fail str

VerifyAction when CI fails (default RETRY).

'retry'
Source code in src/rigger/verifiers/ci_status.py
class CiStatusVerifier:
    """Checks GitHub Actions / CI pipeline status via ``gh`` CLI.

    Polls CI status for the current (or configured) branch. All checks
    passed maps to ACCEPT, failed maps to ``action_on_fail``, pending
    triggers a polling loop with a configurable, fixed poll interval.

    Args:
        branch: Branch to check (default: infer from current HEAD).
        timeout: Maximum seconds to poll (default 600).
        poll_interval: Seconds between polls (default 30).
        action_on_fail: VerifyAction when CI fails (default RETRY).
    """

    def __init__(
        self,
        branch: str | None = None,
        timeout: int = 600,
        poll_interval: int = 30,
        action_on_fail: str = "retry",
    ) -> None:
        self._branch = branch
        self._timeout = timeout
        self._poll_interval = poll_interval
        # Convert the raw string eagerly so a misconfigured action fails at
        # construction time rather than mid-verification.
        self._action_on_fail = VerifyAction(action_on_fail)

    def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
        """Poll CI status until terminal state or timeout."""
        branch = self._branch or self._current_branch(project_root)
        if not branch:
            # Without a branch we cannot query CI at all; BLOCK rather than
            # retry, since a retry would fail the same way.
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message="Could not determine current branch.",
            )

        deadline = time.monotonic() + self._timeout

        while True:
            status = self._check_status(project_root, branch)

            if status == "success":
                return VerifyResult(
                    passed=True,
                    message=f"All CI checks passed on branch '{branch}'.",
                    details={"branch": branch, "ci_status": "success"},
                )

            if status == "failure":
                # Attach failure logs so a retrying agent can act on them.
                logs = self._get_failure_logs(project_root, branch)
                return VerifyResult(
                    passed=False,
                    action=self._action_on_fail,
                    message=f"CI failed on branch '{branch}'.\n{logs}",
                    details={
                        "branch": branch,
                        "ci_status": "failure",
                        "logs": logs,
                    },
                )

            # status is "pending" or unknown
            # Give up early if sleeping one more interval would overshoot
            # the deadline; timeouts are reported via action_on_fail.
            if time.monotonic() + self._poll_interval > deadline:
                return VerifyResult(
                    passed=False,
                    action=self._action_on_fail,
                    message=(
                        f"CI still pending on branch '{branch}' "
                        f"after {self._timeout}s timeout."
                    ),
                    details={
                        "branch": branch,
                        "ci_status": "timeout",
                        "timeout": self._timeout,
                    },
                )

            logger.debug(
                "CI pending on '%s', polling again in %ds",
                branch,
                self._poll_interval,
            )
            time.sleep(self._poll_interval)

    @staticmethod
    def _current_branch(project_root: Path) -> str | None:
        """Get the current git branch name.

        Returns ``None`` when git is unavailable, times out, or exits
        non-zero (e.g. not a git repository).
        """
        try:
            proc = subprocess.run(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=10,
            )
            if proc.returncode == 0:
                return proc.stdout.strip()
        except (subprocess.TimeoutExpired, FileNotFoundError):
            pass
        return None

    @staticmethod
    def _check_status(project_root: Path, branch: str) -> str:
        """Query CI status via ``gh pr checks`` or ``gh run list``.

        Returns one of ``"success"``, ``"failure"``, or ``"pending"``.
        Any gh failure, timeout, or missing run deliberately maps to
        ``"pending"`` so verify() keeps polling until its own timeout.
        """
        try:
            proc = subprocess.run(
                [
                    "gh",
                    "run",
                    "list",
                    "--branch",
                    branch,
                    "--limit",
                    "1",
                    "--json",
                    "status,conclusion",
                ],
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if proc.returncode != 0:
                return "pending"

            import json

            runs = json.loads(proc.stdout)
            if not runs:
                return "pending"

            run = runs[0]
            if run.get("status") != "completed":
                return "pending"

            # Any completed conclusion other than "success" (failure,
            # cancelled, ...) is reported as a failure.
            conclusion = run.get("conclusion", "")
            if conclusion == "success":
                return "success"
            return "failure"

        except (subprocess.TimeoutExpired, FileNotFoundError):
            return "pending"

    @staticmethod
    def _get_failure_logs(project_root: Path, branch: str) -> str:
        """Fetch CI failure logs for agent consumption.

        Best-effort: always returns a human-readable string, falling back
        to a short explanatory message when logs cannot be retrieved.
        """
        try:
            # First resolve the most recent run's ID on this branch.
            proc = subprocess.run(
                [
                    "gh",
                    "run",
                    "list",
                    "--branch",
                    branch,
                    "--limit",
                    "1",
                    "--json",
                    "databaseId",
                ],
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if proc.returncode != 0:
                return "Could not fetch CI logs."

            import json

            runs = json.loads(proc.stdout)
            if not runs:
                return "No CI runs found."

            run_id = runs[0].get("databaseId", "")
            if not run_id:
                return "Could not determine run ID."

            # --log-failed limits output to the failing steps only.
            log_proc = subprocess.run(
                ["gh", "run", "view", str(run_id), "--log-failed"],
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if log_proc.returncode == 0 and log_proc.stdout.strip():
                return log_proc.stdout.strip()

            return "CI run failed. Use `gh run view` for details."

        except (subprocess.TimeoutExpired, FileNotFoundError):
            return "Could not fetch CI logs."

verify(project_root, result)

Poll CI status until terminal state or timeout.

Source code in src/rigger/verifiers/ci_status.py
def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
    """Poll CI on the target branch until it passes, fails, or times out."""
    branch = self._branch or self._current_branch(project_root)
    if not branch:
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message="Could not determine current branch.",
        )

    # Absolute wall-clock budget for the entire polling loop.
    deadline = time.monotonic() + self._timeout

    while True:
        ci_state = self._check_status(project_root, branch)

        if ci_state == "success":
            return VerifyResult(
                passed=True,
                message=f"All CI checks passed on branch '{branch}'.",
                details={"branch": branch, "ci_status": "success"},
            )

        if ci_state == "failure":
            logs = self._get_failure_logs(project_root, branch)
            failure_details = {
                "branch": branch,
                "ci_status": "failure",
                "logs": logs,
            }
            return VerifyResult(
                passed=False,
                action=self._action_on_fail,
                message=f"CI failed on branch '{branch}'.\n{logs}",
                details=failure_details,
            )

        # Anything else is pending/unknown. Give up when waiting one more
        # poll interval would overshoot the deadline.
        out_of_time = time.monotonic() + self._poll_interval > deadline
        if out_of_time:
            timeout_message = (
                f"CI still pending on branch '{branch}' "
                f"after {self._timeout}s timeout."
            )
            return VerifyResult(
                passed=False,
                action=self._action_on_fail,
                message=timeout_message,
                details={
                    "branch": branch,
                    "ci_status": "timeout",
                    "timeout": self._timeout,
                },
            )

        logger.debug(
            "CI pending on '%s', polling again in %ds",
            branch,
            self._poll_interval,
        )
        time.sleep(self._poll_interval)

LintVerifier

Runs a linter command; zero errors means ACCEPT, errors means RETRY.

Captures stdout/stderr and reports error count in VerifyResult.details.

Parameters:

Name Type Description Default
command list[str]

Linter command to run (as a list of args).

required
timeout int

Maximum seconds to wait (default 120).

120
action_on_fail str

VerifyAction when linting fails (default RETRY).

'retry'
Source code in src/rigger/verifiers/lint.py
class LintVerifier:
    """Verifier that shells out to a linter and keys off its exit code.

    Exit code 0 means ACCEPT; any other exit code maps to
    ``action_on_fail`` (default RETRY). Captured stdout/stderr and a
    heuristic error count are surfaced in ``VerifyResult.details``.

    Args:
        command: Linter command to run (as a list of args).
        timeout: Maximum seconds to wait (default 120).
        action_on_fail: VerifyAction when linting fails (default RETRY).
    """

    def __init__(
        self,
        command: list[str],
        timeout: int = 120,
        action_on_fail: str = "retry",
    ) -> None:
        self._command = command
        self._timeout = timeout
        self._action_on_fail = VerifyAction(action_on_fail)

    def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
        """Execute the configured linter; map its exit code to a VerifyResult."""
        try:
            completed = subprocess.run(
                self._command,
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=self._timeout,
            )
        except subprocess.TimeoutExpired as exc:
            return VerifyResult(
                passed=False,
                action=self._action_on_fail,
                message=f"Linter timed out after {self._timeout}s",
                details={
                    "stdout": self._coerce_text(exc.stdout),
                    "stderr": self._coerce_text(exc.stderr),
                    "timeout": self._timeout,
                },
            )
        except FileNotFoundError:
            # The linter binary itself is missing — retrying cannot help.
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message=f"Linter not found: {self._command[0]}",
                details={"command": self._command},
            )

        if completed.returncode == 0:
            return VerifyResult(
                passed=True,
                message="Lint check passed.",
                details={
                    "stdout": completed.stdout,
                    "stderr": completed.stderr,
                    "returncode": completed.returncode,
                },
            )

        combined = (completed.stdout + "\n" + completed.stderr).strip()
        issue_count = self._count_errors(combined)

        return VerifyResult(
            passed=False,
            action=self._action_on_fail,
            message=self._format_failure(combined, issue_count),
            details={
                "stdout": completed.stdout,
                "stderr": completed.stderr,
                "returncode": completed.returncode,
                "error_count": issue_count,
            },
        )

    @staticmethod
    def _coerce_text(stream: str | bytes | None) -> str:
        """Normalize a TimeoutExpired capture (str, bytes, or None) to str."""
        if isinstance(stream, bytes):
            return stream.decode(errors="replace")
        return stream or ""

    @staticmethod
    def _count_errors(output: str) -> int:
        """Count error lines in linter output.

        Heuristic: each non-empty line in the output is one error/warning.
        """
        return len([line for line in output.splitlines() if line.strip()])

    @staticmethod
    def _format_failure(output: str, error_count: int) -> str:
        """Render the lint violations as a message the agent can act on."""
        pieces = [f"Lint check failed ({error_count} issue(s)):"]
        if output:
            pieces.append(output)
        pieces.append("\nFix the lint issues above and the harness will re-verify.")
        return "\n".join(pieces)

verify(project_root, result)

Run the linter and return a VerifyResult based on exit code.

Source code in src/rigger/verifiers/lint.py
def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
    """Invoke the linter; exit code 0 is a pass, anything else fails."""

    def _as_text(stream):
        # TimeoutExpired may expose captured streams as bytes even in
        # text mode; normalize everything (including None) to str.
        if isinstance(stream, bytes):
            return stream.decode(errors="replace")
        return stream or ""

    try:
        lint_run = subprocess.run(
            self._command,
            cwd=project_root,
            capture_output=True,
            text=True,
            timeout=self._timeout,
        )
    except subprocess.TimeoutExpired as exc:
        return VerifyResult(
            passed=False,
            action=self._action_on_fail,
            message=f"Linter timed out after {self._timeout}s",
            details={
                "stdout": _as_text(exc.stdout),
                "stderr": _as_text(exc.stderr),
                "timeout": self._timeout,
            },
        )
    except FileNotFoundError:
        # Missing binary is not retryable — block instead.
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message=f"Linter not found: {self._command[0]}",
            details={"command": self._command},
        )

    captured = {
        "stdout": lint_run.stdout,
        "stderr": lint_run.stderr,
        "returncode": lint_run.returncode,
    }

    if lint_run.returncode == 0:
        return VerifyResult(
            passed=True,
            message="Lint check passed.",
            details=captured,
        )

    combined = (lint_run.stdout + "\n" + lint_run.stderr).strip()
    issue_total = self._count_errors(combined)

    return VerifyResult(
        passed=False,
        action=self._action_on_fail,
        message=self._format_failure(combined, issue_total),
        details={**captured, "error_count": issue_total},
    )

RatchetVerifier

Runs a pipeline of commands in sequence; ACCEPT only when all pass.

Each step is a {command, name} pair. Every step is executed even if an earlier one fails; failure outputs are aggregated and returned with RETRY. ACCEPT only when every step succeeds.

Parameters:

Name Type Description Default
steps list[dict[str, Any]]

List of step dicts, each with command (list[str]) and name (str) keys.

required
timeout int

Maximum seconds per step (default 600).

600
Source code in src/rigger/verifiers/ratchet.py
class RatchetVerifier:
    """Runs a pipeline of commands in sequence; ACCEPT only when all pass.

    Each step is a ``{command, name}`` pair. Every step is executed even
    when an earlier one fails; all failure outputs are aggregated into
    the RETRY message so the agent sees the full picture in one pass.
    ACCEPT only when every step succeeds.

    Args:
        steps: List of step dicts, each with ``command`` (list[str]) and
            ``name`` (str) keys.
        timeout: Maximum seconds per step (default 600).
    """

    def __init__(
        self,
        steps: list[dict[str, Any]],
        timeout: int = 600,
    ) -> None:
        self._steps = steps
        self._timeout = timeout

    def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
        """Run every step; RETRY with aggregated failures, ACCEPT on all-green."""
        step_results: dict[str, bool] = {}
        failure_messages: list[str] = []

        for step in self._steps:
            name = step.get("name", "unnamed")
            command = step.get("command", [])
            if not command:
                # Misconfigured step: warn and continue rather than abort.
                logger.warning("Skipping step '%s' with empty command", name)
                continue

            passed, output = self._run_step(project_root, command, name)
            step_results[name] = passed

            if not passed:
                failure_messages.append(f"{name.upper()} FAILED:\n{output}")

        # An empty pipeline (no runnable steps) must not count as a pass.
        all_passed = bool(step_results) and all(step_results.values())

        if all_passed:
            message = "All checks passed."
        elif failure_messages:
            message = "\n\n".join(failure_messages)
        else:
            # Nothing ran at all: say so explicitly instead of returning
            # a failure with an empty (and therefore confusing) message.
            message = "No runnable steps configured."

        return VerifyResult(
            passed=all_passed,
            action=VerifyAction.ACCEPT if all_passed else VerifyAction.RETRY,
            message=message,
            details={
                "step_results": step_results,
            },
        )

    def _run_step(
        self,
        project_root: Path,
        command: list[str],
        name: str,
    ) -> tuple[bool, str]:
        """Run a single pipeline step; return ``(passed, output)``.

        Output is empty on success, and on failure holds either the
        combined stdout/stderr or a timeout / missing-command notice.
        """
        try:
            proc = subprocess.run(
                command,
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=self._timeout,
            )
        except subprocess.TimeoutExpired:
            return False, f"{name} timed out after {self._timeout}s"
        except FileNotFoundError:
            return False, f"Command not found: {command[0]}"

        if proc.returncode == 0:
            return True, ""

        return False, (proc.stdout + "\n" + proc.stderr).strip()

verify(project_root, result)

Run every step in sequence; RETRY if any step failed, ACCEPT on all-green.

Source code in src/rigger/verifiers/ratchet.py
def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
    """Run every step in sequence; RETRY if any step failed, ACCEPT on all-green.

    Note: execution does NOT stop at the first failure — all steps run,
    and their failure outputs are aggregated into the returned message.
    """
    # Per-step pass/fail map (keyed by step name) plus collected failure text.
    step_results: dict[str, bool] = {}
    failure_messages: list[str] = []

    for step in self._steps:
        name = step.get("name", "unnamed")
        command = step.get("command", [])
        if not command:
            # Misconfigured step: warn and continue rather than abort the run.
            logger.warning("Skipping step '%s' with empty command", name)
            continue

        passed, output = self._run_step(project_root, command, name)
        step_results[name] = passed

        if not passed:
            failure_messages.append(f"{name.upper()} FAILED:\n{output}")

    # An empty pipeline (no runnable steps) counts as a failure, not a pass.
    all_passed = bool(step_results) and all(step_results.values())

    return VerifyResult(
        passed=all_passed,
        action=VerifyAction.ACCEPT if all_passed else VerifyAction.RETRY,
        message=(
            "All checks passed." if all_passed else "\n\n".join(failure_messages)
        ),
        details={
            "step_results": step_results,
        },
    )

TestSuiteVerifier

Runs a shell command (pytest, npm test, etc.); pass/fail from exit code.

Exit code 0 maps to ACCEPT. Non-zero maps to action_on_fail (default RETRY). Captures stdout/stderr in VerifyResult.details.

Parameters:

Name Type Description Default
command list[str]

Shell command to run (as a list of args).

required
timeout int

Maximum seconds to wait for the command (default 300).

300
action_on_fail str

VerifyAction when the command fails (default RETRY).

'retry'
Source code in src/rigger/verifiers/test_suite.py
class TestSuiteVerifier:
    """Verifier that shells out to a test command and keys off its exit code.

    Exit code 0 means ACCEPT; any other exit code maps to
    ``action_on_fail`` (default RETRY). Captured stdout/stderr are
    preserved in ``VerifyResult.details`` for the agent to inspect.

    Args:
        command: Shell command to run (as a list of args).
        timeout: Maximum seconds to wait for the command (default 300).
        action_on_fail: VerifyAction when the command fails (default RETRY).
    """

    def __init__(
        self,
        command: list[str],
        timeout: int = 300,
        action_on_fail: str = "retry",
    ) -> None:
        self._command = command
        self._timeout = timeout
        self._action_on_fail = VerifyAction(action_on_fail)

    def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
        """Run the test command; translate its exit status into a VerifyResult."""
        try:
            completed = subprocess.run(
                self._command,
                cwd=project_root,
                capture_output=True,
                text=True,
                timeout=self._timeout,
            )
        except subprocess.TimeoutExpired as exc:
            return VerifyResult(
                passed=False,
                action=self._action_on_fail,
                message=f"Command timed out after {self._timeout}s",
                details={
                    "stdout": self._stream_text(exc.stdout),
                    "stderr": self._stream_text(exc.stderr),
                    "timeout": self._timeout,
                },
            )
        except FileNotFoundError:
            # The command binary is missing entirely — retries cannot help.
            return VerifyResult(
                passed=False,
                action=VerifyAction.BLOCK,
                message=f"Command not found: {self._command[0]}",
                details={"command": self._command},
            )

        run_details = {
            "stdout": completed.stdout,
            "stderr": completed.stderr,
            "returncode": completed.returncode,
        }

        if completed.returncode == 0:
            return VerifyResult(
                passed=True,
                message="All tests passed.",
                details=run_details,
            )

        return VerifyResult(
            passed=False,
            action=self._action_on_fail,
            message=self._format_failure(completed),
            details=run_details,
        )

    @staticmethod
    def _stream_text(stream: str | bytes | None) -> str:
        """Normalize a TimeoutExpired capture (str, bytes, or None) to str."""
        if isinstance(stream, bytes):
            return stream.decode(errors="replace")
        return stream or ""

    @staticmethod
    def _format_failure(proc: subprocess.CompletedProcess[str]) -> str:
        """Summarize a failed run: exit-code header plus combined output."""
        lines: list[str] = [f"Tests failed (exit code {proc.returncode})."]
        combined = (proc.stdout + "\n" + proc.stderr).strip()
        if combined:
            lines.append(combined)
        return "\n".join(lines)

verify(project_root, result)

Run the test command and return a VerifyResult based on exit code.

Source code in src/rigger/verifiers/test_suite.py
def verify(self, project_root: Path, result: TaskResult) -> VerifyResult:
    """Execute the test command; its exit code decides pass/fail."""
    try:
        test_run = subprocess.run(
            self._command,
            cwd=project_root,
            capture_output=True,
            text=True,
            timeout=self._timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # TimeoutExpired may carry bytes even in text mode; normalize
        # both captured streams to str before reporting.
        captured = {}
        for label, raw in (("stdout", exc.stdout), ("stderr", exc.stderr)):
            if isinstance(raw, bytes):
                captured[label] = raw.decode(errors="replace")
            else:
                captured[label] = raw or ""
        captured["timeout"] = self._timeout
        return VerifyResult(
            passed=False,
            action=self._action_on_fail,
            message=f"Command timed out after {self._timeout}s",
            details=captured,
        )
    except FileNotFoundError:
        # Missing binary: retrying cannot help, so block.
        return VerifyResult(
            passed=False,
            action=VerifyAction.BLOCK,
            message=f"Command not found: {self._command[0]}",
            details={"command": self._command},
        )

    base_details = {
        "stdout": test_run.stdout,
        "stderr": test_run.stderr,
        "returncode": test_run.returncode,
    }

    if test_run.returncode == 0:
        return VerifyResult(
            passed=True,
            message="All tests passed.",
            details=base_details,
        )

    return VerifyResult(
        passed=False,
        action=self._action_on_fail,
        message=self._format_failure(test_run),
        details=base_details,
    )

GitWorktreeManager

Workspace isolation via git worktrees with sequential merge.

Each agent works on its own branch in an isolated worktree. On completion, the branch is merged back into the main root via git merge --no-ff.

WARNING: Sequential merge creates PRIORITY-COUPLING. Tasks merged first always succeed; later tasks bear merge conflict risk.

Worktrees are created under <parent>/.rigger-worktrees/<branch_name>. Minimum git version: 2.17 (for git worktree remove).

Source code in src/rigger/workspace/git_worktree.py
class GitWorktreeManager:
    """Workspace isolation via git worktrees with sequential merge.

    Each agent gets its own branch in an isolated worktree; once the
    task completes, the branch is folded back into the main root with
    ``git merge --no-ff``.

    WARNING: Sequential merge creates PRIORITY-COUPLING. Tasks merged
    first always succeed; later tasks bear merge conflict risk.

    Worktrees live under ``<parent>/.rigger-worktrees/<branch_name>``.
    Minimum git version: 2.17 (for ``git worktree remove``).
    """

    def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
        """Create an isolated worktree on a brand-new branch.

        Args:
            main_root: The main project root (must be a git repo with
                at least one commit).
            task: The task to execute (used for logging context).
            branch_name: Branch name for the new worktree.

        Returns:
            Path to the created worktree directory.

        Raises:
            RuntimeError: If the repo has no commits.
            subprocess.CalledProcessError: If worktree creation fails.
        """
        root = main_root.resolve()

        # E8: a repo without any commit cannot host a worktree.
        head = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=root,
            capture_output=True,
            text=True,
            check=False,
        )
        if head.returncode != 0:
            raise RuntimeError(f"Cannot create worktree: {root} has no commits.")

        target = root.parent / ".rigger-worktrees" / branch_name

        # X8: branch names containing slashes need intermediate directories.
        target.parent.mkdir(parents=True, exist_ok=True)

        # E6: a previous crash may have left the worktree/branch behind;
        # the worktree must go before its branch can be deleted.
        if target.exists():
            subprocess.run(
                ["git", "worktree", "remove", str(target), "--force"],
                cwd=root,
                check=False,
            )
        subprocess.run(
            ["git", "branch", "-D", branch_name],
            cwd=root,
            check=False,
        )

        # Strict -b (not -B): creation must fail loudly if the branch
        # somehow still exists.
        subprocess.run(
            ["git", "worktree", "add", str(target), "-b", branch_name],
            cwd=root,
            check=True,
        )

        # E2: pull in submodules when the project uses them.
        if (target / ".gitmodules").exists():
            subprocess.run(
                ["git", "submodule", "update", "--init", "--recursive"],
                cwd=target,
                check=True,
            )

        logger.debug(
            "Created worktree for task %s at %s (branch: %s)",
            task.id,
            target,
            branch_name,
        )
        return target

    def merge(self, worktree: Path, main_root: Path) -> MergeResult:
        """Merge the worktree's branch into ``main_root`` with ``--no-ff``.

        On conflict, the in-progress merge is aborted and the conflicting
        files are reported.

        Args:
            worktree: Path to the worktree to merge from.
            main_root: The main project root to merge into.

        Returns:
            MergeResult with success status and any conflict information.
        """
        branch = _get_branch_name(worktree)
        merged = subprocess.run(
            ["git", "merge", branch, "--no-ff"],
            cwd=main_root,
            check=False,
            capture_output=True,
            text=True,
        )
        if merged.returncode == 0:
            return MergeResult(success=True, merged_commits=[branch])

        # Conflict: capture which files collided, then roll the merge back.
        conflicts = _get_conflicting_files(main_root)
        subprocess.run(
            ["git", "merge", "--abort"],
            cwd=main_root,
            check=False,
        )
        return MergeResult(
            success=False,
            conflicts=conflicts,
            worktree_path=worktree,
        )

    def cleanup(self, worktree: Path) -> None:
        """Remove the worktree and its branch; safe to call repeatedly.

        Args:
            worktree: Path to the worktree to remove.
        """
        if not worktree.exists():
            return

        branch = _get_branch_name(worktree)

        # X3: resolve the main repo root from the worktree itself.
        repo_root = _get_main_root(worktree)

        # E11: a locked worktree cannot be removed; unlock first
        # (no-op when it is not locked).
        subprocess.run(
            ["git", "worktree", "unlock", str(worktree)],
            cwd=repo_root,
            check=False,
        )
        subprocess.run(
            ["git", "worktree", "remove", str(worktree), "--force"],
            cwd=repo_root,
            check=False,
        )
        subprocess.run(
            ["git", "branch", "-D", branch],
            cwd=repo_root,
            check=False,
        )

create(main_root, task, branch_name)

Create a git worktree with a new branch.

Parameters:

Name Type Description Default
main_root Path

The main project root (must be a git repo with at least one commit).

required
task Task

The task to execute (used for logging context).

required
branch_name str

Branch name for the new worktree.

required

Returns:

Type Description
Path

Path to the created worktree directory.

Raises:

Type Description
RuntimeError

If the repo has no commits.

CalledProcessError

If worktree creation fails.

Source code in src/rigger/workspace/git_worktree.py
def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
    """Create a git worktree with a fresh branch for *task*.

    Args:
        main_root: The main project root (must be a git repo with
            at least one commit).
        task: The task to execute (used for logging context).
        branch_name: Branch name for the new worktree.

    Returns:
        Path to the created worktree directory.

    Raises:
        RuntimeError: If the repo has no commits.
        subprocess.CalledProcessError: If worktree creation fails.
    """
    repo = main_root.resolve()

    # E8: a worktree cannot be added to a repo with no commits.
    probe = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=repo,
        capture_output=True,
        text=True,
        check=False,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"Cannot create worktree: {repo} has no commits.")

    wt_dir = repo.parent / ".rigger-worktrees" / branch_name

    # X8: branch names with slashes need their intermediate directories.
    wt_dir.parent.mkdir(parents=True, exist_ok=True)

    # E6: remove leftovers from a previous crash, worktree before branch.
    if wt_dir.exists():
        subprocess.run(
            ["git", "worktree", "remove", str(wt_dir), "--force"],
            cwd=repo,
            check=False,
        )
    subprocess.run(
        ["git", "branch", "-D", branch_name],
        cwd=repo,
        check=False,
    )

    # Strict -b (not -B): fail loudly if the branch somehow still exists.
    subprocess.run(
        ["git", "worktree", "add", str(wt_dir), "-b", branch_name],
        cwd=repo,
        check=True,
    )

    # E2: initialize submodules when the project has them.
    if (wt_dir / ".gitmodules").exists():
        subprocess.run(
            ["git", "submodule", "update", "--init", "--recursive"],
            cwd=wt_dir,
            check=True,
        )

    logger.debug(
        "Created worktree for task %s at %s (branch: %s)",
        task.id,
        wt_dir,
        branch_name,
    )
    return wt_dir

merge(worktree, main_root)

Merge worktree branch into main_root via git merge --no-ff.

On conflict, aborts the merge and returns conflict details.

Parameters:

Name Type Description Default
worktree Path

Path to the worktree to merge from.

required
main_root Path

The main project root to merge into.

required

Returns:

Type Description
MergeResult

MergeResult with success status and any conflict information.

Source code in src/rigger/workspace/git_worktree.py
def merge(self, worktree: Path, main_root: Path) -> MergeResult:
    """Merge the worktree's branch into ``main_root`` via ``git merge --no-ff``.

    When the merge conflicts, it is aborted and the conflicting files
    are reported back.

    Args:
        worktree: Path to the worktree to merge from.
        main_root: The main project root to merge into.

    Returns:
        MergeResult with success status and any conflict information.
    """
    branch = _get_branch_name(worktree)
    merge_proc = subprocess.run(
        ["git", "merge", branch, "--no-ff"],
        cwd=main_root,
        check=False,
        capture_output=True,
        text=True,
    )
    if merge_proc.returncode == 0:
        return MergeResult(success=True, merged_commits=[branch])

    # Conflict: record the files involved, then undo the partial merge.
    conflicts = _get_conflicting_files(main_root)
    subprocess.run(
        ["git", "merge", "--abort"],
        cwd=main_root,
        check=False,
    )
    return MergeResult(
        success=False,
        conflicts=conflicts,
        worktree_path=worktree,
    )

cleanup(worktree)

Remove worktree and its branch. Idempotent.

Parameters:

Name Type Description Default
worktree Path

Path to the worktree to remove.

required
Source code in src/rigger/workspace/git_worktree.py
def cleanup(self, worktree: Path) -> None:
    """Remove the worktree and its branch; safe to call repeatedly.

    Args:
        worktree: Path to the worktree to remove.
    """
    if not worktree.exists():
        # Already gone — idempotent no-op.
        return

    branch = _get_branch_name(worktree)

    # X3: resolve the main repo root from the worktree itself.
    repo_root = _get_main_root(worktree)

    # E11: a locked worktree cannot be removed; unlock first
    # (no-op when it is not locked).
    subprocess.run(
        ["git", "worktree", "unlock", str(worktree)],
        cwd=repo_root,
        check=False,
    )
    subprocess.run(
        ["git", "worktree", "remove", str(worktree), "--force"],
        cwd=repo_root,
        check=False,
    )
    subprocess.run(
        ["git", "branch", "-D", branch],
        cwd=repo_root,
        check=False,
    )

IndependentBranchManager

Default WorkspaceManager: branch-per-task, push as PR.

Each agent works on its own branch in an isolated worktree. On completion, the branch is pushed to the remote. No local merge into main_root. PRs are merged by human review or CI.

Matches: C2 (OpenAI), C6 (Spotify), C14 (Composio), C15 (Open SWE). This is the corpus-dominant pattern (4/6 parallel sources).

Worktrees are created under <parent>/.rigger-worktrees/<branch_name>. Minimum git version: 2.17 (for git worktree remove).

Source code in src/rigger/workspace/independent_branch.py
class IndependentBranchManager:
    """Default WorkspaceManager: one branch per task, delivered as a PR.

    Every agent works on its own branch inside an isolated worktree.
    When a task completes, the branch is pushed to the remote; nothing
    is merged locally into main_root. PRs are merged by human review
    or CI.

    Matches: C2 (OpenAI), C6 (Spotify), C14 (Composio), C15 (Open SWE).
    This is the corpus-dominant pattern (4/6 parallel sources).

    Worktrees live under ``<parent>/.rigger-worktrees/<branch_name>``.
    Minimum git version: 2.17 (for ``git worktree remove``).
    """

    def __init__(
        self,
        *,
        remote: str = "origin",
        create_pr: bool = True,
        cleanup_branch: bool = False,
    ) -> None:
        """Initialize the manager.

        Args:
            remote: Git remote name for pushing branches.
            create_pr: Whether to create a GitHub PR after pushing via
                ``gh pr create``. Non-fatal if ``gh`` is unavailable.
            cleanup_branch: Whether to delete the local branch on cleanup.
                Default False since branches are the deliverable (PRs).
        """
        self._cleanup_branch = cleanup_branch
        self._create_pr = create_pr
        self._remote = remote

    def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
        """Create a fresh git worktree on a new branch.

        Args:
            main_root: The main project root (must be a git repo with
                at least one commit).
            task: The task to execute (used for logging context).
            branch_name: Branch name for the new worktree.

        Returns:
            Path to the created worktree directory.

        Raises:
            RuntimeError: If the repo has no commits.
            subprocess.CalledProcessError: If worktree creation fails.
        """
        root = main_root.resolve()

        # E8: A worktree needs at least one commit to branch from.
        head_check = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=root,
            capture_output=True,
            text=True,
            check=False,
        )
        if head_check.returncode != 0:
            raise RuntimeError(f"Cannot create worktree: {root} has no commits.")

        wt_dir = root.parent / ".rigger-worktrees" / branch_name

        # X8: Branch names may contain slashes; create parent dirs first.
        wt_dir.parent.mkdir(parents=True, exist_ok=True)

        # E6: Sweep away any stale worktree/branch left by a previous crash.
        if wt_dir.exists():
            subprocess.run(
                ["git", "worktree", "remove", str(wt_dir), "--force"],
                cwd=root,
                check=False,
            )
        subprocess.run(
            ["git", "branch", "-D", branch_name],
            cwd=root,
            check=False,
        )

        # Strict -b (not -B): fail loudly if the branch somehow survived.
        subprocess.run(
            ["git", "worktree", "add", str(wt_dir), "-b", branch_name],
            cwd=root,
            check=True,
        )

        # E2: Pull in submodules when the project declares them.
        if (wt_dir / ".gitmodules").exists():
            subprocess.run(
                ["git", "submodule", "update", "--init", "--recursive"],
                cwd=wt_dir,
                check=True,
            )

        logger.debug(
            "Created worktree for task %s at %s (branch: %s)",
            task.id,
            wt_dir,
            branch_name,
        )
        return wt_dir

    def merge(self, worktree: Path, main_root: Path) -> MergeResult:
        """Push the branch to the remote. No local merge.

        The worktree's branch is pushed to the configured remote and,
        optionally, a GitHub PR is opened via ``gh pr create``. A PR
        creation failure is non-fatal because the branch is already
        pushed at that point.

        Args:
            worktree: Path to the worktree to push from.
            main_root: The main project root (unused — no local merge).

        Returns:
            MergeResult with success status and diagnostic metadata.
        """
        branch = _get_branch_name(worktree)
        push = subprocess.run(
            ["git", "push", self._remote, branch],
            cwd=worktree,
            capture_output=True,
            text=True,
            check=False,
        )
        if push.returncode != 0:
            return MergeResult(
                success=False,
                worktree_path=worktree,
                metadata={"stderr": push.stderr, "returncode": push.returncode},
            )

        metadata: dict[str, Any] = {}
        if self._create_pr:
            try:
                pr = subprocess.run(
                    ["gh", "pr", "create", "--head", branch, "--fill"],
                    cwd=worktree,
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except FileNotFoundError:
                logger.warning("gh CLI not found; skipping PR creation for %s", branch)
            except subprocess.CalledProcessError as exc:
                logger.warning(
                    "PR creation failed for %s: %s", branch, exc.stderr.strip()
                )
            else:
                metadata["pr_url"] = pr.stdout.strip()

        return MergeResult(success=True, metadata=metadata)

    def cleanup(self, worktree: Path) -> None:
        """Remove the worktree; keep its branch unless configured otherwise.

        The branch is the deliverable (pushed as a PR), so by default it
        survives cleanup. Construct with ``cleanup_branch=True`` to delete
        it as well. Safe to call twice.

        Args:
            worktree: Path to the worktree to remove.
        """
        if not worktree.exists():
            return

        branch = _get_branch_name(worktree)

        # X3: Discover main repo root reliably.
        repo_root = _get_main_root(worktree)

        # E11: Unlock first (no-op if not locked), then force-remove.
        for cmd in (
            ["git", "worktree", "unlock", str(worktree)],
            ["git", "worktree", "remove", str(worktree), "--force"],
        ):
            subprocess.run(cmd, cwd=repo_root, check=False)

        if self._cleanup_branch:
            subprocess.run(
                ["git", "branch", "-D", branch],
                cwd=repo_root,
                check=False,
            )

__init__(*, remote='origin', create_pr=True, cleanup_branch=False)

Initialize the manager.

Parameters:

Name Type Description Default
remote str

Git remote name for pushing branches.

'origin'
create_pr bool

Whether to create a GitHub PR after pushing via gh pr create. Non-fatal if gh is unavailable.

True
cleanup_branch bool

Whether to delete the local branch on cleanup. Default False since branches are the deliverable (PRs).

False
Source code in src/rigger/workspace/independent_branch.py
def __init__(
    self,
    *,
    remote: str = "origin",
    create_pr: bool = True,
    cleanup_branch: bool = False,
) -> None:
    """Set up the manager's push/PR/cleanup policy.

    Args:
        remote: Git remote name that branches are pushed to.
        create_pr: Whether to open a GitHub PR after pushing via
            ``gh pr create``. Non-fatal if ``gh`` is unavailable.
        cleanup_branch: Whether to delete the local branch on cleanup.
            Default False since branches are the deliverable (PRs).
    """
    self._cleanup_branch = cleanup_branch
    self._create_pr = create_pr
    self._remote = remote

create(main_root, task, branch_name)

Create a git worktree with a new branch.

Parameters:

Name Type Description Default
main_root Path

The main project root (must be a git repo with at least one commit).

required
task Task

The task to execute (used for logging context).

required
branch_name str

Branch name for the new worktree.

required

Returns:

Type Description
Path

Path to the created worktree directory.

Raises:

Type Description
RuntimeError

If the repo has no commits.

CalledProcessError

If worktree creation fails.

Source code in src/rigger/workspace/independent_branch.py
def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
    """Create a fresh git worktree on a new branch.

    Args:
        main_root: The main project root (must be a git repo with
            at least one commit).
        task: The task to execute (used for logging context).
        branch_name: Branch name for the new worktree.

    Returns:
        Path to the created worktree directory.

    Raises:
        RuntimeError: If the repo has no commits.
        subprocess.CalledProcessError: If worktree creation fails.
    """
    root = main_root.resolve()

    # E8: A worktree needs at least one commit to branch from.
    head_check = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=root,
        capture_output=True,
        text=True,
        check=False,
    )
    if head_check.returncode != 0:
        raise RuntimeError(f"Cannot create worktree: {root} has no commits.")

    wt_dir = root.parent / ".rigger-worktrees" / branch_name

    # X8: Branch names may contain slashes; create parent dirs first.
    wt_dir.parent.mkdir(parents=True, exist_ok=True)

    # E6: Sweep away any stale worktree/branch left by a previous crash.
    if wt_dir.exists():
        subprocess.run(
            ["git", "worktree", "remove", str(wt_dir), "--force"],
            cwd=root,
            check=False,
        )
    subprocess.run(
        ["git", "branch", "-D", branch_name],
        cwd=root,
        check=False,
    )

    # Strict -b (not -B): fail loudly if the branch somehow survived.
    subprocess.run(
        ["git", "worktree", "add", str(wt_dir), "-b", branch_name],
        cwd=root,
        check=True,
    )

    # E2: Pull in submodules when the project declares them.
    if (wt_dir / ".gitmodules").exists():
        subprocess.run(
            ["git", "submodule", "update", "--init", "--recursive"],
            cwd=wt_dir,
            check=True,
        )

    logger.debug(
        "Created worktree for task %s at %s (branch: %s)",
        task.id,
        wt_dir,
        branch_name,
    )
    return wt_dir

merge(worktree, main_root)

Push branch to remote. No local merge.

Pushes the worktree's branch to the configured remote. Optionally creates a GitHub PR via gh pr create. PR creation failure is non-fatal since the branch is already pushed.

Parameters:

Name Type Description Default
worktree Path

Path to the worktree to push from.

required
main_root Path

The main project root (unused — no local merge).

required

Returns:

Type Description
MergeResult

MergeResult with success status and diagnostic metadata.

Source code in src/rigger/workspace/independent_branch.py
def merge(self, worktree: Path, main_root: Path) -> MergeResult:
    """Push the branch to the remote. No local merge.

    The worktree's branch is pushed to the configured remote and,
    optionally, a GitHub PR is opened via ``gh pr create``. A PR
    creation failure is non-fatal because the branch is already
    pushed at that point.

    Args:
        worktree: Path to the worktree to push from.
        main_root: The main project root (unused — no local merge).

    Returns:
        MergeResult with success status and diagnostic metadata.
    """
    branch = _get_branch_name(worktree)
    push = subprocess.run(
        ["git", "push", self._remote, branch],
        cwd=worktree,
        capture_output=True,
        text=True,
        check=False,
    )
    if push.returncode != 0:
        return MergeResult(
            success=False,
            worktree_path=worktree,
            metadata={"stderr": push.stderr, "returncode": push.returncode},
        )

    metadata: dict[str, Any] = {}
    if self._create_pr:
        try:
            pr = subprocess.run(
                ["gh", "pr", "create", "--head", branch, "--fill"],
                cwd=worktree,
                capture_output=True,
                text=True,
                check=True,
            )
        except FileNotFoundError:
            logger.warning("gh CLI not found; skipping PR creation for %s", branch)
        except subprocess.CalledProcessError as exc:
            logger.warning(
                "PR creation failed for %s: %s", branch, exc.stderr.strip()
            )
        else:
            metadata["pr_url"] = pr.stdout.strip()

    return MergeResult(success=True, metadata=metadata)

cleanup(worktree)

Remove worktree and optionally its branch. Idempotent.

By default, preserves the branch since it's the deliverable (pushed as a PR). Set cleanup_branch=True to also delete it.

Parameters:

Name Type Description Default
worktree Path

Path to the worktree to remove.

required
Source code in src/rigger/workspace/independent_branch.py
def cleanup(self, worktree: Path) -> None:
    """Remove the worktree; keep its branch unless configured otherwise.

    The branch is the deliverable (pushed as a PR), so by default it
    survives cleanup. Construct with ``cleanup_branch=True`` to delete
    it as well. Safe to call twice.

    Args:
        worktree: Path to the worktree to remove.
    """
    # Idempotency: nothing to do if the worktree is already gone.
    if not worktree.exists():
        return

    branch = _get_branch_name(worktree)

    # X3: Discover main repo root reliably.
    repo_root = _get_main_root(worktree)

    # E11: Unlock first (no-op if not locked), then force-remove.
    for cmd in (
        ["git", "worktree", "unlock", str(worktree)],
        ["git", "worktree", "remove", str(worktree), "--force"],
    ):
        subprocess.run(cmd, cwd=repo_root, check=False)

    if self._cleanup_branch:
        subprocess.run(
            ["git", "branch", "-D", branch],
            cwd=repo_root,
            check=False,
        )

IndependentDirManager

Workspace isolation via directory copies.

Each agent works on a full copy of the project in a temp directory. On completion, the copy is merged back by overwriting the main root. No git operations — works with any project.

Use this when git worktrees are unavailable or undesired (e.g., non-git projects, CI environments without git, or when branch management overhead is unwanted).

Source code in src/rigger/workspace/independent.py
class IndependentDirManager:
    """Workspace isolation via directory copies.

    Every agent operates on a full copy of the project placed in a
    temporary directory. Merging back means overwriting the main root
    with the copy's contents. No git operations — works with any
    project.

    Reach for this when git worktrees are unavailable or undesired
    (e.g., non-git projects, CI environments without git, or when
    branch management overhead is unwanted).
    """

    def __init__(self, *, prefix: str = "rigger-") -> None:
        """Initialize the manager.

        Args:
            prefix: Prefix for temporary directory names.
        """
        self._prefix = prefix

    def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
        """Copy the project into a fresh temporary directory.

        Args:
            main_root: The main project root to copy.
            task: The task to execute (used for logging context).
            branch_name: Ignored — present for protocol compatibility.

        Returns:
            Path to the temporary copy.
        """
        parent = Path(tempfile.mkdtemp(prefix=self._prefix))
        copy_root = parent / main_root.name
        shutil.copytree(main_root, copy_root, symlinks=True)

        logger.debug(
            "Created workspace copy for task %s at %s",
            task.id,
            copy_root,
        )
        return copy_root

    def merge(self, worktree: Path, main_root: Path) -> MergeResult:
        """Overlay the workspace contents onto the main root.

        Files present in the workspace replace their counterparts in
        main_root; files that exist only in main_root are left alone.

        Args:
            worktree: Path to the workspace copy.
            main_root: The main project root to update.

        Returns:
            MergeResult with success=True on completion.
        """
        try:
            shutil.copytree(worktree, main_root, symlinks=True, dirs_exist_ok=True)
        except OSError as exc:
            return MergeResult(
                success=False,
                worktree_path=worktree,
                metadata={"error": str(exc)},
            )
        return MergeResult(success=True, worktree_path=worktree)

    def cleanup(self, worktree: Path) -> None:
        """Delete the temporary workspace directory. Idempotent.

        Args:
            worktree: Path to the workspace to remove.
        """
        # The workspace sits one level below the mkdtemp parent; remove
        # the whole parent so nothing is left behind.
        parent = worktree.parent
        if parent.exists():
            shutil.rmtree(parent, ignore_errors=True)

__init__(*, prefix='rigger-')

Initialize the manager.

Parameters:

Name Type Description Default
prefix str

Prefix for temporary directory names.

'rigger-'
Source code in src/rigger/workspace/independent.py
def __init__(self, *, prefix: str = "rigger-") -> None:
    """Set up the manager.

    Args:
        prefix: Prefix used when naming temporary directories.
    """
    self._prefix = prefix

create(main_root, task, branch_name)

Copy the project to a temporary directory.

Parameters:

Name Type Description Default
main_root Path

The main project root to copy.

required
task Task

The task to execute (used for logging context).

required
branch_name str

Ignored — present for protocol compatibility.

required

Returns:

Type Description
Path

Path to the temporary copy.

Source code in src/rigger/workspace/independent.py
def create(self, main_root: Path, task: Task, branch_name: str) -> Path:
    """Copy the project into a fresh temporary directory.

    Args:
        main_root: The main project root to copy.
        task: The task to execute (used for logging context).
        branch_name: Ignored — present for protocol compatibility.

    Returns:
        Path to the temporary copy.
    """
    parent = Path(tempfile.mkdtemp(prefix=self._prefix))
    copy_root = parent / main_root.name
    shutil.copytree(main_root, copy_root, symlinks=True)

    logger.debug(
        "Created workspace copy for task %s at %s",
        task.id,
        copy_root,
    )
    return copy_root

merge(worktree, main_root)

Copy workspace contents back to the main root.

Overwrites files in main_root with those from the workspace. Does not delete files in main_root that don't exist in the workspace.

Parameters:

Name Type Description Default
worktree Path

Path to the workspace copy.

required
main_root Path

The main project root to update.

required

Returns:

Type Description
MergeResult

MergeResult with success=True on completion.

Source code in src/rigger/workspace/independent.py
def merge(self, worktree: Path, main_root: Path) -> MergeResult:
    """Overlay the workspace contents onto the main root.

    Files present in the workspace replace their counterparts in
    main_root; files that exist only in main_root are left alone.

    Args:
        worktree: Path to the workspace copy.
        main_root: The main project root to update.

    Returns:
        MergeResult with success=True on completion.
    """
    try:
        shutil.copytree(worktree, main_root, symlinks=True, dirs_exist_ok=True)
    except OSError as exc:
        return MergeResult(
            success=False,
            worktree_path=worktree,
            metadata={"error": str(exc)},
        )
    return MergeResult(success=True, worktree_path=worktree)

cleanup(worktree)

Remove the temporary workspace directory. Idempotent.

Parameters:

Name Type Description Default
worktree Path

Path to the workspace to remove.

required
Source code in src/rigger/workspace/independent.py
def cleanup(self, worktree: Path) -> None:
    """Delete the temporary workspace directory. Idempotent.

    Args:
        worktree: Path to the workspace to remove.
    """
    # The workspace sits one level below the mkdtemp parent; remove
    # the whole parent so nothing is left behind.
    parent = worktree.parent
    if parent.exists():
        shutil.rmtree(parent, ignore_errors=True)

build_harness(config, project_root=None)

Build a fully-wired Harness from a HarnessConfig.

Uses the global registry to instantiate all components by their type config name.

Parameters:

Name Type Description Default
config HarnessConfig

Parsed YAML configuration.

required
project_root Path | None

Project root directory. Defaults to config.config_dir.

None

Returns:

Type Description
Harness

A configured Harness instance ready to call run() on.

Raises:

Type Description
KeyError

If a component type is not found in the registry.

TypeError

If constructor parameters are invalid.

Source code in src/rigger/_config.py
def build_harness(
    config: HarnessConfig,
    project_root: Path | None = None,
) -> Harness:
    """Build a fully-wired ``Harness`` from a ``HarnessConfig``.

    Every component is instantiated through the global registry, keyed
    by its ``type`` config name.

    Args:
        config: Parsed YAML configuration.
        project_root: Project root directory. Defaults to *config.config_dir*.

    Returns:
        A configured ``Harness`` instance ready to call ``run()`` on.

    Raises:
        KeyError: If a component type is not found in the registry.
        TypeError: If constructor parameters are invalid.
    """
    root = project_root if project_root is not None else config.config_dir

    def _make(protocol: str, comp: ComponentConfig) -> Any:
        # Relative paths in params are resolved against the config dir.
        kwargs = _resolve_paths(comp.params, config.config_dir)
        return registry.create(protocol, comp.type, **kwargs)

    backend = _make("backend", config.backend)
    task_source = _make("task_source", config.task_source)
    context_sources = [_make("context_source", c) for c in config.context_sources]
    verifiers = [_make("verifier", v) for v in config.verifiers]
    constraints = [_make("constraint", c) for c in config.constraints]
    state_store = None
    if config.state_store is not None:
        state_store = _make("state_store", config.state_store)
    entropy_detectors = [
        _make("entropy_detector", e) for e in config.entropy_detectors
    ]
    workspace = None
    if config.workspace is not None:
        workspace = _make("workspace_manager", config.workspace)

    # Empty component lists become None so Harness applies its defaults.
    return Harness(
        project_root=root,
        backend=backend,
        task_source=task_source,
        context_sources=context_sources or None,
        verifiers=verifiers or None,
        constraints=constraints or None,
        state_store=state_store,
        entropy_detectors=entropy_detectors or None,
        workspace_manager=workspace,
        inject_entropy_tasks=config.run.inject_entropy_tasks,
    )

get_stop_predicate(name)

Map a stop_when config string to a callable predicate.

Parameters:

Name Type Description Default
name str

One of "all_tasks_done", "max_epochs", "never".

required

Returns:

Type Description
Callable[[EpochState], bool]

A predicate compatible with Harness.run(stop_when=...).

Raises:

Type Description
ValueError

If name is not a recognized value.

Source code in src/rigger/_config.py
def get_stop_predicate(name: str) -> Callable[[EpochState], bool]:
    """Map a ``stop_when`` config string to a callable predicate.

    Args:
        name: One of ``"all_tasks_done"``, ``"max_epochs"``, ``"never"``.

    Returns:
        A predicate compatible with ``Harness.run(stop_when=...)``.

    Raises:
        ValueError: If *name* is not a recognized value.
    """
    def _run_forever(state: EpochState) -> bool:
        # Never votes to stop (presumably "max_epochs" is enforced by
        # the loop's epoch counter rather than by a predicate).
        return False

    predicates: dict[str, Callable[[EpochState], bool]] = {
        "all_tasks_done": all_tasks_done,
        "max_epochs": _run_forever,
        "never": _run_forever,
    }
    try:
        return predicates[name]
    except KeyError:
        msg = (
            f"Unknown stop_when value {name!r}. "
            f"Must be one of: {', '.join(sorted(predicates))}"
        )
        raise ValueError(msg) from None

load_config(path)

Parse a harness.yaml file into a HarnessConfig.

Performs environment variable interpolation on all string values and validates the top-level structure.

Parameters:

Name Type Description Default
path Path | str

Path to the YAML config file.

required

Returns:

Type Description
HarnessConfig

Parsed and validated configuration.

Raises:

Type Description
FileNotFoundError

If the config file does not exist.

ValueError

If the YAML structure is invalid.

Source code in src/rigger/_config.py
def load_config(path: Path | str) -> HarnessConfig:
    """Parse a ``harness.yaml`` file into a ``HarnessConfig``.

    Performs environment variable interpolation on all string values
    and validates the top-level structure.

    Args:
        path: Path to the YAML config file.

    Returns:
        Parsed and validated configuration.

    Raises:
        FileNotFoundError: If the config file does not exist.
        ValueError: If the YAML structure is invalid.
    """
    path = Path(path)
    if not path.exists():
        msg = f"Config file not found: {path}"
        raise FileNotFoundError(msg)

    # Fix: call .open() on the instance (was the obscure unbound
    # ``Path.open(path)``) and read as explicit UTF-8 so parsing does
    # not depend on the platform's locale encoding.
    with path.open(encoding="utf-8") as f:
        raw = yaml.safe_load(f)

    if not isinstance(raw, dict):
        msg = f"Config file must contain a YAML mapping, got {type(raw).__name__}"
        raise ValueError(msg)

    # Interpolate env vars in all string values
    raw = _interpolate_recursive(raw)

    # Warn on unknown top-level keys (typo detection); they are ignored.
    unknown = set(raw.keys()) - _VALID_TOP_KEYS
    if unknown:
        logger.warning(
            "Unknown top-level keys in config: %s", ", ".join(sorted(unknown))
        )

    # Required fields
    if "backend" not in raw:
        msg = "Missing required key: 'backend'"
        raise ValueError(msg)
    if "task_source" not in raw:
        msg = "Missing required key: 'task_source'"
        raise ValueError(msg)

    # Relative paths inside the config resolve against its directory.
    config_dir = path.parent.resolve()

    return HarnessConfig(
        backend=_parse_component(raw["backend"], "backend"),
        task_source=_parse_component(raw["task_source"], "task_source"),
        context_sources=_parse_component_list(
            raw.get("context_sources"), "context_sources"
        ),
        verifiers=_parse_component_list(raw.get("verifiers"), "verifiers"),
        constraints=_parse_component_list(raw.get("constraints"), "constraints"),
        state_store=(
            _parse_component(raw["state_store"], "state_store")
            if "state_store" in raw
            else None
        ),
        entropy_detectors=_parse_component_list(
            raw.get("entropy_detectors"), "entropy_detectors"
        ),
        workspace=(
            _parse_component(raw["workspace"], "workspace")
            if "workspace" in raw
            else None
        ),
        run=_parse_run(raw.get("run")),
        config_dir=config_dir,
    )

all_tasks_done(state)

Default stop predicate. Returns True when no pending tasks remain.

Source code in src/rigger/_harness.py
def all_tasks_done(state: EpochState) -> bool:
    """Default stop predicate. Returns True when no pending tasks remain."""
    remaining = state.pending_tasks
    return not remaining

acquire_lock(project_root, *, force=False)

Create .harness/harness.lock, preventing concurrent runs.

Parameters:

Name Type Description Default
project_root Path

Root of the project containing .harness/.

required
force bool

Override an existing lock (logs a warning).

False

Returns:

Type Description
LockInfo

The LockInfo describing the newly-acquired lock.

Raises:

Type Description
HarnessAlreadyRunning

If a live lock is held and force is False.

Source code in src/rigger/_lock.py
def acquire_lock(
    project_root: Path,
    *,
    force: bool = False,
) -> LockInfo:
    """Create ``.harness/harness.lock``, preventing concurrent runs.

    Args:
        project_root: Root of the project containing ``.harness/``.
        force: Override an existing lock (logs a warning).

    Returns:
        The ``LockInfo`` describing the newly-acquired lock.

    Raises:
        HarnessAlreadyRunning: If a live lock is held and *force* is False.
    """
    existing = _read_lock(project_root)

    if existing is not None:
        on_this_host = existing.hostname == platform.node()

        if force:
            logger.warning(
                "Force-overriding existing lock (pid=%d, host=%s)",
                existing.pid,
                existing.hostname,
            )
        elif not on_this_host:
            # A remote host's PID cannot be probed from here, so refuse.
            msg = (
                f"Lock held by a different host "
                f"(host={existing.hostname}, pid={existing.pid}, "
                f"instance={existing.instance_id}). "
                f"Cannot verify if PID is alive. "
                f"Use force=True to override."
            )
            raise HarnessAlreadyRunning(msg)
        elif _pid_alive(existing.pid):
            msg = (
                f"Another harness instance is running "
                f"(pid={existing.pid}, instance={existing.instance_id}, "
                f"started={time.ctime(existing.timestamp)}). "
                f"Use force=True to override."
            )
            raise HarnessAlreadyRunning(msg)
        else:
            # Same host with a dead PID: the lock is stale; take over.
            logger.warning(
                "Stale lock detected (pid=%d is dead) — acquiring",
                existing.pid,
            )

    info = LockInfo(
        pid=os.getpid(),
        timestamp=time.time(),
        instance_id=uuid.uuid4().hex,
        hostname=platform.node(),
    )
    _write_lock(project_root, info)
    return info

harness_lock(project_root, *, force=False)

Context manager that acquires the lock on entry and releases on exit.

Parameters:

Name Type Description Default
project_root Path

Root of the project containing .harness/.

required
force bool

Override an existing lock.

False

Yields:

Type Description
LockInfo

The LockInfo describing the held lock.

Raises:

Type Description
HarnessAlreadyRunning

If a live lock is held and force is False.

Source code in src/rigger/_lock.py
@contextmanager
def harness_lock(
    project_root: Path,
    *,
    force: bool = False,
) -> Iterator[LockInfo]:
    """Hold the harness lock for the duration of a ``with`` block.

    Acquires on entry; the ``finally`` guarantees release even when the
    body raises.

    Args:
        project_root: Root of the project containing ``.harness/``.
        force: Override an existing lock.

    Yields:
        The ``LockInfo`` describing the held lock.

    Raises:
        HarnessAlreadyRunning: If a live lock is held and *force* is False.
    """
    held = acquire_lock(project_root, force=force)
    try:
        yield held
    finally:
        release_lock(project_root, held)

release_lock(project_root, lock_info)

Remove .harness/harness.lock if it matches lock_info.

Only deletes the lock if the on-disk instance_id matches the one we hold, preventing accidental removal of another instance's lock.

Parameters:

Name Type Description Default
project_root Path

Root of the project containing .harness/.

required
lock_info LockInfo

The LockInfo returned by acquire_lock.

required
Source code in src/rigger/_lock.py
def release_lock(project_root: Path, lock_info: LockInfo) -> None:
    """Remove ``.harness/harness.lock`` if it matches *lock_info*.

    Only deletes the lock if the on-disk ``instance_id`` matches
    the one we hold, preventing accidental removal of another
    instance's lock.

    Args:
        project_root: Root of the project containing ``.harness/``.
        lock_info: The ``LockInfo`` returned by ``acquire_lock``.
    """
    on_disk = _read_lock(project_root)
    if on_disk is None:
        # Nothing to release — the lock file is already gone.
        return

    if on_disk.instance_id == lock_info.instance_id:
        try:
            _lock_path(project_root).unlink(missing_ok=True)
        except OSError as exc:
            # Best-effort cleanup: a stale file will be detected as such
            # by the next acquirer, so a warning is enough here.
            logger.warning("Failed to remove lock file: %s", exc)
    else:
        # Another instance owns the on-disk lock; leave it untouched.
        logger.warning(
            "Lock instance_id mismatch (ours=%s, on-disk=%s) — not releasing",
            lock_info.instance_id,
            on_disk.instance_id,
        )

merge_metadata(results)

Merge metadata from multiple constraint results into a single config.

Deterministic: same inputs always produce same output regardless of constraint evaluation order. Achieved by commutative set operations for lists, commutative min() for scalars, and sorted output.

Parameters:

Name Type Description Default
results list[VerifyResult]

VerifyResult instances from Constraint.check() calls.

required

Returns:

Type Description
dict[str, Any]

Merged metadata dict ready for .harness/constraints.json.

Source code in src/rigger/_merge.py
def merge_metadata(results: list[VerifyResult]) -> dict[str, Any]:
    """Merge metadata from multiple constraint results into a single config.

    Deterministic: same inputs always produce same output regardless of
    constraint evaluation order. Achieved by commutative set operations
    for lists, commutative min() for scalars, and sorted output.

    Args:
        results: VerifyResult instances from Constraint.check() calls.

    Returns:
        Merged metadata dict ready for .harness/constraints.json.
    """
    # Group every observed value under its key, preserving the order in
    # which the constraints reported them.
    grouped: dict[str, list[Any]] = {}
    for res in results:
        for k, v in res.metadata.items():
            grouped.setdefault(k, []).append(v)

    # Resolve each key via its registered merge family.
    merged: dict[str, Any] = {}
    for k, vals in grouped.items():
        fam = _MERGE_FAMILIES.get(k)
        if fam == "additive":
            merged[k] = _merge_additive(vals)
        elif fam == "scalar_min":
            merged[k] = _merge_scalar_min(vals)
        elif fam == "restrictive":
            merged[k] = _merge_restrictive(k, vals)
        else:
            # No merge family registered — fall back to last-writer-wins,
            # logging at INFO when more than one constraint set the key.
            if len(vals) > 1:
                logger.info(
                    "Metadata key '%s' set by %d constraints; "
                    "using last value (no merge family registered)",
                    k,
                    len(vals),
                )
            merged[k] = vals[-1]

    return merged

write_entropy_tasks(project_root, tasks)

Write entropy tasks to a partitioned file in .harness/entropy/.

Creates .harness/entropy/tasks_{timestamp}_{uuid}.json with atomic write semantics. Each file is an independent partition that eliminates shared-mutable-file race conditions.

Parameters:

Name Type Description Default
project_root Path

Root of the project.

required
tasks list[Task]

Non-empty list of entropy tasks to write.

required

Returns:

Type Description
Path

Path to the created partition file.

Raises:

Type Description
ValueError

If tasks list is empty.

Source code in src/rigger/_schema.py
def write_entropy_tasks(project_root: Path, tasks: list[Task]) -> Path:
    """Write entropy tasks to a partitioned file in .harness/entropy/.

    Creates ``.harness/entropy/tasks_{timestamp}_{uuid}.json`` with
    atomic write semantics. Each file is an independent partition
    that eliminates shared-mutable-file race conditions.

    Args:
        project_root: Root of the project.
        tasks: Non-empty list of entropy tasks to write.

    Returns:
        Path to the created partition file.

    Raises:
        ValueError: If tasks list is empty.
    """
    if not tasks:
        msg = "Cannot write empty entropy task list"
        raise ValueError(msg)

    out_dir = ensure_harness_dir(project_root) / ENTROPY_DIR
    out_dir.mkdir(parents=True, exist_ok=True)

    # Millisecond timestamp plus a short random suffix keeps concurrent
    # writers from ever colliding on a partition filename.
    target = out_dir / (
        f"tasks_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}.json"
    )

    serialized: list[dict[str, Any]] = []
    for task in tasks:
        serialized.append(
            {
                "_schema_version": SCHEMA_VERSION,
                "id": task.id,
                "description": task.description,
                "metadata": task.metadata,
            }
        )

    _atomic_write(target, {"tasks": serialized})
    return target