Prefect

Prefect is the Python-native orchestrator teams reach for when their pipelines outgrow cron. Its task and flow model maps directly onto the chio capability model: a flow is a scoped unit of work, a task is a guarded tool call. The chio-prefect package wraps Prefect's decorators so every task is evaluated by the chio kernel before its body runs, every decision surfaces as a Prefect event on the flow-run timeline, and scope at the flow boundary is enforced as a strict upper bound on what any enclosed task can do.


Why Prefect Through chio

Prefect already has the ergonomics most agent pipelines want: typed function decorators, retry policies, async-first execution, a UI that shows you where time is going. What it does not have is a policy layer. Scheduling a flow does not restrict what tools it can call, and @task is a pure execution primitive with no notion of authority. The chio integration fills in exactly that layer and nothing else.

| Prefect alone | Prefect + chio |
| --- | --- |
| Tasks retry on failure | Tasks are denied before side-effects when a capability is revoked |
| Flow parameters gate inputs | Flow scope gates the set of tools every enclosed task may call |
| Events record task state transitions | Events record signed receipts linked to flow-run and task-run ids |
| Access control is deployment-level | Access control is per-tool, per-scope, time-bounded, revocable |
| Run history answers "what happened" | Receipt chain answers "what was allowed and why" |

Install

The package targets Prefect 3 and ships alongside chio-sdk. The chio sidecar runs next to your Prefect worker; if you already run a sidecar for other SDKs, there are no new processes to manage.

bash
uv pip install chio-prefect
# or
pip install chio-prefect

No environment variables are required. The default sidecar URL is http://127.0.0.1:9090; pass sidecar_url= on any decorator to override.


Deployment Topology

Prefect workers run the task bodies, so they are the enforcement point. The chio kernel runs as a sidecar on the same host or pod; the decorators speak HTTP to it. Prefect's API server stays untouched, its events backend stays untouched, and the chio receipt log stays external to flow-run history.

Workers call the chio sidecar before every decorated task body runs. Decisions flow back as Prefect events on the same flow-run timeline.

One sidecar per worker, not per flow

The sidecar is a per-host enforcement daemon, not a per-flow attachment. A Prefect worker running many concurrent flows shares a single sidecar; concurrency is bounded by the sidecar's HTTP pool, not by the decorator.

Quickstart

There are two decorators: @chio_task wraps a Prefect task, and @chio_flow wraps a Prefect flow. Each task call is evaluated to an allow/deny verdict before its body runs, and the verdict is emitted as a Prefect event tied to the task-run id.

python
from chio_sdk.client import ChioClient
from chio_sdk.models import ChioScope, Operation, ToolGrant
from chio_prefect import chio_flow, chio_task

PIPELINE_SCOPE = ChioScope(
    grants=[
        ToolGrant(
            server_id="search-srv",
            tool_name="search_documents",
            operations=[Operation.INVOKE],
        ),
        ToolGrant(
            server_id="search-srv",
            tool_name="analyze_results",
            operations=[Operation.INVOKE],
        ),
    ]
)

# external_search and analyzer stand in for your own client modules.
@chio_task(tool_server="search-srv")
def search_documents(query: str) -> list[dict]:
    return external_search.run(query)

@chio_task(tool_server="search-srv")
def analyze_results(documents: list[dict]) -> dict:
    return analyzer.run(documents)

@chio_flow(
    scope=PIPELINE_SCOPE,
    capability_id="cap-research-pipeline",
    tool_server="search-srv",
)
def research_pipeline(query: str) -> dict:
    docs = search_documents(query)
    return analyze_results(docs)

research_pipeline("capability-based security")

The decorators are thin wrappers over Prefect's own: every option you would pass to @task or @flow passes through verbatim. Retries, timeouts, task runners, tags, and custom result storage all still work.


Flow Scope and Attenuation

The flow scope is a capability envelope. Tasks inside a flow cannot exceed it, which means a deployment that schedules research_pipeline with search and analysis grants cannot have one of its tasks silently acquire a file.write capability. The subset check is enforced at call time:

python
# Allowed: task scope is a subset of flow scope
@chio_task(
    scope=ChioScope(grants=[
        ToolGrant(server_id="search-srv", tool_name="search_documents",
                  operations=[Operation.INVOKE]),
    ]),
    tool_server="search-srv",
)
def search_documents(query: str) -> list[dict]: ...

# Denied at call time: task scope escapes the flow envelope
@chio_task(
    scope=ChioScope(grants=[
        ToolGrant(server_id="fs-srv", tool_name="write_file",
                  operations=[Operation.INVOKE]),
    ]),
    tool_server="fs-srv",
)
def write_file(path: str, body: bytes) -> None: ...

Tasks that omit scope inherit the enclosing flow's scope. Standalone tasks (tasks called outside any @chio_flow) must declare their own capability_id; omitting it raises ChioPrefectConfigError the first time the task is called without an enclosing flow, because the decorator cannot tell at decoration time whether a flow will wrap the call.

Use flow scope as the contract, task scope as the annotation

In practice the flow scope is what a deployment owner signs off on; task scope is how a developer documents which grant a task actually needs. Keeping task scope narrower than flow scope is what makes the subset check useful, rather than decorative.
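
The subset rule and the inheritance rule can be sketched in plain Python. This is an illustrative model, not the chio kernel's implementation: `Grant`, `covers`, `is_attenuation`, and `effective_scope` are hypothetical names, and operations are reduced to a set of strings.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Grant:
    """Hypothetical stand-in for a ToolGrant, reduced to hashable fields."""
    server_id: str
    tool_name: str
    operations: frozenset


def covers(envelope: Grant, requested: Grant) -> bool:
    """True when `envelope` names the same tool and allows every requested operation."""
    return (
        envelope.server_id == requested.server_id
        and envelope.tool_name == requested.tool_name
        and requested.operations <= envelope.operations
    )


def is_attenuation(flow_scope: list, task_scope: list) -> bool:
    """True when every task grant is covered by at least one flow grant."""
    return all(any(covers(f, t) for f in flow_scope) for t in task_scope)


def effective_scope(flow_scope: list, task_scope: Optional[list]) -> list:
    """Inherit the flow scope when the task declares none; otherwise enforce the subset rule."""
    if task_scope is None:
        return flow_scope
    if not is_attenuation(flow_scope, task_scope):
        raise PermissionError("task scope escapes the flow envelope")
    return task_scope


INVOKE = frozenset({"invoke"})
flow = [
    Grant("search-srv", "search_documents", INVOKE),
    Grant("search-srv", "analyze_results", INVOKE),
]
narrow = [Grant("search-srv", "search_documents", INVOKE)]
escape = [Grant("fs-srv", "write_file", INVOKE)]

print(is_attenuation(flow, narrow))  # True
print(is_attenuation(flow, escape))  # False
```

The check is per grant rather than per scope object, so a task that narrows one grant still passes while a task that names a tool outside the envelope fails as a whole.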

Receipts as Prefect Events

Every task evaluation produces a receipt, and every receipt is mirrored as a Prefect event on the task-run timeline. Two event names are used:

| Event | When emitted | Payload |
| --- | --- | --- |
| chio.receipt.allow | Before task body runs, on allow | Receipt id, capability id, tool server, tool name, timestamp |
| chio.receipt.deny | Before task body runs, on deny | Receipt id, guard name, deny reason, full decision dict |

python
# A receipt emitted on the Prefect events backend
{
    "event": "chio.receipt.allow",
    "resource": {
        "prefect.resource.id": "prefect.task-run.<task-run-id>",
        "prefect.flow-run.id": "<flow-run-id>",
    },
    "payload": {
        "receipt_id": "01HXYZ...7K4",
        "verdict": "allow",
        "capability_id": "cap-research-pipeline",
        "tool_server": "search-srv",
        "tool_name": "search_documents",
        "task_name": "search_documents",
        "timestamp": "2026-04-19T20:14:02.118Z",
    },
}

Events are resource-linked to the task-run and flow-run ids, which means the Prefect UI renders them on the correct row and you can pivot from a receipt back to the exact flow run in one query. If the events backend is unavailable the decorator logs at INFO; it will not silently drop a receipt.
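
As a rough sketch of the pivot from receipt to run, the helper below pulls the linking ids out of an event, assuming the event arrives as a plain dict with the shape shown above. The function names and the sample ids are hypothetical; a real consumer would read events through Prefect's own events client.

```python
from typing import Optional

TASK_RUN_PREFIX = "prefect.task-run."


def task_run_id(event: dict) -> Optional[str]:
    """Strip the resource prefix to recover the bare task-run id, if present."""
    rid = event.get("resource", {}).get("prefect.resource.id", "")
    return rid[len(TASK_RUN_PREFIX):] if rid.startswith(TASK_RUN_PREFIX) else None


def summarize(event: dict) -> dict:
    """Reduce a chio receipt event to the fields needed to join it to run history."""
    payload = event.get("payload", {})
    return {
        "verdict": event["event"].rsplit(".", 1)[-1],
        "receipt_id": payload.get("receipt_id"),
        "task_run_id": task_run_id(event),
        "flow_run_id": event.get("resource", {}).get("prefect.flow-run.id"),
    }


# Illustrative event; the ids are placeholders, not real run ids.
sample = {
    "event": "chio.receipt.allow",
    "resource": {
        "prefect.resource.id": "prefect.task-run.tr-123",
        "prefect.flow-run.id": "fr-456",
    },
    "payload": {"receipt_id": "rcpt-1", "verdict": "allow"},
}

print(summarize(sample))
```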


Denials and Retries

A deny verdict raises PermissionError from the task's wrapper, which Prefect marks as a task failure. The chio deny payload is attached as the exception's __cause__, a ChioPrefectError, so downstream code can distinguish a policy failure from a business failure:

python
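import structlog

from chio_prefect import ChioPrefectError

# Assumption: a structlog-style logger, since the call below passes keyword fields.
log = structlog.get_logger()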

try:
    research_pipeline("sensitive query")
except PermissionError as e:
    if isinstance(e.__cause__, ChioPrefectError):
        err: ChioPrefectError = e.__cause__
        log.warning(
            "denied by chio",
            receipt_id=err.receipt_id,
            guard=err.guard,
            reason=err.reason,
        )
    else:
        raise

By default, denials are not retried: replaying a policy decision that already returned "deny" with the same inputs is pointless and spends evaluation budget. If your guard depends on state that can change between attempts, opt into retry with a retry_condition_fn on the task:

python
from chio_prefect import ChioPrefectError

def retry_on_rate_limit(task, task_run, state) -> bool:
    exc = state.result(raise_on_failure=False)
    if isinstance(exc, PermissionError) and isinstance(exc.__cause__, ChioPrefectError):
        return exc.__cause__.guard == "rate-limit-guard"
    return False

@chio_task(
    tool_server="search-srv",
    retries=3,
    retry_delay_seconds=30,
    retry_condition_fn=retry_on_rate_limit,
)
def search_documents(query: str) -> list[dict]: ...
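
The chaining that both snippets rely on can be reproduced without Prefect or chio installed. The sketch below is a stand-alone simulation under stated assumptions: `DenyError` is a hypothetical stand-in for ChioPrefectError, and `guarded_call` only imitates the wrapper's documented behavior of raising PermissionError with the deny payload as its cause.

```python
class DenyError(Exception):
    """Hypothetical stand-in for ChioPrefectError, carrying the deny payload."""

    def __init__(self, receipt_id: str, guard: str, reason: str):
        super().__init__(reason)
        self.receipt_id = receipt_id
        self.guard = guard
        self.reason = reason


def guarded_call(verdict: dict) -> str:
    """Imitate the wrapper: on deny, raise PermissionError chained to the payload."""
    if verdict["verdict"] == "deny":
        raise PermissionError(verdict["reason"]) from DenyError(
            verdict["receipt_id"], verdict["guard"], verdict["reason"]
        )
    return "ran"


def should_retry(exc: BaseException, retryable=frozenset({"rate-limit-guard"})) -> bool:
    """Retry only policy denials whose guard depends on state that can change."""
    cause = exc.__cause__
    return (
        isinstance(exc, PermissionError)
        and isinstance(cause, DenyError)
        and cause.guard in retryable
    )


def classify(verdict: dict) -> str:
    """Return 'ok', 'retry', or 'fail' for a simulated verdict."""
    try:
        guarded_call(verdict)
    except PermissionError as exc:
        return "retry" if should_retry(exc) else "fail"
    return "ok"


rate_limited = {"verdict": "deny", "receipt_id": "r1",
                "guard": "rate-limit-guard", "reason": "too many calls"}
out_of_scope = {"verdict": "deny", "receipt_id": "r2",
                "guard": "scope-guard", "reason": "tool not granted"}

print(classify(rate_limited))          # retry
print(classify(out_of_scope))          # fail
print(classify({"verdict": "allow"}))  # ok
```

Keeping the set of retryable guards explicit means a new guard defaults to "do not retry", which matches the default described above.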

Async and Sync

Both shapes are supported and the decorator preserves the function's signature. An async def becomes an async Prefect task; a plain def becomes a sync task whose chio evaluation is driven from a throwaway event loop so the worker is never blocked.

python
@chio_task(tool_server="search-srv")
async def search_documents(query: str) -> list[dict]:
    return await external_search.run(query)

@chio_task(tool_server="search-srv")
def analyze_results(documents: list[dict]) -> dict:
    return analyzer.run(documents)
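
One way a single decorator can serve both shapes is sketched below. It is a minimal model, not the chio-prefect source: `evaluate` is a hypothetical async policy check standing in for the HTTP call to the sidecar, and `guarded` is a hypothetical decorator name.

```python
import asyncio
import functools
import inspect


async def evaluate(tool_name: str) -> bool:
    """Hypothetical async policy check; a real one would call the sidecar over HTTP."""
    await asyncio.sleep(0)
    return tool_name != "write_file"


def guarded(fn):
    """Wrap sync and async functions so the policy check runs before the body."""
    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        async def async_wrapper(*args, **kwargs):
            if not await evaluate(fn.__name__):
                raise PermissionError(fn.__name__)
            return await fn(*args, **kwargs)
        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop, runs the check, then closes the loop.
        if not asyncio.run(evaluate(fn.__name__)):
            raise PermissionError(fn.__name__)
        return fn(*args, **kwargs)
    return sync_wrapper


@guarded
def double(x: int) -> int:
    return x * 2


@guarded
async def triple(x: int) -> int:
    return x * 3


@guarded
def write_file() -> None:
    return None


print(double(21))               # 42
print(asyncio.run(triple(2)))   # 6
```

Note that `asyncio.run` raises RuntimeError when the calling thread already has a running loop, so a sync wrapper built this way suits threads that have no loop of their own.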

Testing

The chio_sdk.testing helpers include allow_all() and deny_all() mock clients. Inject them via the chio_client= parameter on the decorator so flows can be unit-tested without a live sidecar:

python
import pytest

from chio_sdk.testing import allow_all, deny_all
from chio_prefect import chio_task, chio_flow

# PIPELINE_SCOPE is the ChioScope defined in the Quickstart.

def test_pipeline_happy_path():
    client = allow_all()

    @chio_task(tool_server="srv", chio_client=client)
    def double(x: int) -> int:
        return x * 2

    @chio_flow(
        scope=PIPELINE_SCOPE,
        capability_id="cap-test",
        tool_server="srv",
        chio_client=client,
    )
    def pipeline() -> int:
        return double(21)

    assert pipeline() == 42

def test_pipeline_denied():
    client = deny_all(reason="budget exceeded")

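    # Standalone task: it runs outside any @chio_flow, so it declares its own capability_id.
    @chio_task(tool_server="srv", capability_id="cap-test", chio_client=client)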
    def double(x: int) -> int:
        return x * 2

    with pytest.raises(PermissionError):
        double(21)

Package Layout

text
sdks/python/chio-prefect/
  pyproject.toml          # deps: chio-sdk, prefect >= 3
  src/chio_prefect/
    __init__.py           # chio_task, chio_flow, errors, events
    decorators.py         # task and flow wrappers
    events.py             # Prefect event emission
    errors.py             # ChioPrefectError, ChioPrefectConfigError
    context.py            # flow-scope ContextVar plumbing
  tests/
    test_task_decorator.py
    test_flow_attenuation.py
    test_event_emission.py

Next Steps

  • Temporal · the durable workflow counterpart, with workflow-level grants and saga compensation
  • LangGraph · graph-based agent orchestration on top of the same kernel surface
  • Budgets · attach spend envelopes to a flow and reconcile them on deny or failure
  • Receipt format · the payload shape mirrored into Prefect events