Kafka
Orchestrators like Temporal and Airflow have a coordinator to wrap. Choreography does not. Agents that react autonomously to Kafka topics have no single point where policy can be enforced, which is exactly what makes event-driven agent systems structurally hard to govern. chio plugs into the consumer side, evaluates each event before the agent acts on it, and writes receipts into Kafka transactions so the offset commit and the attestation land atomically. The integration ships today as the chio-streaming Python package wrapping confluent-kafka-python, with adapters for NATS, Pulsar, EventBridge, Pub/Sub, and Redis Streams on the documented roadmap.
Why Kafka Through chio
The Kafka consumer loop is the agent's trigger loop. An event arrives on a topic, the agent decides what to do, it calls tools in response, and it emits result events downstream. Every link in that chain is a capability boundary: the agent must be authorized to consume the inbound topic, authorized to call each tool, and authorized to produce to each outbound topic. chio evaluates all three.
Without chio, governance stops at the topic. Kafka itself offers ACLs for topic read and write, but ACLs do not model scope, budget, guard pipelines, or signed attestation. They answer whether a principal can read a topic, not whether this specific message, with this specific content, in this specific choreography, is permitted to drive a tool call right now.
| Event streaming alone | Event streaming + chio |
|---|---|
| Agents consume freely once they have ACL read | Scoped capabilities per topic and per content class |
| No audit of tools an agent invokes in response | Signed receipts on every tool call triggered by an event |
| Schema Registry governs data shape | chio governs what agents do with the data |
| Dead letter means processing failed | Dead letter means processing was not authorized |
| Exactly-once avoids duplicate processing | Exactly-once plus receipt commit: attested processing |
Consumer-Side Enforcement
chio does not touch the broker. The broker stays a dumb pipe, which matters for compatibility with managed services like Amazon MSK and Confluent Cloud. Governance lives inside the consumer process, next to the agent loop, and every message returned by a poll passes through the chio kernel before it is handed to application code.
Every event crosses at least two capability boundaries. First, consumption: is this agent authorized to see this topic and this message? Second, tool invocation: for each tool the agent calls in response, is the call permitted? A third boundary, production, applies when the agent writes result events back out. Each boundary produces a distinct receipt, and the receipts link together so the full chain can be reconstructed.
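The three boundaries and their linked receipts can be modeled in a few lines of Python. This is a hypothetical, in-memory sketch, not the chio-streaming implementation: `Receipt`, `ReceiptLog`, and `govern_event` are illustrative names, the scope strings reuse the `events:consume:<topic>` and `events:produce:<topic>` convention from the examples on this page, and a plain set of granted scopes stands in for the chio kernel.

```python
from dataclasses import dataclass
from itertools import count
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Receipt:
    receipt_id: str
    boundary: str             # "consume", "tool", or "produce"
    scope: str
    allowed: bool
    parent_id: Optional[str]  # receipt recorded just before this one, if any


class ReceiptLog:
    """Hypothetical stand-in for chio's evaluator plus receipt store."""

    def __init__(self, is_granted: Callable[[str], bool]):
        self.is_granted = is_granted
        self.receipts: List[Receipt] = []
        self._ids = count(1)

    def evaluate(self, boundary: str, scope: str,
                 parent_id: Optional[str] = None) -> Receipt:
        receipt = Receipt(
            receipt_id=f"rcpt_{next(self._ids):03d}",
            boundary=boundary,
            scope=scope,
            allowed=self.is_granted(scope),
            parent_id=parent_id,
        )
        self.receipts.append(receipt)
        return receipt


def govern_event(log: ReceiptLog, topic: str, tool_scopes: List[str],
                 out_topics: List[str]) -> List[Receipt]:
    # Boundary 1: may this agent consume the event at all?
    consumed = log.evaluate("consume", f"events:consume:{topic}")
    if not consumed.allowed:
        return [consumed]  # a denied event never reaches application code
    chain = [consumed]
    # Boundary 2: each tool call made in response, linked to the previous receipt
    for scope in tool_scopes:
        chain.append(log.evaluate("tool", scope, parent_id=chain[-1].receipt_id))
    # Boundary 3: each result event written back out
    for out in out_topics:
        chain.append(log.evaluate("produce", f"events:produce:{out}",
                                  parent_id=chain[-1].receipt_id))
    return chain
```

Each receipt points at the one recorded just before it, so the consume, tool, and produce receipts for one event form a single chain, while a denied consume yields exactly one receipt and stops there.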
Transactional Receipt Commit
Kafka's exactly-once semantics provide the machinery chio needs to land an offset commit and a receipt commit together: either both succeed or both roll back. If the agent crashes between the tool call and the commit, the event is redelivered and the receipt for the aborted attempt is flagged as rolled back, so auditors can distinguish the committed invocation from an attempt that was aborted and retried.
Receipts are external to Kafka state
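At the Kafka level, the atomic commit rests on the standard transactional consume-process-produce pattern in confluent-kafka-python. The sketch below shows that pattern with a receipt record written inside the transaction. It is an illustration under assumptions rather than what chio-streaming does internally: the `chio-receipts` topic and the `evaluate` callable are hypothetical, and the producer must be configured with a `transactional.id` and have called `init_transactions()` beforehand.

```python
import json
from typing import Any, Callable, Dict


def process_transactionally(consumer: Any, producer: Any, msg: Any,
                            evaluate: Callable[[Any], Dict[str, Any]],
                            receipt_topic: str = "chio-receipts") -> Dict[str, Any]:
    """Commit a receipt record and the consumer offsets in one Kafka transaction.

    `consumer` and `producer` are confluent_kafka.Consumer / Producer objects;
    `evaluate` and `receipt_topic` are hypothetical stand-ins for chio.
    """
    producer.begin_transaction()
    try:
        receipt = evaluate(msg)
        # Written inside the transaction: read_committed consumers only see it
        # if the commit below succeeds.
        producer.produce(receipt_topic, key=msg.key(), value=json.dumps(receipt))
        # Attach the consumer's current positions (just past the last polled
        # message) to the same transaction instead of committing them separately.
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
        return receipt
    except Exception:
        # Neither the receipt record nor the offsets are committed. The group's
        # committed offset still points at this message, so it is redelivered
        # after a restart or rebalance, or once the consumer seeks back.
        producer.abort_transaction()
        raise
```

Downstream readers of the receipt topic should set `isolation.level` to `read_committed` so records from aborted transactions stay invisible. In this simplified version an aborted attempt leaves no visible receipt at all; flagging it as rolled back, as described above, would need a separate record written outside the transaction.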
Consumer Middleware
The lowest-friction path is the drop-in consumer wrapper. It preserves the confluent-kafka-python API you already use, adds capability evaluation on poll, and routes denied messages to a configurable DLQ topic:
```python
from confluent_kafka import Consumer
from chio_streaming import ChioConsumerMiddleware

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "research-agents",
    "enable.auto.commit": False,
})

chio_consumer = ChioConsumerMiddleware(
    consumer=consumer,
    sidecar_url="http://127.0.0.1:9090",
    scope_map={
        "research-tasks": "events:consume:research-tasks",
        "urgent-tasks": "events:consume:urgent-tasks",
    },
    identity="research-agent-group",
    dlq_topic="chio-denied-events",
)

while True:
    event = chio_consumer.poll(timeout=1.0)
    if event is None:
        continue
    # If we reach here, chio authorized consumption and
    # event.chio_receipt_id is populated on the event object.
    process(event)  # your application's handler (not defined here)
    # Tool calls inside process() are separately evaluated
    # via the standard chio SDK (chio-sdk-python / chio-fastapi / etc).
    chio_consumer.commit(event)
```

The wrapper subscribes to the topics named by the keys of scope_map and installs a per-topic evaluation hook. When a message arrives, the middleware builds an evaluation request from that topic's mapped scope and the configured identity (here research-agent-group), then either yields the message or publishes a structured denial event to dlq_topic.
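The per-topic hook can be pictured as a small gate function. The sketch below is hypothetical: `gate_message`, `Verdict`, the shape of the request dictionary, and the fail-closed handling of unmapped topics are assumptions for illustration, while `evaluate` and `send_to_dlq` stand in for the sidecar call and the DLQ producer.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class Verdict:
    allowed: bool
    receipt_id: str
    reason: str = ""


def gate_message(topic: str, key: Optional[bytes], scope_map: Dict[str, str],
                 identity: str,
                 evaluate: Callable[[dict], Verdict],
                 send_to_dlq: Callable[[dict, Verdict], None]) -> Optional[Verdict]:
    """Decide whether a polled message may be handed to the agent (sketch)."""
    scope = scope_map.get(topic)
    request = {
        "identity": identity,
        "scope": scope,
        "resource": {
            "topic": topic,
            "key": key.decode("utf-8", "replace") if key is not None else None,
        },
    }
    if scope is None:
        # Fail closed: a topic with no mapped scope is never treated as allowed.
        verdict = Verdict(allowed=False, receipt_id="",
                          reason=f"no scope mapped for {topic!r}")
    else:
        verdict = evaluate(request)
    if verdict.allowed:
        return verdict  # caller processes the message, then commits it
    send_to_dlq(request, verdict)
    return None  # caller skips the message; the denial now lives in the DLQ
```

Failing closed on unmapped topics is a design choice: a topic subscribed by mistake produces DLQ entries instead of silently reaching the agent.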
Producer Middleware
Outbound events carry receipts so that downstream consumers can attribute the event to a specific, attested action:
```python
import json

from confluent_kafka import Producer
from chio_streaming import ChioProducerMiddleware

producer = Producer({"bootstrap.servers": "localhost:9092"})

chio_producer = ChioProducerMiddleware(
    producer=producer,
    sidecar_url="http://127.0.0.1:9090",
    scope_map={
        "order-events": "events:produce:order-events",
        "notifications": "events:produce:notifications",
    },
)

# Evaluated on produce, receipt auto-attached as headers:
#   X-Chio-Receipt: rcpt_abc123
#   X-Chio-Scope: events:produce:order-events
chio_producer.produce(
    topic="order-events",
    value=json.dumps({"order_id": "123", "status": "confirmed"}),
)

# confluent-kafka buffers messages; flush() blocks until queued messages are delivered.
producer.flush()
```

Transactional Consumer-Producer
For the end-to-end atomic path, use the ChioTransactionalProcessor. It manages a Kafka transaction across consume, tool call, and produce, and ensures the receipt bundle is committed or aborted with the Kafka offsets:
```python
import json

from chio_streaming.kafka import ChioTransactionalProcessor

processor = ChioTransactionalProcessor(
    bootstrap_servers="localhost:9092",
    group_id="order-agents",
    consume_topics=["orders"],
    sidecar_url="http://127.0.0.1:9090",
    consume_scope="events:consume:orders",
    produce_scope_map={
        "order-events": "events:produce:order-events",
        "payment-requests": "events:produce:payment-requests",
    },
)


async def handle_order(event, ctx):
    """Runs inside a Kafka transaction."""
    order = json.loads(event.value())

    # Separately evaluated tool call
    inventory = await ctx.chio.invoke(
        tool="check-inventory",
        scope="tools:inventory:read",
        arguments={"sku": order["sku"]},
    )

    if inventory["available"]:
        # Produces inside the same transaction
        await ctx.produce(
            topic="payment-requests",
            value=json.dumps({
                "order_id": order["id"],
                "amount": order["total"],
            }),
        )

    # Transaction commits: offset + produced messages + receipts.
    # All atomic: either all succeed or none do.


processor.register("orders", handle_order)
processor.run()
```

Consumer Group as Agent Swarm
A Kafka consumer group is, structurally, an agent swarm. Every consumer instance in the group is an autonomous worker reading from the same logical task stream. chio treats the group as a first-class governance unit: a single capability grant scopes the whole swarm, and a shared budget is distributed across its members. Rebalance hooks redistribute the budget when instances come and go.
```python
from chio_streaming.kafka import ChioConsumerGroup

group = ChioConsumerGroup(
    group_id="research-swarm",
    topics=["research-tasks"],
    sidecar_url="http://127.0.0.1:9090",
    # Group-level capability grant
    group_scope="agent:research-swarm",
    group_capabilities=[
        "events:consume:research-tasks",
        "tools:search",
        "tools:browse",
        "tools:summarize",
    ],
    # Budget shared across every consumer in the group
    group_budget={
        "max_calls": 10000,
        "max_cost_usd": 50.00,
    },
    # Per-consumer slice of the shared budget
    per_consumer_budget={
        "max_calls": 500,
        "max_cost_usd": 5.00,
    },
    on_rebalance="redistribute_budget",
)
```

Budget travels with partition assignment
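One way to make the budget follow partition assignment is to split whatever the group has left in proportion to the partitions each member owns after a rebalance, then cap each share at the per-consumer slice. The function below is a hypothetical sketch of that policy; it is not taken from chio-streaming, whose `on_rebalance="redistribute_budget"` behavior may differ. It tracks money in integer cents so repeated splits do not accumulate float error.

```python
from typing import Dict


def redistribute_budget(remaining_calls: int, remaining_cents: int,
                        partitions: Dict[str, int],
                        cap_calls: int, cap_cents: int) -> Dict[str, Dict[str, int]]:
    """Hypothetical rebalance policy: share the group's remaining budget in
    proportion to partitions owned, capped at the per-consumer slice."""
    total = sum(partitions.values())
    if total == 0:
        # No partitions assigned: nobody gets budget until the next rebalance.
        return {m: {"max_calls": 0, "max_cost_cents": 0} for m in partitions}
    return {
        member: {
            # Floor division: shares never sum to more than the group has left.
            "max_calls": min(cap_calls, remaining_calls * owned // total),
            "max_cost_cents": min(cap_cents, remaining_cents * owned // total),
        }
        for member, owned in partitions.items()
    }
```

With the figures from the example above (10,000 calls and $50.00 left, four members owning one partition each), every member's proportional share of 2,500 calls and $12.50 is capped at the 500-call, $5.00 slice. Floor division means the shares can sum to slightly less than the remaining budget, never more.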
Schema Registry as a Guard Input
Schema Registry governs what the data looks like. chio governs what agents do with the data. The two compose: a chio guard can read the schema for a topic and identify sensitive fields, then require a stronger scope when those fields are present:
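How the guard finds tagged fields depends on the schema format. Assuming Avro schemas that declare tags inline on each field under `confluent:tags` (the convention used by Confluent Schema Registry data contracts), the lookup reduces to a scan of the record's fields. `pii_fields` is a hypothetical helper that checks top-level fields only; the guard that acts on the result follows.

```python
import json
from typing import List


def pii_fields(avro_schema_json: str, tag: str = "PII") -> List[str]:
    """Return names of top-level record fields carrying `tag`.

    Assumes tags are declared inline on each field under "confluent:tags";
    nested records and union branches are not walked in this sketch.
    """
    schema = json.loads(avro_schema_json)
    found = []
    for field in schema.get("fields", []):
        tags = field.get("confluent:tags", [])
        # Case-insensitive, so a guard asking for "pii" matches a "PII" tag.
        if any(t.lower() == tag.lower() for t in tags):
            found.append(field["name"])
    return found
```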
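```python
class PiiFilterGuard:
    # Allow and Deny stand for chio's guard verdict types; schema_registry is
    # any client that resolves a topic to its registered schema.
    def __init__(self, schema_registry):
        self.schema_registry = schema_registry

    async def evaluate(self, context):
        schema = await self.schema_registry.get_schema(context.topic)
        # Assumes schema field objects expose .name and .has_tag().
        pii_fields = [f.name for f in schema.fields if f.has_tag("pii")]
        if pii_fields and not context.has_scope("data:pii:read"):
            return Deny(
                f"Event contains PII fields {pii_fields}, "
                f"requires data:pii:read scope"
            )
        return Allow()
```

Dead Letter Queue as a Security Signal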
In a traditional Kafka system, the DLQ is a place for messages whose processing crashed. In a chio-governed system it is something else: a feed of messages an agent was not authorized to process. That shift turns the DLQ from an error channel into a security channel. High DLQ volume is not a bug; it is evidence that an agent is attempting actions it does not hold capabilities for.
```
Traditional DLQ:
  Event -> Consumer -> Processing failed -> DLQ
  Meaning: "We tried and couldn't."

chio-governed DLQ:
  Event -> Consumer -> chio denied -> DLQ + signed denial receipt
  Meaning: "We were not authorized to process this."
```
The DLQ becomes a security signal:
- High DLQ volume indicates unauthorized action attempts
- Repeating denial patterns surface misconfigured capabilities or attacks
- A receipt-enriched DLQ is auditable proof of enforcement

The DLQ producer packages every denial with full chio context so that downstream analysis can attribute the denial to an identity, a scope, and the specific guard that rejected it:
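```python
import base64
import json


def _b64(data):
    # Kafka keys and values are raw bytes, which json.dumps cannot serialize.
    return base64.b64encode(data).decode("ascii") if data is not None else None


class ChioDeadLetterProducer:
    """Route denied events to the DLQ with full chio context."""

    def __init__(self, producer, dlq_topic):
        self.producer = producer  # a confluent_kafka.Producer
        self.dlq_topic = dlq_topic

    async def send_to_dlq(self, event, verdict):
        dlq_event = {
            "original_topic": event.topic(),
            "original_key": _b64(event.key()),        # base64-encoded bytes
            "original_value": _b64(event.value()),    # base64-encoded bytes
            "original_timestamp": event.timestamp(),  # (timestamp_type, ms)
            "chio_denial": {
                "receipt_id": verdict.receipt_id,
                "reason": verdict.reason,
                "scope_requested": verdict.scope,
                "identity": verdict.identity,
                "guards_evaluated": verdict.guards,
                "timestamp": verdict.timestamp,
            },
        }
        # confluent-kafka's produce() is synchronous (it only enqueues),
        # so it is called directly rather than awaited.
        self.producer.produce(
            topic=self.dlq_topic,
            value=json.dumps(dlq_event),
            headers={
                "X-Chio-Receipt": verdict.receipt_id,
                "X-Chio-Denial-Reason": verdict.reason,
                "X-Chio-Original-Topic": event.topic(),
            },
        )
```

Load the DLQ into a data warehouse and denial patterns become queryable: group by reason, scope, and agent identity over a rolling window, and set threshold alerts that fire when a specific agent starts attempting actions outside its scope.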
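The same alerting logic can also run in process, before any warehouse is involved. The sketch below keeps a sliding window of denial timestamps per (identity, scope) pair and reports when a pair reaches a threshold. `DenialRateAlert` and its parameters are hypothetical, and the sketch assumes denials for a given pair arrive in timestamp order.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class DenialRateAlert:
    """Count DLQ denials per (identity, scope) over a sliding time window."""

    def __init__(self, window_seconds: float, threshold: int):
        self.window = window_seconds
        self.threshold = threshold
        self.events: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)

    def record(self, identity: str, scope: str, ts: float) -> bool:
        """Record one denial at time `ts`; return True if the pair has now
        reached the threshold within the window."""
        q = self.events[(identity, scope)]
        q.append(ts)
        # Drop denials that have fallen out of the window.
        while q and q[0] <= ts - self.window:
            q.popleft()
        return len(q) >= self.threshold
```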
Receipt Chaining Across Choreography
Because no coordinator owns a choreography, the only global view comes from the receipts themselves. Each event carries the receipt id of the producer that wrote it, so the consumer can record its own receipt with a parent pointer. That builds a directed graph across the whole choreography. Any receipt can be traced forward to everything it triggered or backward to the original stimulus.
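The parent pointer and both traversal directions can be modeled with a small graph. In this hypothetical sketch, `parent_receipt` reads the `X-Chio-Receipt` header shown in the producer example from the `(key, bytes)` tuples that confluent-kafka's `Message.headers()` returns (or `None` when there are no headers), and `ReceiptGraph` is an in-memory stand-in for chio's receipt store. It assumes parent pointers never form a cycle.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

Headers = Optional[Sequence[Tuple[str, Optional[bytes]]]]


def parent_receipt(headers: Headers) -> Optional[str]:
    """Return the upstream producer's receipt id from Kafka headers, if present."""
    for key, value in headers or []:
        if key == "X-Chio-Receipt" and value is not None:
            return value.decode("utf-8")
    return None


class ReceiptGraph:
    """Hypothetical in-memory receipt graph built from parent pointers."""

    def __init__(self) -> None:
        self.parent: Dict[str, Optional[str]] = {}
        self.children: Dict[str, List[str]] = defaultdict(list)

    def add(self, receipt_id: str, parent_id: Optional[str]) -> None:
        self.parent[receipt_id] = parent_id
        if parent_id is not None:
            self.children[parent_id].append(receipt_id)

    def backward(self, receipt_id: str) -> List[str]:
        """Follow parent pointers to the root: what caused this action?"""
        path = [receipt_id]
        while self.parent.get(path[-1]) is not None:
            path.append(self.parent[path[-1]])
        return path

    def forward(self, receipt_id: str) -> List[str]:
        """Depth-first walk of everything this receipt triggered."""
        order, stack = [], [receipt_id]
        while stack:
            current = stack.pop()
            order.append(current)
            # Push children reversed so they are visited in insertion order.
            stack.extend(reversed(self.children.get(current, [])))
        return order
```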
The CLI walks the chain in either direction. Forward traversal answers "what did this event cause to happen?" Backward traversal answers "what was the root cause of this action?"
```bash
# Forward walk from the initial event
chio receipt chain rcpt_001 --direction forward
# rcpt_001 (order.placed produced by order-service)
#   -> rcpt_002 (order.placed consumed by payment-service)
#     -> rcpt_003 (charge_payment tool call)
#       -> rcpt_004 (payment.charged produced by payment-service)
#         -> rcpt_005 (payment.charged consumed by order-service)

# Backward walk from a downstream effect
chio receipt chain rcpt_005 --direction backward
# Traces back to the original order.placed event.
```

Other Brokers on the Roadmap
The Kafka integration is the first concrete implementation, but the consumer-side evaluation model generalizes. Kafka support ships today through the consumer wrapper and the transactional processor. The same chio-streaming package family documents roadmap adapters for NATS (JetStream subscription middleware and KV capability cache), Pulsar (consumer interceptor), Amazon EventBridge (Lambda target handler), Google Pub/Sub (subscriber callback wrapper), and Redis Streams (group consumer wrapper). All of them reuse the same evaluation, receipt-signing, and DLQ governance primitives.
Package Layout
```
sdks/python/chio-streaming/
  pyproject.toml          # deps: chio-sdk-python
  src/chio_streaming/
    kafka/
      consumer.py         # ChioConsumerMiddleware
      producer.py         # ChioProducerMiddleware
      transactional.py    # ChioTransactionalProcessor
      group.py            # ChioConsumerGroup (swarm)
    nats/                 # roadmap: middleware, KV capability cache
    pubsub/               # roadmap: subscriber callback wrapper
    eventbridge/          # roadmap: Lambda target handler
    redis/                # roadmap: group consumer wrapper
    dlq.py                # ChioDeadLetterProducer
    chain.py              # Receipt chain utilities
```

Open Questions
- Broker-level enforcement. This design evaluates at the consumer, not the broker. Should chio ship a broker-side Kafka plugin or a NATS authorization callout that evaluates at the broker level? Pro: earlier enforcement. Con: broker coupling and added latency on the hot path.
- Compacted topics. Kafka compacted topics retain the latest value per key. If a capability is revoked after an event is compacted, should the agent still be able to consume the compacted event based on the original attestation?
- Multi-cluster streaming. MirrorMaker and Confluent Cluster Linking replicate events across clusters. Should receipts replicate with the events, or should each cluster maintain its own receipt chain with cross-cluster federation?
- Backpressure. If chio denies a high volume of events, the DLQ can become the bottleneck. Should the consumer apply backpressure to the source topic, or should a high denial rate trigger a consumer group circuit breaker?
- Event replay. Consumers can reset offsets and replay events. Should chio re-evaluate capabilities on replay, since they may have changed, or honor the original evaluation from the receipt chain?
Next Steps
- AWS Lambda · run chio as a Lambda extension alongside serverless tool servers
- Temporal · orchestrated workflows that complement choreographed streams
- Budgets · per-group, per-consumer spending envelopes for streaming agents