Temporal
Temporal is a common runtime for production agent orchestration: its programming model, durable workflows composed of retriable activities, maps naturally onto the agent pattern of "plan a sequence of tool calls, execute them with retry, roll back on failure." The chio-temporal package attaches an activity interceptor that evaluates every activity through the chio sidecar, scopes the workflow with a WorkflowGrant pinned to the Temporal workflow_id, and aggregates per-activity receipts into a WorkflowReceipt envelope for the chio receipt store.
Why Temporal Through chio
chio evaluates a capability against a policy and produces a signed receipt. Temporal provides what that evaluation surface does not: persistence, retry with backoff, saga compensation, visibility, and multi-worker distribution. The thesis is simple. Every Temporal activity that performs a tool call should pass through the chio kernel for capability validation and receipt signing. A workflow-level grant sets the envelope; per-activity evaluation enforces it step by step.
| Temporal alone | Temporal + chio |
|---|---|
| Activities retry on failure | Denied activities raise non-retryable ApplicationError so the workflow does not spin |
| Workflow history is append-only | Per-activity receipts are signed and aggregated into a WorkflowReceipt envelope |
| Authorization is per-namespace or queue | Authorization is per-tool, per-scope, time-bounded, attenuated per activity if needed |
| Saga compensation is developer-defined | Deny verdict is a non-retryable error type saga logic can match on |
Public Surface
| Name | Responsibility |
|---|---|
ChioActivityInterceptor | Worker-level Interceptor that gates Activity execution. A deny raises ApplicationError(type="ChioCapabilityDenied", non_retryable=True). |
WorkflowGrant | Capability token pinned to a Temporal workflow_id (and optionally run_id). Activities inherit the grant; attenuate_for_activity mints strictly narrower child grants. |
WorkflowReceipt, WorkflowStepReceipt | Aggregate of per-activity receipts in a workflow run, serialised to the chio-temporal/v1 JSON envelope. |
build_chio_worker | Convenience builder that mints a grant, constructs the interceptor, and returns (worker, interceptor, grant). |
ActivityGrantOverride | Callable signature for the per-activity grant-override hook registered via register_activity_grant_override. |
DENIED_ERROR_TYPE | The string "ChioCapabilityDenied", used as the ApplicationError type on deny so saga compensation can match on it. |
ChioTemporalError, ChioTemporalConfigError | Error types. |
Deployment Topology
The chio sidecar runs alongside the activity worker: same pod in Kubernetes, same host in VM deployments. Workflow workers do not call the sidecar directly; they orchestrate. Activity workers are the enforcement point, because Temporal's execution model treats activities as the side-effect boundary.
Receipts are side-effects, not workflow state
Activity Interceptor
The primary path is the activity interceptor. Register one on the worker; every activity execution is automatically evaluated through the chio sidecar using the capability from the registered WorkflowGrant.
from chio_sdk import ChioClient
from chio_sdk.models import CapabilityToken
from chio_temporal import ChioActivityInterceptor, WorkflowGrant
from temporalio.client import Client
from temporalio.worker import Worker
chio = ChioClient("http://127.0.0.1:9090")
client = await Client.connect("localhost:7233")
interceptor = ChioActivityInterceptor(
chio_client=chio,
sidecar_url="http://127.0.0.1:9090",
# Fallback tool_server id when an activity has no explicit mapping.
default_tool_server="agent-tools",
# Per-activity-type override. Activities normally map 1:1 to a
# chio tool server in production deployments.
activity_tool_server_map={
"call_tool": "agent-tools",
"read_database": "db-readonly",
"send_email": "email-outbound",
},
# Optional: drain finalised workflow receipts to a sink.
receipt_sink=my_receipt_sink,
)
# Before starting the workflow, register the grant for its workflow_id.
token: CapabilityToken = await chio.create_capability(
subject="agent:my-pipeline",
scope=scope_for("call_tool", "read_database", "send_email"),
ttl_seconds=3600,
)
grant = WorkflowGrant(
workflow_id="agent-run-123",
token=token,
tool_server="agent-tools",
)
interceptor.register_workflow_grant(grant)
worker = Worker(
client,
task_queue="agent-tasks",
workflows=[AgentWorkflow],
activities=[call_tool, read_database, send_email],
interceptors=[interceptor],
)The interceptor looks up the grant when the first activity for a given (workflow_id, run_id) pair executes. It then evaluates each activity against the sidecar using the grant's capability id and records a WorkflowStepReceipt on an in-flight WorkflowReceipt. Concurrent workflows on the same worker do not stomp each other's step lists; state is keyed on (workflow_id, run_id).
No scope_map kwarg
ChioActivityInterceptor does not take a scope_map kwarg. Scope is carried by the WorkflowGrant (via its CapabilityToken.scope). Use activity_tool_server_map to map activity types to chio tool server ids, and register per-activity grant overrides when a specific activity should run under a narrower capability.Convenience Builder
build_chio_worker wires the standard pieces in one call: mint a capability (or reuse a supplied one), construct the interceptor, register the grant, build the worker, and return the trio.
from chio_temporal import build_chio_worker
worker, interceptor, grant = await build_chio_worker(
client,
task_queue="agent-tasks",
activities=[call_tool, read_database, send_email],
workflows=[AgentWorkflow],
chio_client=chio,
workflow_id="agent-run-123",
# Either supply a pre-minted capability id:
capability_id="cap-abc",
# ...or a scope + subject, and the builder mints one:
scope=scope_for("call_tool", "read_database", "send_email"),
subject="agent:my-pipeline",
ttl_seconds=3600,
default_tool_server="agent-tools",
activity_tool_server_map={"send_email": "email-outbound"},
)
await worker.run()Workflow Grants and Attenuation
A WorkflowGrant pins a capability token to a Temporal workflow_id. Optional run_id pinning lets a grant bind to one execution; otherwise it applies across every run of the workflow id. attenuate_for_activity mints a child grant whose scope is a strict subset of the parent:
narrower = await grant.attenuate_for_activity(
chio,
new_scope=scope_for("read_database"),
tool_server="db-readonly",
)
# Register a per-activity-type hook that returns the narrower grant.
interceptor.register_activity_grant_override(
"read_database",
lambda info: narrower,
)The hook is invoked with the Temporal activity.Info for each execution; returning None falls back to the workflow-level grant. The interceptor verifies the override scope is a subset of the workflow grant scope and raises ChioTemporalConfigError if not.
Deny Handling
When the sidecar returns a deny verdict, the interceptor records the deny receipt on the workflow's WorkflowReceipt and raises a non-retryable ApplicationError with type "ChioCapabilityDenied". Sagas can catch the error and run compensations:
from temporalio.exceptions import ApplicationError
from chio_temporal import DENIED_ERROR_TYPE
@workflow.defn
class TransferWorkflow:
@workflow.run
async def run(self, transfer: Transfer) -> TransferResult:
compensations: list[tuple[str, list]] = []
debit = await workflow.execute_activity(
debit_account,
args=[transfer.source, transfer.amount],
start_to_close_timeout=timedelta(seconds=30),
)
compensations.append(("credit_account", [transfer.source, transfer.amount]))
try:
credit = await workflow.execute_activity(
credit_account,
args=[transfer.destination, transfer.amount],
start_to_close_timeout=timedelta(seconds=30),
)
except ApplicationError as exc:
if exc.type == DENIED_ERROR_TYPE:
for name, args in reversed(compensations):
await workflow.execute_activity(
name, args=args,
start_to_close_timeout=timedelta(seconds=30),
)
raise
return TransferResult(debit=debit, credit=credit)Sidecar transport errors are retryable
ApplicationError(type="ChioSidecarError", non_retryable=False) so Temporal applies its standard retry policy.Workflow Receipt Aggregation
The interceptor maintains a WorkflowReceipt per (workflow_id, run_id) pair. Each executed activity appends a WorkflowStepReceipt. Call finalize_workflow when the workflow completes, then flush_workflow_receipt to forward the envelope to the configured sink:
# At workflow completion (typically from a "finalise" activity).
interceptor.finalize_workflow(
workflow_id="agent-run-123",
run_id=run_id,
outcome="success", # or "failure" | "cancelled"
)
envelope = await interceptor.flush_workflow_receipt(
workflow_id="agent-run-123",
run_id=run_id,
)
# envelope["version"] == "chio-temporal/v1"
# envelope has: workflow_id, run_id, parent_workflow_ids, started_at,
# completed_at, outcome, step_count, allow_count, deny_count, steps, metadataWorkflow history itself does not store receipt payloads. Each activity result carries only the chio receipt id; the receipt content lives externally in the chio receipt store, which keeps replay deterministic.
Roadmap
The following are not shipped today
- HITL approval inside activities. Pausing an Activity on a
pending_approvalverdict and resuming it via a Temporal Signal is planned once the signal payload shape is stable. NoChioActivityContexttype is exported yet; activity code that needs finer-grained control should callChioClient.evaluate_tool_calldirectly. - Durable budget reconciliation. A first-class helper for reversing budget charges after a failed workflow (something like a
reverse_budget_chargeactivity) is planned; today, compensate at the business layer. - Rust worker middleware. For
temporal-sdk-coreworkers, an activity middleware mirroring the Python interceptor is on the roadmap. Rust code paths go through the Python interceptor today via the language-neutral sidecar. - Multi-cluster failover. Temporal supports multi-cluster replication; chio receipt logs are per-kernel. Continuity across cluster failover is an open design question.
Package Layout
sdks/python/chio-temporal/
pyproject.toml # deps: chio-sdk, temporalio
src/chio_temporal/
__init__.py # public surface (see table above)
interceptor.py # ChioActivityInterceptor, ActivityGrantOverride
grants.py # WorkflowGrant (+ attenuate_for_activity)
receipt.py # WorkflowReceipt, WorkflowStepReceipt
worker.py # build_chio_worker
errors.py # ChioTemporalError, ChioTemporalConfigError
tests/
test_interceptor.py
test_workflow_receipt.py