Python SDK reference.
cosmonapse Python package - verified against packages/python-sdk. If something here disagrees with the code, the code wins.
Pathway - per-trace event handle
A Pathway is the client-side observation surface for one logical workflow, identified by its trace_id. It supports three consumption shapes on the same primitive, so the developer picks the shape that fits the workflow rather than the SDK forcing a style.
| Shape | When to use |
|---|---|
await pw.wait() | Sequential request/reply. Blocks until the first AGENT_OUTPUT / CLARIFICATION / ERROR / FINAL. |
@pw.on(SignalType.X) | Reactive trace-scoped callbacks. Useful for streams like THOUGHT_DELTA or PLAN/TOOL_CALL. |
async for sig in pw: | Streaming iteration over every Signal on the trace until close. |
Scope filter. Pathway(scope="all") (default, centralised pattern) delivers every PATHWAY_TYPES Signal on the trace. scope="terminal" (decentralised pattern) delivers only FINAL / ERROR / CLARIFICATION - intermediate orchestration is handled peer-to-peer by other Dendrites and the Cortex only wakes for events that need attention. FINAL and ERROR always reach auto-close regardless of scope.
Originator vs observer. dendrite.dispatch(...) returns a Pathway in originator role. dendrite.observe_pathway(trace_id) opens one in observer role - watch a trace another peer started, without emitting a TASK.
class Pathway: # A per-trace event handle. Open via dendrite.dispatch(...) or # dendrite.observe_pathway(trace_id). Three consumption shapes on # the same primitive - pick whichever fits the workflow. @property def trace_id(self) -> str: ... @property def role(self) -> str: ... # "originator" | "observer" @property def scope(self) -> str: ... # "all" | "terminal" @property def closed(self) -> bool: ... # Shape 1 - sequential / request-reply async def wait(self, timeout_s: float | None = None) -> Signal: ... async def wait_for(self, signal_type: SignalType, timeout_s=None) -> Signal: ... # Shape 2 - reactive callbacks (trace-scoped) def on(self, signal_type: SignalType): ... # decorator # Shape 3 - async iteration def __aiter__(self) -> "Pathway": ... async def __anext__(self) -> Signal: ... # Lifecycle - auto-closes on FINAL / ERROR; close() is idempotent. async def close(self) -> None: ... async def __aenter__(self) -> "Pathway": ... async def __aexit__(self, *exc) -> None: ...
Example
from cosmonapse import Dendrite, SignalType, connect_synapse async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") orch = Dendrite(synapse=synapse, namespace="prod") async with orch: # Shape 1 - sequential sig = await orch.dispatch_and_wait( capabilities=["summarize"], input={"text": "..."}, timeout_s=5.0, ) # Shape 2 - reactive pw = await orch.dispatch_and_subscribe( capabilities=["plan"], input={"goal": "..."}, ) @pw.on(SignalType.PLAN) async def on_plan(s): print(s.payload["steps"]) # Shape 3 - streaming async with await orch.dispatch(neuron="agent", input={}) as pw: async for s in pw: if s.type is SignalType.AGENT_OUTPUT: break
Have a feature in mind?
The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.