Prefect
Prefect is the Python-native orchestrator teams reach for when their pipelines outgrow cron. Its task and flow model maps directly onto the chio capability model: a flow is a scoped unit of work, a task is a guarded tool call. The chio-prefect package wraps Prefect's decorators so every task is evaluated by the chio kernel before its body runs, every decision surfaces as a Prefect event on the flow-run timeline, and scope at the flow boundary is enforced as a strict upper bound on what any enclosed task can do.
Why Prefect Through chio
Prefect already has the ergonomics most agent pipelines want: typed function decorators, retry policies, async-first execution, a UI that shows you where time is going. What it does not have is a policy layer. Scheduling a flow does not restrict what tools it can call, and @task is a pure execution primitive with no notion of authority. The chio integration fills in exactly that layer and nothing else.
| Prefect alone | Prefect + chio |
|---|---|
| Tasks retry on failure | Tasks are denied before side-effects when a capability is revoked |
| Flow parameters gate inputs | Flow scope gates the set of tools every enclosed task may call |
| Events record task state transitions | Events record signed receipts linked to flow-run and task-run ids |
| Access control is deployment-level | Access control is per-tool, per-scope, time-bounded, revocable |
| Run history answers "what happened" | Receipt chain answers "what was allowed and why" |
Install
The package targets Prefect 3 and ships alongside chio-sdk. The chio sidecar runs next to your Prefect worker; no new processes to manage if you already have a sidecar for other SDKs.
```shell
uv pip install chio-prefect
# or
pip install chio-prefect
```

No environment variables are required. The default sidecar URL is http://127.0.0.1:9090; pass sidecar_url= on any decorator to override.
Deployment Topology
Prefect workers run the task bodies, so they are the enforcement point. The chio kernel runs as a sidecar on the same host or pod; the decorators speak HTTP to it. Prefect's API server stays untouched, its events backend stays untouched, and the chio receipt log stays external to flow-run history.
One sidecar per worker, not per flow
Quickstart
Two decorators: @chio_task wraps a Prefect task, @chio_flow wraps a Prefect flow. Each task call is evaluated to an allow/deny verdict before its body runs, and the verdict is emitted as a Prefect event tied to the task-run id.
```python
from chio_sdk.client import ChioClient
from chio_sdk.models import ChioScope, Operation, ToolGrant
from chio_prefect import chio_flow, chio_task

PIPELINE_SCOPE = ChioScope(
    grants=[
        ToolGrant(
            server_id="search-srv",
            tool_name="search_documents",
            operations=[Operation.INVOKE],
        ),
        ToolGrant(
            server_id="search-srv",
            tool_name="analyze_results",
            operations=[Operation.INVOKE],
        ),
    ]
)

@chio_task(tool_server="search-srv")
def search_documents(query: str) -> list[dict]:
    return external_search.run(query)

@chio_task(tool_server="search-srv")
def analyze_results(documents: list[dict]) -> dict:
    return analyzer.run(documents)

@chio_flow(
    scope=PIPELINE_SCOPE,
    capability_id="cap-research-pipeline",
    tool_server="search-srv",
)
def research_pipeline(query: str) -> dict:
    docs = search_documents(query)
    return analyze_results(docs)

research_pipeline("capability-based security")
```

The decorators are thin wrappers over Prefect's own: every option you would pass to @task or @flow passes through verbatim. Retries, timeouts, task runners, tags, custom result storage, all of it still works.
Flow Scope and Attenuation
The flow scope is a capability envelope. Tasks inside a flow cannot exceed it, which means a deployment that schedules research_pipeline with search and analysis grants cannot have one of its tasks silently acquire a file.write capability. The subset check is enforced at call time:
```python
# Allowed: task scope is a subset of flow scope
@chio_task(
    scope=ChioScope(grants=[
        ToolGrant(server_id="search-srv", tool_name="search_documents",
                  operations=[Operation.INVOKE]),
    ]),
    tool_server="search-srv",
)
def search_documents(query: str) -> list[dict]: ...

# Denied at call time: task scope escapes the flow envelope
@chio_task(
    scope=ChioScope(grants=[
        ToolGrant(server_id="fs-srv", tool_name="write_file",
                  operations=[Operation.INVOKE]),
    ]),
    tool_server="fs-srv",
)
def write_file(path: str, body: bytes) -> None: ...
```

Tasks that omit scope inherit the enclosing flow's scope. Standalone tasks (tasks called outside any @chio_flow) must declare their own capability_id; omitting it raises ChioPrefectConfigError at call time, because only then can the wrapper tell that no flow encloses the task.
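The subset and inheritance rules above can be sketched in plain Python. This is an illustration under stated assumptions, not the chio kernel's algorithm: Grant is a hypothetical stand-in for ToolGrant, operations are modeled as plain strings, and is_attenuation and effective_grants are names invented for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Grant:
    """Hypothetical stand-in for ToolGrant: one tool on one server."""
    server_id: str
    tool_name: str
    operations: frozenset[str]


def covers(flow_grant: Grant, task_grant: Grant) -> bool:
    """A flow grant covers a task grant for the same tool when it
    allows every operation the task grant asks for."""
    return (
        flow_grant.server_id == task_grant.server_id
        and flow_grant.tool_name == task_grant.tool_name
        and task_grant.operations <= flow_grant.operations
    )


def is_attenuation(task_grants: list[Grant], flow_grants: list[Grant]) -> bool:
    """True when every task grant is covered by some flow grant."""
    return all(any(covers(f, t) for f in flow_grants) for t in task_grants)


def effective_grants(
    task_grants: Optional[list[Grant]], flow_grants: list[Grant]
) -> list[Grant]:
    """Resolve the grants a task runs with inside a flow."""
    if task_grants is None:
        # No task scope declared: inherit the flow's scope.
        return flow_grants
    if not is_attenuation(task_grants, flow_grants):
        raise PermissionError("task scope escapes the flow envelope")
    return task_grants


INVOKE = frozenset({"invoke"})
flow_scope = [
    Grant("search-srv", "search_documents", INVOKE),
    Grant("search-srv", "analyze_results", INVOKE),
]
search_scope = [Grant("search-srv", "search_documents", INVOKE)]
write_scope = [Grant("fs-srv", "write_file", INVOKE)]
```

Here search_scope passes the check because it names a tool the flow already holds, while write_scope fails because no flow grant mentions fs-srv. A task scope can only narrow what the flow allows.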
Use flow scope as the contract, task scope as the annotation
Receipts as Prefect Events
Every task evaluation produces a receipt, and every receipt is mirrored as a Prefect event on the task-run timeline. Two event names are used:
| Event | When emitted | Payload |
|---|---|---|
| chio.receipt.allow | Before task body runs, on allow | Receipt id, capability id, tool server, tool name, timestamp |
| chio.receipt.deny | Before task body runs, on deny | Receipt id, guard name, deny reason, full decision dict |
```python
# A receipt emitted on the Prefect events backend
{
    "event": "chio.receipt.allow",
    "resource": {
        "prefect.resource.id": "prefect.task-run.<task-run-id>",
        "prefect.flow-run.id": "<flow-run-id>",
    },
    "payload": {
        "receipt_id": "01HXYZ...7K4",
        "verdict": "allow",
        "capability_id": "cap-research-pipeline",
        "tool_server": "search-srv",
        "tool_name": "search_documents",
        "task_name": "search_documents",
        "timestamp": "2026-04-19T20:14:02.118Z",
    },
}
```

Events are resource-linked to the task-run and flow-run ids, which means the Prefect UI renders them on the correct row and you can pivot from a receipt back to the exact flow run in one query. If the events backend is unavailable the decorator logs at INFO; it will not silently drop a receipt.
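To make the resource linking concrete, here is a small sketch that builds an event in the shape shown above and pivots from a flow-run id to its receipts. It works on plain dictionaries only. The helpers build_receipt_event and receipts_for_flow_run are hypothetical names for this example and are not part of chio-prefect or Prefect.

```python
from typing import Any


def build_receipt_event(
    verdict: str,
    receipt_id: str,
    task_run_id: str,
    flow_run_id: str,
    **payload: Any,
) -> dict:
    """Assemble an event dict in the documented shape (illustrative only)."""
    if verdict not in ("allow", "deny"):
        raise ValueError(f"unknown verdict: {verdict!r}")
    return {
        "event": f"chio.receipt.{verdict}",
        "resource": {
            "prefect.resource.id": f"prefect.task-run.{task_run_id}",
            "prefect.flow-run.id": flow_run_id,
        },
        "payload": {"receipt_id": receipt_id, "verdict": verdict, **payload},
    }


def receipts_for_flow_run(events: list[dict], flow_run_id: str) -> list[str]:
    """Return the receipt ids of all chio events linked to one flow run."""
    return [
        e["payload"]["receipt_id"]
        for e in events
        if e["event"].startswith("chio.receipt.")
        and e["resource"].get("prefect.flow-run.id") == flow_run_id
    ]


events = [
    build_receipt_event("allow", "r-1", "task-a", "flow-1",
                        tool_name="search_documents"),
    build_receipt_event("deny", "r-2", "task-b", "flow-1",
                        guard="rate-limit-guard", reason="budget exceeded"),
    build_receipt_event("allow", "r-3", "task-c", "flow-2",
                        tool_name="analyze_results"),
]
flow_1_receipts = receipts_for_flow_run(events, "flow-1")
```

Because each event carries both ids in its resource block, filtering on the flow-run id is enough to recover every allow and deny decision made during that run.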
Denials and Retries
A deny verdict raises PermissionError from the task's wrapper, which Prefect marks as a task failure. The chio deny payload is attached as a ChioPrefectError on the exception's __cause__, so downstream code can distinguish a policy failure from a business failure:
```python
import structlog  # assumed: the keyword-argument logging style below is structlog's

from chio_prefect import ChioPrefectError

log = structlog.get_logger()

try:
    research_pipeline("sensitive query")
except PermissionError as e:
    if isinstance(e.__cause__, ChioPrefectError):
        # Policy failure: the deny payload travels on __cause__.
        err: ChioPrefectError = e.__cause__
        log.warning(
            "denied by chio",
            receipt_id=err.receipt_id,
            guard=err.guard,
            reason=err.reason,
        )
    else:
        # Not a chio denial: let it propagate.
        raise
```

By default, denials are not retried: replaying a policy decision that already returned "deny" with the same inputs is pointless and spends evaluation budget. If your guard depends on state that can change between attempts, opt into retry with a retry_condition_fn on the task:
```python
from chio_prefect import ChioPrefectError

def retry_on_rate_limit(task, task_run, state) -> bool:
    exc = state.result(raise_on_failure=False)
    if isinstance(exc, PermissionError) and isinstance(exc.__cause__, ChioPrefectError):
        return exc.__cause__.guard == "rate-limit-guard"
    return False

@chio_task(
    tool_server="search-srv",
    retries=3,
    retry_delay_seconds=30,
    retry_condition_fn=retry_on_rate_limit,
)
def search_documents(query: str) -> list[dict]: ...
```

Async and Sync
Both shapes are supported and the decorator preserves the function's signature. An async def becomes an async Prefect task; a plain def becomes a sync task whose chio evaluation is driven from a throwaway event loop so the worker is never blocked.
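One way to picture that dispatch is the sketch below. It is not the chio-prefect implementation: evaluate is a hypothetical coroutine standing in for the sidecar call, guarded is a name invented for this example, and asyncio.run plays the role of the throwaway event loop.

```python
import asyncio
import functools
import inspect


async def evaluate(tool_name: str) -> bool:
    """Hypothetical stand-in for the sidecar call; denies one tool by name."""
    await asyncio.sleep(0)
    return tool_name != "forbidden"


def guarded(fn):
    """Evaluate before the body runs, keeping the function's sync/async shape."""
    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        async def async_wrapper(*args, **kwargs):
            if not await evaluate(fn.__name__):
                raise PermissionError(f"{fn.__name__} denied")
            return await fn(*args, **kwargs)
        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop and closes it afterwards.
        # It cannot be called from a thread that already runs a loop.
        if not asyncio.run(evaluate(fn.__name__)):
            raise PermissionError(f"{fn.__name__} denied")
        return fn(*args, **kwargs)
    return sync_wrapper


@guarded
def double(x: int) -> int:
    return x * 2


@guarded
async def triple(x: int) -> int:
    return x * 3


@guarded
def forbidden() -> str:
    return "never reached"
```

functools.wraps keeps the original name and signature visible to introspection, and the coroutine check decides once, at decoration time, which wrapper to return. With chio-prefect the two shapes look like this: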
```python
@chio_task(tool_server="search-srv")
async def search_documents(query: str) -> list[dict]:
    return await external_search.run(query)

@chio_task(tool_server="search-srv")
def analyze_results(documents: list[dict]) -> dict:
    return analyzer.run(documents)
```

Testing
The chio_sdk.testing helpers include allow_all() and deny_all() mock clients. Inject them via the chio_client= parameter on the decorator so flows can be unit-tested without a live sidecar:
```python
import pytest

from chio_sdk.testing import allow_all, deny_all
from chio_prefect import chio_task, chio_flow

# PIPELINE_SCOPE is the scope defined in the Quickstart.

def test_pipeline_happy_path():
    client = allow_all()

    @chio_task(tool_server="srv", chio_client=client)
    def double(x: int) -> int:
        return x * 2

    @chio_flow(
        scope=PIPELINE_SCOPE,
        capability_id="cap-test",
        tool_server="srv",
        chio_client=client,
    )
    def pipeline() -> int:
        return double(21)

    assert pipeline() == 42

def test_pipeline_denied():
    client = deny_all(reason="budget exceeded")

    # Standalone task: it must declare its own capability_id.
    @chio_task(tool_server="srv", capability_id="cap-test", chio_client=client)
    def double(x: int) -> int:
        return x * 2

    with pytest.raises(PermissionError):
        double(21)
```

Package Layout
```
sdks/python/chio-prefect/
  pyproject.toml                # deps: chio-sdk, prefect >= 3
  src/chio_prefect/
    __init__.py                 # chio_task, chio_flow, errors, events
    decorators.py               # task and flow wrappers
    events.py                   # Prefect event emission
    errors.py                   # ChioPrefectError, ChioPrefectConfigError
    context.py                  # flow-scope ContextVar plumbing
  tests/
    test_task_decorator.py
    test_flow_attenuation.py
    test_event_emission.py
```

Next Steps
- Temporal · the durable workflow counterpart, with workflow-level grants and saga compensation
- LangGraph · graph-based agent orchestration on top of the same kernel surface
- Budgets · attach spend envelopes to a flow and reconcile them on deny or failure
- Receipt format · the payload shape mirrored into Prefect events