Temporal

Temporal is a common runtime for production agent orchestration: its programming model, durable workflows composed of retriable activities, maps naturally onto the agent pattern of "plan a sequence of tool calls, execute them with retry, roll back on failure." The chio-temporal package attaches an activity interceptor that evaluates every activity through the chio sidecar, scopes the workflow with a WorkflowGrant pinned to the Temporal workflow_id, and aggregates per-activity receipts into a WorkflowReceipt envelope for the chio receipt store.


Why Temporal Through chio

chio evaluates a capability against a policy and produces a signed receipt. Temporal provides what that evaluation surface does not: persistence, retry with backoff, saga compensation, visibility, and multi-worker distribution. The thesis is simple. Every Temporal activity that performs a tool call should pass through the chio kernel for capability validation and receipt signing. A workflow-level grant sets the envelope; per-activity evaluation enforces it step by step.

| Temporal alone | Temporal + chio |
| --- | --- |
| Activities retry on failure | Denied activities raise non-retryable ApplicationError so the workflow does not spin |
| Workflow history is append-only | Per-activity receipts are signed and aggregated into a WorkflowReceipt envelope |
| Authorization is per-namespace or per-queue | Authorization is per-tool, per-scope, time-bounded, attenuated per activity if needed |
| Saga compensation is developer-defined | Deny verdict is a non-retryable error type saga logic can match on |

Public Surface

| Name | Responsibility |
| --- | --- |
| ChioActivityInterceptor | Worker-level Interceptor that gates Activity execution. A deny raises ApplicationError(type="ChioCapabilityDenied", non_retryable=True). |
| WorkflowGrant | Capability token pinned to a Temporal workflow_id (and optionally run_id). Activities inherit the grant; attenuate_for_activity mints strictly narrower child grants. |
| WorkflowReceipt, WorkflowStepReceipt | Aggregate of per-activity receipts in a workflow run, serialised to the chio-temporal/v1 JSON envelope. |
| build_chio_worker | Convenience builder that mints a grant, constructs the interceptor, and returns (worker, interceptor, grant). |
| ActivityGrantOverride | Callable signature for the per-activity grant-override hook registered via register_activity_grant_override. |
| DENIED_ERROR_TYPE | The string "ChioCapabilityDenied", used as the ApplicationError type on deny so saga compensation can match on it. |
| ChioTemporalError, ChioTemporalConfigError | Error types. |

Deployment Topology

The chio sidecar runs alongside the activity worker: same pod in Kubernetes, same host in VM deployments. Workflow workers do not call the sidecar directly; they orchestrate. Activity workers are the enforcement point, because Temporal's execution model treats activities as the side-effect boundary.

Figure: Workflow workers orchestrate; activity workers are the enforcement point and call the chio sidecar over HTTP for every activity invocation.

Receipts are side effects, not workflow state

Never call chio from inside a workflow function. Workflow code must be deterministic for replay. All chio calls belong inside activities (and the interceptor runs at the activity boundary precisely for this reason).

Activity Interceptor

The primary path is the activity interceptor. Register one on the worker; every activity execution is automatically evaluated through the chio sidecar using the capability from the registered WorkflowGrant.

python
from chio_sdk import ChioClient
from chio_sdk.models import CapabilityToken
from chio_temporal import ChioActivityInterceptor, WorkflowGrant
from temporalio.client import Client
from temporalio.worker import Worker

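# This snippet runs inside an async function (for example under
# asyncio.run). scope_for, my_receipt_sink, AgentWorkflow, and the
# activity functions are assumed to be defined elsewhere.
chio = ChioClient("http://127.0.0.1:9090")
client = await Client.connect("localhost:7233")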

interceptor = ChioActivityInterceptor(
    chio_client=chio,
    sidecar_url="http://127.0.0.1:9090",
    # Fallback tool_server id when an activity has no explicit mapping.
    default_tool_server="agent-tools",
    # Per-activity-type override. Activities normally map 1:1 to a
    # chio tool server in production deployments.
    activity_tool_server_map={
        "call_tool":     "agent-tools",
        "read_database": "db-readonly",
        "send_email":    "email-outbound",
    },
    # Optional: drain finalised workflow receipts to a sink.
    receipt_sink=my_receipt_sink,
)

# Before starting the workflow, register the grant for its workflow_id.
token: CapabilityToken = await chio.create_capability(
    subject="agent:my-pipeline",
    scope=scope_for("call_tool", "read_database", "send_email"),
    ttl_seconds=3600,
)
grant = WorkflowGrant(
    workflow_id="agent-run-123",
    token=token,
    tool_server="agent-tools",
)
interceptor.register_workflow_grant(grant)

worker = Worker(
    client,
    task_queue="agent-tasks",
    workflows=[AgentWorkflow],
    activities=[call_tool, read_database, send_email],
    interceptors=[interceptor],
)

The interceptor looks up the grant when the first activity for a given (workflow_id, run_id) pair executes. It then evaluates each activity against the sidecar using the grant's capability id and records a WorkflowStepReceipt on an in-flight WorkflowReceipt. Concurrent workflows on the same worker do not stomp each other's step lists; state is keyed on (workflow_id, run_id).
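
The isolation property described above can be illustrated with a small standalone model. This is a sketch only, not the interceptor's implementation: `StepRecord` and `RunStateBook` are hypothetical names invented for the example, and the only assumption carried over from the text is that in-flight state is keyed on the (workflow_id, run_id) pair.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class StepRecord:
    """Hypothetical stand-in for a WorkflowStepReceipt."""

    activity_type: str
    verdict: str  # "allow" or "deny"


@dataclass
class RunStateBook:
    """Hypothetical per-worker registry of in-flight step lists."""

    _steps: dict = field(default_factory=lambda: defaultdict(list))

    def record(self, workflow_id: str, run_id: str, step: StepRecord) -> None:
        # The composite key keeps two runs of the same workflow id apart.
        self._steps[(workflow_id, run_id)].append(step)

    def steps(self, workflow_id: str, run_id: str) -> list:
        # Return a copy so callers cannot mutate the in-flight list.
        return list(self._steps.get((workflow_id, run_id), []))


book = RunStateBook()
book.record("agent-run-123", "run-a", StepRecord("call_tool", "allow"))
book.record("agent-run-123", "run-b", StepRecord("send_email", "deny"))
book.record("agent-run-123", "run-a", StepRecord("read_database", "allow"))

run_a_steps = [s.activity_type for s in book.steps("agent-run-123", "run-a")]
run_b_steps = [s.activity_type for s in book.steps("agent-run-123", "run-b")]
```

Because each run has its own list, interleaved activity executions from concurrent workflows append to separate step lists even when they share a workflow id.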

No scope_map kwarg

ChioActivityInterceptor does not take a scope_map kwarg. Scope is carried by the WorkflowGrant (via its CapabilityToken.scope). Use activity_tool_server_map to map activity types to chio tool server ids, and register per-activity grant overrides when a specific activity should run under a narrower capability.
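
As an illustration of how activity_tool_server_map and default_tool_server relate, the fallback can be modelled as a dictionary lookup with a default. The function `resolve_tool_server` below is hypothetical and written for this example; the interceptor's actual lookup code is not shown on this page.

```python
def resolve_tool_server(
    activity_type: str,
    activity_tool_server_map: dict,
    default_tool_server: str,
) -> str:
    """Pick the tool server id for an activity type.

    Assumption: an activity type missing from the map falls back to the
    default tool server, as the configuration comments describe.
    """
    return activity_tool_server_map.get(activity_type, default_tool_server)


tool_server_map = {
    "read_database": "db-readonly",
    "send_email": "email-outbound",
}

mapped = resolve_tool_server("send_email", tool_server_map, "agent-tools")
fallback = resolve_tool_server("summarise", tool_server_map, "agent-tools")
```

Here `send_email` resolves to its explicit mapping, while the unmapped `summarise` activity type resolves to the default.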

Convenience Builder

build_chio_worker wires the standard pieces in one call: mint a capability (or reuse a supplied one), construct the interceptor, register the grant, build the worker, and return the trio.

python
from chio_temporal import build_chio_worker

worker, interceptor, grant = await build_chio_worker(
    client,
    task_queue="agent-tasks",
    activities=[call_tool, read_database, send_email],
    workflows=[AgentWorkflow],
    chio_client=chio,
    workflow_id="agent-run-123",
    # Either supply a pre-minted capability id:
    capability_id="cap-abc",
    # ...or omit capability_id and pass a scope + subject so the builder
    # mints one:
    # scope=scope_for("call_tool", "read_database", "send_email"),
    # subject="agent:my-pipeline",
    # ttl_seconds=3600,
    default_tool_server="agent-tools",
    activity_tool_server_map={"send_email": "email-outbound"},
)

await worker.run()

Workflow Grants and Attenuation

A WorkflowGrant pins a capability token to a Temporal workflow_id. Optional run_id pinning lets a grant bind to one execution; otherwise it applies across every run of the workflow id. attenuate_for_activity mints a child grant whose scope is a strict subset of the parent:

python
narrower = await grant.attenuate_for_activity(
    chio,
    new_scope=scope_for("read_database"),
    tool_server="db-readonly",
)

# Register a per-activity-type hook that returns the narrower grant.
interceptor.register_activity_grant_override(
    "read_database",
    lambda info: narrower,
)

The hook is invoked with the Temporal activity.Info for each execution; returning None falls back to the workflow-level grant. The interceptor verifies the override scope is a subset of the workflow grant scope and raises ChioTemporalConfigError if not.
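
The override rules above (a hook returning None falls back to the workflow grant, and an override may not widen the workflow scope) can be sketched without the library. The model below is an assumption made for illustration: scopes are represented as frozensets of tool names, `GrantConfigError` stands in for ChioTemporalConfigError, and `select_scope` is a hypothetical helper, not part of chio-temporal.

```python
from typing import Callable, Optional


class GrantConfigError(Exception):
    """Hypothetical stand-in for ChioTemporalConfigError."""


def select_scope(
    workflow_scope: frozenset,
    override: Optional[Callable[[dict], Optional[frozenset]]],
    activity_info: dict,
) -> frozenset:
    """Return the scope an activity should run under.

    activity_info is a plain dict standing in for Temporal's activity.Info.
    """
    candidate = override(activity_info) if override is not None else None
    if candidate is None:
        # No hook registered, or the hook declined: use the workflow grant.
        return workflow_scope
    if not candidate <= workflow_scope:
        # The override asks for something the workflow grant does not allow.
        extra = sorted(candidate - workflow_scope)
        raise GrantConfigError(f"override widens scope: {extra}")
    return candidate


WORKFLOW_SCOPE = frozenset({"call_tool", "read_database", "send_email"})

narrowed = select_scope(
    WORKFLOW_SCOPE, lambda info: frozenset({"read_database"}), {}
)
inherited = select_scope(WORKFLOW_SCOPE, lambda info: None, {})
```

An override such as `frozenset({"delete_database"})` raises `GrantConfigError` in this model, which mirrors the configuration error the interceptor is described as raising.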


Deny Handling

When the sidecar returns a deny verdict, the interceptor records the deny receipt on the workflow's WorkflowReceipt and raises a non-retryable ApplicationError with type "ChioCapabilityDenied". Workflow code sees the failure as an ActivityError whose cause is that ApplicationError, so sagas can catch it, match on the type, and run compensations:

python
from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ActivityError, ApplicationError

from chio_temporal import DENIED_ERROR_TYPE

@workflow.defn
class TransferWorkflow:
    @workflow.run
    async def run(self, transfer: Transfer) -> TransferResult:
        # Each entry names a compensating activity and its arguments.
        compensations: list[tuple[str, list]] = []
        debit = await workflow.execute_activity(
            debit_account,
            args=[transfer.source, transfer.amount],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Undo the debit by crediting the source account back.
        compensations.append(("credit_account", [transfer.source, transfer.amount]))

        try:
            credit = await workflow.execute_activity(
                credit_account,
                args=[transfer.destination, transfer.amount],
                start_to_close_timeout=timedelta(seconds=30),
            )
        except ActivityError as exc:
            # Temporal wraps activity failures in ActivityError; the
            # ApplicationError raised by the interceptor is its cause.
            cause = exc.cause
            if isinstance(cause, ApplicationError) and cause.type == DENIED_ERROR_TYPE:
                for name, args in reversed(compensations):
                    await workflow.execute_activity(
                        name, args=args,
                        start_to_close_timeout=timedelta(seconds=30),
                    )
            raise

        return TransferResult(debit=debit, credit=credit)

Sidecar transport errors are retryable

A deny verdict is non-retryable by design (policy rejected the call; retrying will not help). Sidecar transport faults raise ApplicationError(type="ChioSidecarError", non_retryable=False) so Temporal applies its standard retry policy.
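
The two failure modes can be summarised as a small decision function. This is a sketch under stated assumptions: the error type strings come from this page, while `ActivityFailure`, `classify`, and the convention that a verdict of None means "the sidecar could not be reached" are hypothetical and exist only for the example.

```python
from dataclasses import dataclass
from typing import Optional

DENIED_ERROR_TYPE = "ChioCapabilityDenied"
SIDECAR_ERROR_TYPE = "ChioSidecarError"


@dataclass(frozen=True)
class ActivityFailure:
    """Hypothetical description of the error the activity would raise."""

    error_type: str
    non_retryable: bool


def classify(verdict: Optional[str]) -> Optional[ActivityFailure]:
    """Map a sidecar outcome to a failure, or None when the call may proceed.

    verdict is "allow", "deny", or None for a transport fault (assumption).
    """
    if verdict is None:
        # Transport fault: let Temporal's retry policy try again.
        return ActivityFailure(SIDECAR_ERROR_TYPE, non_retryable=False)
    if verdict == "deny":
        # Policy rejected the call: retrying cannot change the answer.
        return ActivityFailure(DENIED_ERROR_TYPE, non_retryable=True)
    return None


denied = classify("deny")
unreachable = classify(None)
allowed = classify("allow")
```

Keeping the two cases on distinct error types lets saga logic compensate on a deny without also compensating on a transient sidecar outage.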

Workflow Receipt Aggregation

The interceptor maintains a WorkflowReceipt per (workflow_id, run_id) pair. Each executed activity appends a WorkflowStepReceipt. Call finalize_workflow when the workflow completes, then flush_workflow_receipt to forward the envelope to the configured sink:

python
# At workflow completion (typically from a "finalise" activity).
interceptor.finalize_workflow(
    workflow_id="agent-run-123",
    run_id=run_id,
    outcome="success",   # or "failure" | "cancelled"
)
envelope = await interceptor.flush_workflow_receipt(
    workflow_id="agent-run-123",
    run_id=run_id,
)
# envelope["version"] == "chio-temporal/v1"
# envelope has: workflow_id, run_id, parent_workflow_ids, started_at,
# completed_at, outcome, step_count, allow_count, deny_count, steps, metadata

Workflow history itself does not store receipt payloads. Each activity result carries only the chio receipt id; the receipt content lives externally in the chio receipt store, which keeps replay deterministic.
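
To make the envelope's counters concrete, the sketch below derives step_count, allow_count, and deny_count from a list of steps. It is an illustrative model, not the library's serialiser: `build_envelope` and the per-step dict shape (`activity_type`, `verdict`, `receipt_id`) are assumptions, and timestamps, parent_workflow_ids, and metadata are omitted for brevity.

```python
def build_envelope(workflow_id: str, run_id: str, outcome: str, steps: list) -> dict:
    """Aggregate per-activity steps into a chio-temporal/v1-style envelope.

    Each step is assumed to be a dict with a "verdict" of "allow" or "deny".
    """
    allow_count = sum(1 for step in steps if step["verdict"] == "allow")
    deny_count = sum(1 for step in steps if step["verdict"] == "deny")
    return {
        "version": "chio-temporal/v1",
        "workflow_id": workflow_id,
        "run_id": run_id,
        "outcome": outcome,
        "step_count": len(steps),
        "allow_count": allow_count,
        "deny_count": deny_count,
        "steps": list(steps),
    }


steps = [
    {"activity_type": "call_tool", "verdict": "allow", "receipt_id": "rcpt-1"},
    {"activity_type": "read_database", "verdict": "allow", "receipt_id": "rcpt-2"},
    {"activity_type": "send_email", "verdict": "deny", "receipt_id": "rcpt-3"},
]

envelope = build_envelope("agent-run-123", "run-a", "failure", steps)
```

The steps in this model carry only a receipt id, not the receipt payload, in keeping with the rule that receipt content lives in the chio receipt store.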


Roadmap

The following are not shipped today

These capabilities are tracked as follow-up work. The shipped v1 integration is the synchronous allow/deny path described above.
  • HITL approval inside activities. Pausing an Activity on a pending_approval verdict and resuming it via a Temporal Signal is planned once the signal payload shape is stable. No ChioActivityContext type is exported yet; activity code that needs finer-grained control should call ChioClient.evaluate_tool_call directly.
  • Durable budget reconciliation. A first-class helper for reversing budget charges after a failed workflow (something like a reverse_budget_charge activity) is planned; today, compensate at the business layer.
  • Rust worker middleware. For temporal-sdk-core workers, an activity middleware mirroring the Python interceptor is on the roadmap. Rust code paths go through the Python interceptor today via the language-neutral sidecar.
  • Multi-cluster failover. Temporal supports multi-cluster replication; chio receipt logs are per-kernel. Continuity across cluster failover is an open design question.

Package Layout

text
sdks/python/chio-temporal/
  pyproject.toml                # deps: chio-sdk, temporalio
  src/chio_temporal/
    __init__.py                 # public surface (see table above)
    interceptor.py              # ChioActivityInterceptor, ActivityGrantOverride
    grants.py                   # WorkflowGrant (+ attenuate_for_activity)
    receipt.py                  # WorkflowReceipt, WorkflowStepReceipt
    worker.py                   # build_chio_worker
    errors.py                   # ChioTemporalError, ChioTemporalConfigError
  tests/
    test_interceptor.py
    test_workflow_receipt.py

Next Steps

  • Kafka · enforce capabilities on event-driven producers and consumers
  • Budgets · how capability scope and per-invocation cost caps are set at mint time
  • LangGraph · pair durable workflows with graph-based agent orchestration