SDK · 06

Pathway - per-trace event handle

A Pathway is the client-side observation surface for one logical workflow, identified by its trace_id. It supports three consumption shapes on the same primitive, so the developer picks the shape that fits the workflow rather than the SDK forcing a style.

Shape                   When to use
await pw.wait()         Sequential request/reply. Blocks until the first AGENT_OUTPUT / CLARIFICATION / ERROR / FINAL.
@pw.on(SignalType.X)    Reactive trace-scoped callbacks. Useful for streams like THOUGHT_DELTA or PLAN/TOOL_CALL.
async for sig in pw:    Streaming iteration over every Signal on the trace until close.
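To make "three shapes on the same primitive" concrete, here is a toy model in which a single queue backs all three. It is a sketch only and says nothing about how the SDK is actually built: `MiniPathway`, its `feed` hook, and the use of plain `(type, payload)` tuples instead of `Signal` objects are all hypothetical.

```python
import asyncio
from collections import defaultdict

# Signal types are plain strings here; the SDK uses a SignalType enum.
REPLY_TYPES = {"AGENT_OUTPUT", "CLARIFICATION", "ERROR", "FINAL"}
CLOSING_TYPES = {"FINAL", "ERROR"}


class MiniPathway:
    """Toy model (not the SDK class): one queue backs all three shapes."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._handlers = defaultdict(list)
        self.closed = False

    def on(self, signal_type: str):
        """Shape 2: register an async callback for one signal type."""
        def register(handler):
            self._handlers[signal_type].append(handler)
            return handler
        return register

    async def feed(self, signal_type: str, payload=None) -> None:
        """Hypothetical hook a transport would call for each Signal."""
        signal = (signal_type, payload)
        for handler in self._handlers[signal_type]:
            await handler(signal)
        await self._queue.put(signal)

    def __aiter__(self) -> "MiniPathway":
        return self

    async def __anext__(self):
        """Shape 3: yield queued signals; stop once a closing one was seen."""
        if self.closed:
            raise StopAsyncIteration
        signal = await self._queue.get()
        if signal[0] in CLOSING_TYPES:
            self.closed = True
        return signal

    async def wait(self, timeout_s=None):
        """Shape 1: skip intermediate signals and return the first reply."""
        async def first_reply():
            async for signal in self:
                if signal[0] in REPLY_TYPES:
                    return signal
            raise RuntimeError("pathway closed before a reply arrived")
        return await asyncio.wait_for(first_reply(), timeout_s)
```

In this model `wait()` is just iteration with a filter and a timeout, and `on()` is a side channel fed by the same incoming Signals, which is why a caller can mix shapes on one trace.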

Scope filter. Pathway(scope="all") (the default, for the centralised pattern) delivers every Signal on the trace whose type is in PATHWAY_TYPES. scope="terminal" (for the decentralised pattern) delivers only FINAL / ERROR / CLARIFICATION: intermediate orchestration is handled peer-to-peer by other Dendrites, and the Cortex only wakes for events that need attention. FINAL and ERROR always trigger auto-close, regardless of scope.
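The scope rule can be read as a predicate over signal types. The sketch below is a stand-alone model rather than SDK code: the local `SignalType` enum, `TERMINAL_TYPES`, `AUTO_CLOSE_TYPES`, and both helper functions are hypothetical, and `PATHWAY_TYPES` is assumed to contain only the signal names mentioned on this page (the real set may differ).

```python
from enum import Enum


class SignalType(Enum):
    # Hypothetical stand-in for the SDK enum; only names used on this page.
    THOUGHT_DELTA = "THOUGHT_DELTA"
    PLAN = "PLAN"
    TOOL_CALL = "TOOL_CALL"
    AGENT_OUTPUT = "AGENT_OUTPUT"
    CLARIFICATION = "CLARIFICATION"
    ERROR = "ERROR"
    FINAL = "FINAL"


# Assumption: the real PATHWAY_TYPES may hold more (or fewer) members.
PATHWAY_TYPES = frozenset(SignalType)
TERMINAL_TYPES = frozenset(
    {SignalType.FINAL, SignalType.ERROR, SignalType.CLARIFICATION}
)
AUTO_CLOSE_TYPES = frozenset({SignalType.FINAL, SignalType.ERROR})


def delivered(scope: str, signal_type: SignalType) -> bool:
    """Return True if a Pathway with this scope would surface the signal."""
    if scope == "all":
        return signal_type in PATHWAY_TYPES
    if scope == "terminal":
        return signal_type in TERMINAL_TYPES
    raise ValueError(f"unknown scope: {scope!r}")


def closes_pathway(signal_type: SignalType) -> bool:
    """FINAL and ERROR close the Pathway whatever the scope is."""
    return signal_type in AUTO_CLOSE_TYPES
```

Note that CLARIFICATION is delivered under scope="terminal" but does not close the Pathway in this model; only FINAL and ERROR do.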

Originator vs observer. dendrite.dispatch(...) returns a Pathway in originator role. dendrite.observe_pathway(trace_id) opens one in observer role, which watches a trace that another peer started without emitting a TASK.
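An observer might collect everything surfaced on a trace it did not start, roughly as sketched below. The `watch_trace` helper is hypothetical. It relies only on what the stub declares (the Pathway is an async context manager and an async iterator), and because this page does not say whether `observe_pathway` must be awaited, the sketch accepts either form.

```python
import inspect


async def watch_trace(dendrite, trace_id: str) -> list:
    """Collect every Signal surfaced on a trace that another peer started."""
    # Assumption: observe_pathway may return a Pathway directly or an
    # awaitable that resolves to one; handle both cases.
    pw = dendrite.observe_pathway(trace_id)
    if inspect.isawaitable(pw):
        pw = await pw

    seen = []
    async with pw:            # leaving the block closes the Pathway
        async for sig in pw:  # iteration ends when the Pathway closes
            seen.append(sig)
    return seen
```

Since no TASK is emitted, the observer only sees what the originating peer's workflow produces; with scope="terminal" that would be just FINAL / ERROR / CLARIFICATION.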

pathway.pyi
class Pathway:
    # A per-trace event handle. Open via dendrite.dispatch(...) or
    # dendrite.observe_pathway(trace_id). Three consumption shapes on
    # the same primitive - pick whichever fits the workflow.

    @property
    def trace_id(self) -> str: ...
    @property
    def role(self) -> str: ...        # "originator" | "observer"
    @property
    def scope(self) -> str: ...       # "all" | "terminal"
    @property
    def closed(self) -> bool: ...

    # Shape 1 - sequential / request-reply
    async def wait(self, timeout_s: float | None = None) -> Signal: ...
    async def wait_for(self, signal_type: SignalType, timeout_s: float | None = None) -> Signal: ...

    # Shape 2 - reactive callbacks (trace-scoped)
    def on(self, signal_type: SignalType): ...    # decorator

    # Shape 3 - async iteration
    def __aiter__(self) -> "Pathway": ...
    async def __anext__(self) -> Signal: ...

    # Lifecycle - auto-closes on FINAL / ERROR; close() is idempotent.
    async def close(self) -> None: ...
    async def __aenter__(self) -> "Pathway": ...
    async def __aexit__(self, *exc) -> None: ...

Example

three_shapes.py
import asyncio

from cosmonapse import Dendrite, SignalType, connect_synapse

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    orch = Dendrite(synapse=synapse, namespace="prod")

    async with orch:
        # Shape 1 - sequential: dispatch and block for the reply
        sig = await orch.dispatch_and_wait(
            capabilities=["summarize"], input={"text": "..."}, timeout_s=5.0,
        )

        # Shape 2 - reactive: register a trace-scoped callback
        pw = await orch.dispatch_and_subscribe(
            capabilities=["plan"], input={"goal": "..."},
        )

        @pw.on(SignalType.PLAN)
        async def on_plan(s):
            print(s.payload["steps"])

        # Shape 3 - streaming: iterate until the first AGENT_OUTPUT
        async with await orch.dispatch(neuron="agent", input={}) as stream:
            async for s in stream:
                if s.type is SignalType.AGENT_OUTPUT:
                    break

if __name__ == "__main__":
    asyncio.run(main())

Have a feature in mind?

The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.