Kafka

Orchestrators like Temporal and Airflow have a coordinator to wrap. Choreography does not. Agents that react autonomously to Kafka topics have no single point where policy can be enforced, which is exactly what makes event-driven agent systems structurally hard to govern. chio plugs into the consumer side, evaluates each event before the agent acts on it, and writes receipts into Kafka transactions so the offset commit and the attestation land atomically. The integration ships today as the chio-streaming Python package wrapping confluent-kafka-python, with adapters for NATS, Pulsar, EventBridge, Pub/Sub, and Redis Streams on the documented roadmap.


Why Kafka Through chio

The Kafka consumer loop is the agent's trigger loop. An event arrives on a topic, the agent decides what to do, it calls tools in response, and it emits result events downstream. Every link in that chain is a capability boundary: the agent must be authorized to consume the inbound topic, authorized to call each tool, and authorized to produce to each outbound topic. chio evaluates all three.

Without chio, nothing governs what an agent does with an event once it has read it. Kafka itself offers ACLs for topic read and write, but ACLs do not model scope, budget, guard pipelines, or signed attestation. They answer whether a principal can read a topic, not whether this specific message, with this specific content, in this specific choreography, is permitted to drive a tool call right now.

| Event streaming alone | Event streaming + chio |
| --- | --- |
| Agents consume freely once they have ACL read | Scoped capabilities per topic and per content class |
| No audit of tools an agent invokes in response | Signed receipts on every tool call triggered by an event |
| Schema Registry governs data shape | chio governs what agents do with the data |
| Dead letter means processing failed | Dead letter means processing was not authorized |
| Exactly-once avoids duplicate processing | Exactly-once plus receipt commit: attested processing |

Consumer-Side Enforcement

chio does not touch the broker. The broker stays a dumb pipe, which is important for compatibility with managed services like MSK and Confluent Cloud. Governance lives inside the consumer process, next to the agent loop, and every poll goes through the kernel before it is handed to application code:

Figure: Consumer-side enforcement. Every poll is evaluated before application code runs; denied messages route to the DLQ with a signed denial receipt.

Two capability boundaries exist per event. First, consumption: is this agent authorized to see this topic and this message? Second, tool invocation: for each tool the agent calls in response, is the call permitted? Production is a third boundary when the agent writes result events back out. Each of the three produces a distinct receipt, and the receipts link together so the full chain can be reconstructed.


Transactional Receipt Commit

Kafka's exactly-once semantics provide the machinery chio needs to land an offset commit and a receipt commit together. Either both succeed or both roll back. If the agent crashes between the tool call and the commit, the event will be redelivered and the receipt for the aborted attempt is flagged as rolled back, so auditors can distinguish a real invocation from a transient failure.

Figure: Kafka transactions atomically land the offset commit and the receipt commit together, so aborts leave no partial attestation.

Receipts are external to Kafka state

The receipt itself is not a Kafka record. Kafka carries only the receipt id in message headers. The full signed receipt lives in the chio receipt log, which is durable and independently queryable. This keeps the transaction lightweight and avoids bloating topic storage with attestation payloads.
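
The atomic path described above can be sketched with Kafka's consume-transform-produce pattern. This is a minimal illustration, not the chio-streaming implementation: the function `process_in_transaction`, the `handler` signature, and the way `receipt_id` is obtained are hypothetical. The method names called on `producer` and `consumer` follow confluent-kafka-python's transactional API, and the producer is assumed to be configured with a `transactional.id` and to have called `init_transactions()` once at startup.

```python
def process_in_transaction(consumer, producer, msg, handler, receipt_id):
    """Handle one message inside a Kafka transaction.

    Returns True if the transaction committed, False if it was aborted.
    `handler` and `receipt_id` are hypothetical stand-ins for the agent
    logic and for the id issued by the receipt log.
    """
    producer.begin_transaction()
    try:
        out_topic, out_value = handler(msg)

        # Only the receipt id travels in Kafka, as a message header.
        producer.produce(
            out_topic,
            value=out_value,
            headers=[("X-Chio-Receipt", receipt_id.encode("utf-8"))],
        )

        # Attach the consumer's offsets to the same transaction so the
        # offset commit and the produced record land together.
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
        return True
    except Exception:
        # Nothing produced in this transaction becomes visible to
        # read_committed consumers, and the offsets are not advanced.
        producer.abort_transaction()
        return False
```

Because the offsets are sent through the producer, the consumer keeps `enable.auto.commit` disabled and never commits on its own.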

Consumer Middleware

The lowest-friction path is the drop-in consumer wrapper. It preserves the confluent-kafka-python API you already use, adds capability evaluation on poll, and routes denied messages to a configurable DLQ topic:

python
from confluent_kafka import Consumer
from chio_streaming import ChioConsumerMiddleware

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "research-agents",
    "enable.auto.commit": False,
})

chio_consumer = ChioConsumerMiddleware(
    consumer=consumer,
    sidecar_url="http://127.0.0.1:9090",
    scope_map={
        "research-tasks": "events:consume:research-tasks",
        "urgent-tasks":   "events:consume:urgent-tasks",
    },
    identity="research-agent-group",
    dlq_topic="chio-denied-events",
)

while True:
    event = chio_consumer.poll(timeout=1.0)
    if event is None:
        continue

    # If we reach here, chio authorized consumption and
    # event.chio_receipt_id is populated on the event object.
    process(event)

    # Tool calls inside process() are separately evaluated
    # via the standard chio SDK (chio-sdk-python / chio-fastapi / etc).

    chio_consumer.commit(event)

The wrapper subscribes to the topics named by the keys of scope_map and installs a per-topic evaluation hook. When a message arrives, the middleware builds an evaluation request from the scope mapped to that topic and the configured identity, then either yields the message or publishes a structured denial event to dlq_topic.
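
The per-message decision can be sketched in plain Python. Everything here is illustrative: `Verdict`, `evaluate_and_route`, and the `evaluate` and `send_to_dlq` callables are hypothetical names, and denying topics that have no entry in `scope_map` is an assumption (fail closed), not documented chio behavior.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Verdict:
    """Hypothetical result of a capability evaluation."""
    allowed: bool
    receipt_id: str
    reason: str = ""


def evaluate_and_route(
    topic: str,
    scope_map: Dict[str, str],
    identity: str,
    evaluate: Callable[[dict], Verdict],
    send_to_dlq: Callable[[str, Verdict], None],
) -> Optional[str]:
    """Return the receipt id when consumption is allowed.

    Otherwise hand the verdict to the DLQ callback and return None.
    """
    scope = scope_map.get(topic)
    if scope is None:
        # Assumption: a topic without a mapped scope is denied.
        verdict = Verdict(False, receipt_id="", reason=f"no scope mapped for {topic}")
    else:
        verdict = evaluate({"topic": topic, "scope": scope, "identity": identity})

    if verdict.allowed:
        return verdict.receipt_id

    send_to_dlq(topic, verdict)
    return None
```

Keeping the evaluator and the DLQ sink as injected callables makes the routing logic testable without a broker or a sidecar.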


Producer Middleware

Outbound events carry receipts so that downstream consumers can attribute the event to a specific, attested action:

python
import json

from confluent_kafka import Producer
from chio_streaming import ChioProducerMiddleware

producer = Producer({"bootstrap.servers": "localhost:9092"})

chio_producer = ChioProducerMiddleware(
    producer=producer,
    sidecar_url="http://127.0.0.1:9090",
    scope_map={
        "order-events":  "events:produce:order-events",
        "notifications": "events:produce:notifications",
    },
)

# Evaluated on produce, receipt auto-attached as headers:
#   X-Chio-Receipt: rcpt_abc123
#   X-Chio-Scope: events:produce:order-events
chio_producer.produce(
    topic="order-events",
    value=json.dumps({"order_id": "123", "status": "confirmed"}),
)
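
On the receiving side, a downstream consumer can recover the receipt id from the message headers. The helper below is a small sketch with a hypothetical name, `receipt_id_from_headers`; it assumes headers arrive in the shape confluent-kafka-python's `Message.headers()` returns, a list of `(key, value)` tuples with byte values, or `None` when the message has no headers.

```python
from typing import Optional, Sequence, Tuple


def receipt_id_from_headers(
    headers: Optional[Sequence[Tuple[str, Optional[bytes]]]],
) -> Optional[str]:
    """Return the X-Chio-Receipt header value as text, or None if absent."""
    for key, value in headers or []:
        if key == "X-Chio-Receipt" and value is not None:
            return value.decode("utf-8") if isinstance(value, bytes) else str(value)
    return None
```

A consumer would call it as `receipt_id_from_headers(msg.headers())` and use the result as the parent of its own receipt.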

Transactional Consumer-Producer

For the end-to-end atomic path, use the ChioTransactionalProcessor. It manages a Kafka transaction across consume, tool call, and produce, and ensures the receipt bundle is committed or aborted with the Kafka offsets:

python
import json

from chio_streaming.kafka import ChioTransactionalProcessor

processor = ChioTransactionalProcessor(
    bootstrap_servers="localhost:9092",
    group_id="order-agents",
    consume_topics=["orders"],
    sidecar_url="http://127.0.0.1:9090",
    consume_scope="events:consume:orders",
    produce_scope_map={
        "order-events":     "events:produce:order-events",
        "payment-requests": "events:produce:payment-requests",
    },
)

async def handle_order(event, ctx):
    """Runs inside a Kafka transaction."""
    order = json.loads(event.value())

    # Separately evaluated tool call
    inventory = await ctx.chio.invoke(
        tool="check-inventory",
        scope="tools:inventory:read",
        arguments={"sku": order["sku"]},
    )

    if inventory["available"]:
        # Produces inside the same transaction
        await ctx.produce(
            topic="payment-requests",
            value=json.dumps({
                "order_id": order["id"],
                "amount":   order["total"],
            }),
        )

    # Transaction commits: offset + produced messages + receipts
    # All atomic, either all succeed or none do.

processor.register("orders", handle_order)
processor.run()

Consumer Group as Agent Swarm

A Kafka consumer group is, structurally, an agent swarm. Every consumer instance in the group is an autonomous worker reading from the same logical task stream. chio treats the group as a first-class governance unit: a single capability grant scopes the whole swarm, and a shared budget is distributed across its members. Rebalance hooks redistribute the budget when instances come and go.

python
from chio_streaming.kafka import ChioConsumerGroup

group = ChioConsumerGroup(
    group_id="research-swarm",
    topics=["research-tasks"],
    sidecar_url="http://127.0.0.1:9090",

    # Group-level capability grant
    group_scope="agent:research-swarm",
    group_capabilities=[
        "events:consume:research-tasks",
        "tools:search",
        "tools:browse",
        "tools:summarize",
    ],

    # Budget shared across every consumer in the group
    group_budget={
        "max_calls":    10000,
        "max_cost_usd": 50.00,
    },

    # Per-consumer slice of the shared budget
    per_consumer_budget={
        "max_calls":    500,
        "max_cost_usd": 5.00,
    },

    on_rebalance="redistribute_budget",
)

Budget travels with partition assignment

When a rebalance moves partitions between consumers, the per-consumer budget slice moves too. A single hot partition cannot bankrupt the swarm because the group budget enforces a ceiling even if a local slice is exhausted and refilled repeatedly.
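
One way to picture the redistribution is a function that recomputes slices after each rebalance. This is a sketch under stated assumptions, not the policy chio ships: the name `redistribute_budget` is hypothetical, slices are assumed to be proportional to the number of partitions each member owns, and each slice is capped by the per-consumer limit.

```python
from typing import Dict


def redistribute_budget(
    group_remaining_calls: int,
    partitions_by_member: Dict[str, int],
    per_consumer_max_calls: int,
) -> Dict[str, int]:
    """Split the remaining group budget across members by partition count.

    Integer division rounds each share down, so the slices never add up
    to more than the remaining group budget.
    """
    total_partitions = sum(partitions_by_member.values())
    if total_partitions == 0:
        return {member: 0 for member in partitions_by_member}

    return {
        member: min(
            group_remaining_calls * owned // total_partitions,
            per_consumer_max_calls,
        )
        for member, owned in partitions_by_member.items()
    }
```

With 1,000 calls left and a cap of 500, a member holding three of four partitions receives 500 (capped from 750) and the other receives 250. Because every slice is derived from the remaining group budget, repeated refills cannot exceed the group ceiling.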

Schema Registry as a Guard Input

Schema Registry governs what the data looks like. chio governs what agents do with the data. The two compose: a chio guard can read the schema for a topic and identify sensitive fields, then require a stronger scope when those fields are present:

python
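class PiiFilterGuard:
    """Require data:pii:read when the topic's schema has PII-tagged fields."""

    # schema_registry, Allow, and Deny are assumed to be in scope; their
    # imports are omitted in this example.
    async def evaluate(self, context):
        schema = await schema_registry.get_schema(context.topic)
        pii_fields = [f for f in schema.fields if f.has_tag("pii")]

        if pii_fields and not context.has_scope("data:pii:read"):
            return Deny(
                f"Event contains PII fields {pii_fields}, "
                f"requires data:pii:read scope"
            )
        return Allow()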
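
How a field comes to be tagged depends on the schema format. As a hedged illustration, the sketch below assumes Avro record schemas in which a field carries a custom `tags` attribute such as `["pii"]`; Avro permits extra attributes on fields, but the `tags` convention and the helper name `pii_field_names` are assumptions for this example.

```python
import json
from typing import List


def pii_field_names(avro_schema_json: str, tag: str = "pii") -> List[str]:
    """Return the names of top-level record fields that carry the given tag."""
    schema = json.loads(avro_schema_json)
    return [
        field["name"]
        for field in schema.get("fields", [])
        if tag in field.get("tags", [])
    ]


example_schema = json.dumps({
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "email", "type": "string", "tags": ["pii"]},
        {"name": "country", "type": "string", "tags": ["geo"]},
    ],
})
```

For the example schema, `pii_field_names(example_schema)` returns `["email"]`, which a guard could include in its denial reason instead of raw field objects.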

Dead Letter Queue as a Security Signal

In a traditional Kafka system, the DLQ is a place for messages whose processing crashed. In a chio-governed system it is something else: a feed of messages an agent was not authorized to process. That shift turns the DLQ from an error channel into a security channel. High DLQ volume is not a bug; it is evidence that an agent is attempting actions for which it holds no capability.

text
Traditional DLQ:
  Event -> Consumer -> Processing failed -> DLQ
  Meaning: "We tried and couldn't."

chio-governed DLQ:
  Event -> Consumer -> chio denied -> DLQ + signed denial receipt
  Meaning: "We were not authorized to process this."

The DLQ becomes a security signal:
  - High DLQ volume indicates unauthorized action attempts
  - Repeating denial patterns surface misconfigured capabilities or attacks
  - Receipt-enriched DLQ is an auditable proof of enforcement

The DLQ producer packages every denial with full chio context so that downstream analysis can attribute the denial to an identity, a scope, and the specific guard that rejected it:

python
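import json


def _to_text(raw):
    """Decode Kafka key/value bytes so they can be JSON-serialized."""
    return raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw


class ChioDeadLetterProducer:
    """Route denied events to the DLQ with full chio context."""

    # self.producer and self.dlq_topic are set by the constructor (not shown).
    async def send_to_dlq(self, event, verdict):
        dlq_event = {
            "original_topic":     event.topic(),
            "original_key":       _to_text(event.key()),
            "original_value":     _to_text(event.value()),
            # confluent-kafka returns a (timestamp_type, timestamp) tuple,
            # which json.dumps writes as a two-element list.
            "original_timestamp": event.timestamp(),
            "chio_denial": {
                "receipt_id":       verdict.receipt_id,
                "reason":           verdict.reason,
                "scope_requested":  verdict.scope,
                "identity":         verdict.identity,
                "guards_evaluated": verdict.guards,
                "timestamp":        verdict.timestamp,
            },
        }

        await self.producer.produce(
            topic=self.dlq_topic,
            value=json.dumps(dlq_event),
            headers={
                "X-Chio-Receipt":        verdict.receipt_id,
                "X-Chio-Denial-Reason":  verdict.reason,
                "X-Chio-Original-Topic": event.topic(),
            },
        )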

Load the DLQ into a data warehouse and denial patterns become queryable: group by reason, scope, and agent identity over a rolling window, and threshold alerts fire when a specific agent starts attempting actions outside its scope.
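
The same grouping can be prototyped in a few lines before any warehouse is involved. The sketch below is illustrative: `denial_alerts` is a hypothetical helper, it reads the `chio_denial` fields from the DLQ event shown earlier, and it assumes `chio_denial.timestamp` is a numeric Unix timestamp in seconds.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

DenialKey = Tuple[str, str, str]  # (identity, scope_requested, reason)


def denial_alerts(
    dlq_events: Iterable[dict],
    now: float,
    window_seconds: float,
    threshold: int,
) -> Dict[DenialKey, int]:
    """Count denials per (identity, scope, reason) inside a rolling window.

    Returns only the groups whose count reaches the threshold.
    """
    cutoff = now - window_seconds
    counts: Counter = Counter()
    for event in dlq_events:
        denial = event["chio_denial"]
        if denial["timestamp"] >= cutoff:
            key = (denial["identity"], denial["scope_requested"], denial["reason"])
            counts[key] += 1
    return {key: n for key, n in counts.items() if n >= threshold}
```

A scheduled job could run this over the most recent DLQ records and page an operator whenever the returned mapping is non-empty.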


Receipt Chaining Across Choreography

Because no coordinator owns a choreography, the only global view comes from the receipts themselves. Each event carries the receipt id of the producer that wrote it, so the consumer can record its own receipt with a parent pointer. That builds a directed graph across the whole choreography. Any receipt can be traced forward to everything it triggered or backward to the original stimulus.

Figure: Every produce/consume and every tool call gets a receipt. Each downstream receipt carries parent_receipt, building a directed graph across the whole choreography.

The CLI walks the chain in either direction. Forward traversal answers "what did this event cause to happen?" Backward traversal answers "what was the root cause of this action?"

bash
# Forward walk from the initial event
chio receipt chain rcpt_001 --direction forward
# rcpt_001 (order.placed produced by order-service)
#   -> rcpt_002 (order.placed consumed by payment-service)
#     -> rcpt_003 (charge_payment tool call)
#       -> rcpt_004 (payment.charged produced by payment-service)
#         -> rcpt_005 (payment.charged consumed by order-service)

# Backward walk from a downstream effect
chio receipt chain rcpt_005 --direction backward
# Traces back to the original order.placed event.
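
The traversal itself is a plain graph walk over parent pointers. The sketch below is not the CLI's implementation: it assumes receipts have been loaded into a dictionary that maps each receipt id to its parent id (or `None` for a root), and the function names are hypothetical.

```python
from collections import defaultdict, deque
from typing import Dict, List, Optional


def walk_backward(parents: Dict[str, Optional[str]], receipt_id: str) -> List[str]:
    """Follow parent pointers from a receipt back to the original stimulus."""
    chain = [receipt_id]
    seen = {receipt_id}
    while True:
        parent = parents.get(chain[-1])
        if parent is None or parent in seen:  # root reached, or a cycle guard
            return chain
        chain.append(parent)
        seen.add(parent)


def walk_forward(parents: Dict[str, Optional[str]], receipt_id: str) -> List[str]:
    """Return every receipt triggered by this one, in breadth-first order."""
    children = defaultdict(list)
    for child, parent in parents.items():
        if parent is not None:
            children[parent].append(child)

    order, queue, seen = [], deque([receipt_id]), {receipt_id}
    while queue:
        current = queue.popleft()
        order.append(current)
        for child in sorted(children[current]):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return order
```

Fed the five receipts from the CLI example, `walk_forward` from `rcpt_001` visits `rcpt_001` through `rcpt_005` in order, and `walk_backward` from `rcpt_005` returns the same ids reversed.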

Other Brokers on the Roadmap

The Kafka integration is the first concrete implementation, but the consumer-side evaluation model generalizes. Kafka ships today via consumer wrapper and transactional processor. The same chio-streaming package family documents roadmap adapters for NATS (JetStream subscription middleware and KV capability cache), Pulsar (consumer interceptor), Amazon EventBridge (Lambda target handler), Google Pub/Sub (subscriber callback wrapper), and Redis Streams (group consumer wrapper). All reuse the same evaluation, receipt signing, and DLQ governance primitives.


Package Layout

text
sdks/python/chio-streaming/
  pyproject.toml             # deps: chio-sdk-python
  src/chio_streaming/
    kafka/
      consumer.py            # ChioConsumerMiddleware
      producer.py            # ChioProducerMiddleware
      transactional.py       # ChioTransactionalProcessor
      group.py               # ChioConsumerGroup (swarm)
    nats/          # roadmap: middleware, KV capability cache
    pubsub/        # roadmap: subscriber callback wrapper
    eventbridge/   # roadmap: Lambda target handler
    redis/         # roadmap: group consumer wrapper
    dlq.py                   # ChioDeadLetterProducer
    chain.py                 # Receipt chain utilities

Open Questions

  • Broker-level enforcement. This design evaluates at the consumer, not the broker. Should chio ship a Kafka interceptor plugin or NATS authorization callout that evaluates at the broker level? Pro: earlier enforcement. Con: broker coupling, latency on the hot path.
  • Compacted topics. Kafka compacted topics retain the latest value per key. If a capability is revoked after an event is compacted, should the agent still be able to consume the compacted event based on the original attestation?
  • Multi-cluster streaming. MirrorMaker and Confluent Cluster Linking replicate events across clusters. Should receipts replicate with the events, or should each cluster maintain its own receipt chain with cross-cluster federation?
  • Backpressure. If chio denies a high volume of events, the DLQ can become the bottleneck. Should the consumer apply backpressure to the source topic, or should a high denial rate trigger a consumer group circuit breaker?
  • Event replay. Consumers can reset offsets and replay events. Should chio re-evaluate capabilities on replay, since they may have changed, or honor the original evaluation from the receipt chain?

Next Steps

  • AWS Lambda · run chio as a Lambda extension alongside serverless tool servers
  • Temporal · orchestrated workflows that complement choreographed streams
  • Budgets · per-group, per-consumer spending envelopes for streaming agents