SDK · 05

Dendrite - synapse-side connector

The Dendrite is the only thing that touches the Synapse. It hosts attached Axons, owns REGISTER / HEARTBEAT / DEREGISTER, routes inbound TASKs to the matching Axon, and publishes the Axon’s returned Signal. Every Dendrite can also orchestrate - there is no separate Cortex class in v0.2.

classcosmonapse.Dendrite

Hosts Axons and exposes every orchestration primitive. Synapse and (optionally) RegistryStore are passed in; the Dendrite never builds or closes them.

dendrite.pyi
class Dendrite(LifecycleHooks):
    def __init__(
        self,
        *,
        synapse:         Synapse,                  # REQUIRED
        registry_store:  RegistryStore | None = None,
        namespace:       str  = "default",
        dendrite_id:     str  = "dendrite",
        heartbeat_s:             float = 30.0,
        reregister_on_heartbeat: bool  = True,
        role:            str   = "orchestrator",  # or "worker"
        auto_bid:        bool  = True,   # default bidder for hosted Axons
        stale_after_s:   float | None = None,  # liveness sweep; default 3 heartbeats
    ) -> None: ...

    # Aggregate of every attached Axon's capabilities (sorted, deduped).
    @property
    def capabilities(self) -> list[str]: ...

    @property
    def role(self) -> str: ...           # "orchestrator" | "worker"

    # No Dendrite.connect()  -  build the synapse yourself, then pass it in:
    #   synapse = await connect_synapse("cosmo://127.0.0.1:7070")
    #   dendrite = Dendrite(synapse=synapse, ...)

    # ── Axon lifecycle ──────────────────────────────────────────
    def    attach_axon(self, axon: Axon) -> None: ...
    async def detach_axon(self, neuron_id: str, *, reason=None) -> None: ...
    async def add_axon(self, axon: Axon) -> None: ...   # attach while running
    async def start(self) -> None: ...
    async def stop(self, reason: str | None = None) -> None: ...
    async def __aenter__(self) -> "Dendrite": ...
    async def __aexit__(self, *exc) -> None: ...

    # ── Dispatch (orchestrator-role only) ───────────────────────
    # Addressed (neuron=) or capability-routed (capabilities=); at least one required.
    async def dispatch_task(
        self, *,
        neuron:       str | None = None,
        input:        dict,
        trace_id:     str  | None = None,
        parent_id:    str  | None = None,
        context_ref:  str  | None = None,
        capabilities: list[str] | None = None,
        finalize:     bool = False,  # worker promotes AGENT_OUTPUT to FINAL
        meta:         dict | None = None,
    ) -> Signal: ...        # fire-and-forget; returns the emitted TASK

    # Returns a Pathway scoped to the trace. scope="all" or "terminal".
    async def dispatch(
        self, *,
        neuron:       str | None = None,
        input:        dict,
        capabilities: list[str] | None = None,
        scope:        str  = "all",
        finalize:     bool | None = None,  # default: True when scope="terminal"
        # ... trace_id, parent_id, context_ref, meta as above ...
    ) -> Pathway: ...

    # Sugar: dispatch, block until terminal Signal, close, return Signal.
    async def dispatch_and_wait(
        self, *, neuron=None, input, capabilities=None,
        timeout_s: float | None = 30.0, scope="all", finalize=None, **kw,
    ) -> Signal: ...
    # scope="terminal" waits for FINAL/ERROR only; finalize defaults True
    # there, so a stock worker's AGENT_OUTPUT is promoted to FINAL.

    # Sugar: dispatch, return the live Pathway (caller wires @pw.on(...)).
    async def dispatch_and_subscribe(
        self, *, neuron=None, input, capabilities=None, scope="all", **kw,
    ) -> Pathway: ...

    # Competitive bidding via TASK_OFFER / BID / TASK_AWARDED.
    async def dispatch_offer(
        self, *,
        input:        dict,
        capabilities: list[str] | None = None,
        deadline_ms:  int  = 250,
        select:       str  = "first_bid",  # lowest_cost | highest_confidence
        scope:        str  = "all",
        **kw,
    ) -> Pathway: ...

    # Watch a trace another peer started (no TASK emitted).
    async def observe_pathway(self, trace_id: str) -> Pathway: ...

    # ── Interactive cognition (CLARIFICATION / PERMISSION loop) ─
    # Await the discrete CLARIFICATION_ANSWER / PERMISSION_DECISION
    # whose parent_id == request.id (op-pathway under the hood).
    async def await_decision(self, request: Signal, *, timeout_s=30.0) -> Signal: ...
    async def answer_clarification(self, request: Signal, *, answer, meta=None) -> Signal: ...

    # ── Engram  -  shared memory (RECALL / IMPRINT) ─────────────
    def       attach_engram(self, engram: Engram) -> None: ...
    async def detach_engram(self, engram_id: str) -> None: ...
    async def recall(self, *, engram_id=None, engram_kind=None, query, **kw) -> RecallResult: ...
    async def imprint(self, *, engram_id=None, engram_kind=None, op, entry, **kw) -> ImprintReceipt | None: ...

    # Worker side: react to TASK_OFFER + emit a BID (bypasses role guard).
    def       on_task_offer(self, fn=None, *, capability=None, trace_id=None): ...
    async def bid(
        self, offer: Signal, *,
        neuron: str, cost: float,
        eta_ms: int | None = None,
        confidence: float | None = None,
    ) -> Signal: ...

    # ── Cognition emitters (orchestrator-role; funnel through emit()) ──
    async def emit_final(self, *, trace_id, parent_id, result, meta=None) -> Signal: ...
    async def emit_error(self, *, trace_id, parent_id, code, message, recoverable=False, meta=None) -> Signal: ...
    async def emit_plan(self, *, trace_id, parent_id, steps, rationale=None, **kw) -> Signal: ...
    async def emit_thought_delta(self, *, trace_id, parent_id, delta, seq=None, **kw) -> Signal: ...
    async def emit_tool_call(self, *, trace_id, parent_id, tool, args, call_id=None, **kw) -> Signal: ...
    async def emit_tool_result(self, *, trace_id, parent_id, tool, result=None, error=None, **kw) -> Signal: ...
    async def emit_memory_append(self, *, trace_id, parent_id, key, value, **kw) -> Signal: ...
    async def emit_critique(self, *, trace_id, parent_id, target_event_id, issues, verdict, **kw) -> Signal: ...
    async def emit_escalation(self, *, trace_id, parent_id, reason, target=None, **kw) -> Signal: ...
    async def emit_consensus(self, *, trace_id, parent_id, members, verdict, votes=None, **kw) -> Signal: ...
    async def emit_context_sync(self, *, trace_id, parent_id, snapshot, version=None, **kw) -> Signal: ...

    async def emit(self, signal: Signal) -> None: ...
    # emit() enforces role guard (workers raise DendriteProtocolError) and
    # type guard (only SYNAPSE_TYPES accepted). bid() uses _publish to bypass.

    # ── Inbound handler decorators ──────────────────────────────
    # One per SignalType, all accepting the same optional filters:
    #   (neuron=, capability=, trace_id=)

    # Lifecycle
    @dendrite.on_task_signal
    @dendrite.on_agent_output
    @dendrite.on_final
    @dendrite.on_error_signal

    # Routing / bidding
    @dendrite.on_task_offer                     # registering one suppresses the auto-bidder
    @dendrite.on_bid
    @dendrite.on_task_awarded
    @dendrite.on_task_declined

    # Cognition
    @dendrite.on_plan
    @dendrite.on_thought_delta
    @dendrite.on_tool_call
    @dendrite.on_tool_result
    @dendrite.on_memory_append
    @dendrite.on_critique
    @dendrite.on_escalation
    @dendrite.on_consensus
    @dendrite.on_context_sync

    # Interactive cognition (see await_decision)
    @dendrite.on_clarification
    @dendrite.on_permission
    @dendrite.on_clarification_answer
    @dendrite.on_permission_decision

    # Engram
    @dendrite.on_recall_signal                  # requests crossing the bus
    @dendrite.on_imprint_signal
    @dendrite.on_recalled                       # responses (observability)
    @dendrite.on_imprinted

    # Agent management + discovery
    @dendrite.on_register_signal
    @dendrite.on_deregister_signal
    @dendrite.on_heartbeat_signal
    @dendrite.on_discover
    # (on_error / on_register / on_deregister / on_heartbeat are
    #  DEPRECATED short aliases  -  prefer the *_signal names)

    # Generic escape hatches
    @dendrite.on_signal(SignalType.X, neuron=..., capability=..., trace_id=...)
    @dendrite.on_trace(trace_id, *types)   # every (or selected) type on one trace

    # Inherited from LifecycleHooks
    @dendrite.on_connect                        # after this Dendrite registers
    @dendrite.on_refresh                        # heartbeat / register / deregister
    @dendrite.on_schedule(every_s=N)    # periodic background coroutine

    # subscribe() is a coroutine, NOT a decorator:
    async def subscribe(self, signal_type: SignalType, handler, *, queue_group=None) -> Subscription: ...

    # ── RegistryStore reads (require a registry_store) ──────────
    async def find_neurons(self, *, capability: str | None = None) -> list[NeuronRecord]: ...
    async def registry_snapshot(self, *, capability=None, include_deregistered=False) -> list[NeuronRecord]: ...

Constructor parameters

ParameterTypeDescription
synapseSynapseRequired. An already-connected synapse adapter. The Dendrite never calls connect() on it.
registry_storeRegistryStore | NoneOptional. When supplied, the Dendrite mirrors attached Axons into the store and auto-subscribes to REGISTER / DEREGISTER / HEARTBEAT for the namespace.
namespacestrSubject namespace. All published and subscribed subjects are scoped under cosmonapse.<namespace>.<TYPE>. Default "default".
dendrite_idstrIdentifier embedded as neuron in this Dendrite’s outbound FINAL / ERROR signals. Default "dendrite".
heartbeat_sfloatInterval between HEARTBEAT emissions in seconds. Default 30.0. Pass 0 to disable the loop.
reregister_on_heartbeatboolRe-emit REGISTER alongside each HEARTBEAT so late-joining consumers discover Axons without a dedicated sync. Default True.

Axon lifecycle

methodDendrite.attach_axon(axon: Axon) -> None

Register an Axon on this Dendrite. If the Dendrite is already started, the next start cycle emits REGISTER. Raises if neuron_id is already attached.

methodDendrite.detach_axon(neuron_id: str) -> None

Stop hosting the named Axon and emit DEREGISTER.

async methodDendrite.start() -> None

Wire subscriptions, emit REGISTER for every attached Axon, and start the heartbeat task plus any on_schedule coroutines.

async methodDendrite.stop(reason=None) -> None

Cancel background tasks, emit DEREGISTER for each Axon, and tear down subscriptions. The passed-in Synapse and RegistryStore are NOT closed - the caller owns them.

async context managerasync with Dendrite as d: ...

Equivalent to start() on enter and stop() on exit, with exceptions propagated normally.

Orchestration primitives

async methodDendrite.dispatch_task(*, neuron, input, trace_id=None, parent_id=None, context_ref=None, capabilities=None, meta=None) -> Signal

Publish a TASK envelope addressed to a Neuron. Auto-generates trace_id and id if omitted. This is a fire-and-publish call - it does not consult the registry, so dispatching to an unknown neuron simply produces a TASK no Axon picks up. Returns the emitted Signal so the caller can correlate.

async methodDendrite.emit_final(*, trace_id, parent_id, result, meta=None) -> Signal

Publish a terminal FINAL envelope for a trace. Exactly one FINAL or ERROR is expected per trace; subsequent terminal envelopes for the same trace are dropped by well-behaved consumers.

async methodDendrite.emit_error(*, trace_id, parent_id, code, message, recoverable=False, meta=None) -> Signal

Publish a terminal ERROR envelope with a short machine code and a human-readable message. recoverable=True signals to the consumer that the task may be retried or re-routed.

async methodDendrite.emit(signal: Signal) -> None

Low-level escape hatch. Raises DendriteProtocolError for any Signal whose type is not in SYNAPSE_TYPES (the allow-list defined in code, not just convention).

Resilience & cancellation

Retry is a declarative policy for the request/reply shape only - the Dendrite owns the full dispatch → wait → close arc and can transparently re-dispatch. The streaming shapes (dispatch / dispatch_and_subscribe) hand the live Pathway back to the caller, so retry there would orphan their subscriptions. A new-trace retry broadcasts STOP on the abandoned trace first, so a stalled worker (and its half-finished Engram writes) can’t outlive the retry.

async methodDendrite.run_with_retry(*, retry: RetryStrategy, neuron=None, input, capabilities=None, timeout_s=30.0, scope='all', finalize=None, **kw) -> Signal

Dispatch and wait, retrying per the RetryStrategy until a non-retryable outcome or attempts are exhausted. Returns the resolved Signal (FINAL / AGENT_OUTPUT / CLARIFICATION / PERMISSION, or a final ERROR); re-raises the last exception when every attempt timed out.

async methodDendrite.dispatch_and_wait(..., retry: RetryStrategy | None = None) -> Signal

The request/reply sugar also accepts retry=. When supplied it delegates to the same loop as run_with_retry; when omitted it is a single dispatch + wait.

async methodDendrite.emit_stop(*, trace_id, rollback=False, reason=None) -> Signal

Broadcast a STOP for a whole trace (orchestrator-gated). Best-effort and idempotent: STOP is fire-and-forget, so a peer that never saw it simply isn't stopped. rollback=True replays each hosted Engram's per-trace saga journal in reverse. Returns the emitted STOP Signal.

async methodDendrite.stop_trace(trace_id, *, rollback=False, reason=None, collect_acks=False, timeout_s=1.0) -> list[Signal]

Thin wrapper over emit_stop. With collect_acks=True it opens a short-lived STOPPED subscription and returns the acks (one per quiesced Dendrite) seen within timeout_s, best effort.

retry.py
from cosmonapse import RetryStrategy, default_retry_on

# Declarative retry policy for the request/reply shape.
@dataclass(frozen=True)
class RetryStrategy:
    max_attempts:      int   = 3     # total tries, incl. the first (>= 1)
    timeout_s:         float = 30.0  # per-attempt terminal timeout
    backoff:           Callable[[int], float] = ...   # attempt -> secs; default 0
    retry_on:          Callable[[object], bool] = default_retry_on
    new_trace:         bool  = True   # fresh trace + STOP the abandoned one
    rollback_on_retry: bool  = False  # also roll back its Engram writes
    on_retry:          Callable[[int, object], None] | None = None
    reason:            str   = "retry"

# default_retry_on retries on a timeout, on a Pathway that closed before a
# terminal, or on an ERROR flagged recoverable. A FINAL / AGENT_OUTPUT /
# CLARIFICATION / PERMISSION is never retried.

sig = await orch.run_with_retry(
    neuron="flaky-worker", input={"q": "hi"},
    retry=RetryStrategy(
        max_attempts=5, timeout_s=10.0,
        backoff=lambda a: 2 ** a,   # 1s, 2s, 4s, ...
        rollback_on_retry=True,        # undo each abandoned attempt's Engram writes
    ),
)

# Or cancel a whole workflow yourself (and roll back its Engram writes):
await orch.stop_trace(trace_id, rollback=True, reason="superseded")

Inbound handlers

Every SignalType has a named on_* decorator you apply to a coroutine, and all of them accept the same optional filters: neuron=, capability=, trace_id=. on_signal(SignalType.X, ...) is the generic escape hatch behind the named sugar, on_trace(trace_id, *types) registers one handler across a whole trace, and subscribe() is a coroutine - not a decorator - returning a raw Subscription.

RegistrationFires on
@dendrite.on_task_signalEvery TASK on the namespace.
@dendrite.on_agent_outputEvery AGENT_OUTPUT.
@dendrite.on_finalEvery FINAL - workflow conclusion.
@dendrite.on_error_signalEvery ERROR. (@dendrite.on_error is a deprecated alias.)
@dendrite.on_task_offerEvery TASK_OFFER. Registering one suppresses the default auto-bidder.
@dendrite.on_bid / on_task_awarded / on_task_declinedThe bidding flow - market observability.
@dendrite.on_plan / on_thought_delta / on_tool_call / on_tool_resultThe cognition stream.
@dendrite.on_memory_append / on_critique / on_escalation / on_consensus / on_context_syncThe remaining cognition / coordination types.
@dendrite.on_clarificationEvery CLARIFICATION. Reply with answer_clarification (discrete) or respond_to_clarification (re-dispatch).
@dendrite.on_permissionEvery PERMISSION request. Reply with grant_permission / deny_permission / respond_to_permission.
@dendrite.on_clarification_answer / on_permission_decisionThe discrete answers - the decorator counterparts of await_decision().
@dendrite.on_recall_signal / on_imprint_signalEngram requests crossing the bus.
@dendrite.on_recalled / on_imprintedEngram responses (observability; EngramClient owns correlation).
@dendrite.on_register_signalEvery REGISTER, including re-registers attached to HEARTBEATs. (on_register is a deprecated alias.)
@dendrite.on_deregister_signal / on_heartbeat_signalDEREGISTER / HEARTBEAT. (Short aliases deprecated.)
@dendrite.on_discoverEvery DISCOVER - answer with your hosted Axons (cosmo registry list uses this).
@dendrite.on_signal(SignalType.X, neuron=…, capability=…, trace_id=…)Any SignalType - the generic form with the same filters.
@dendrite.on_trace(trace_id, *types)Every (or the selected) type on one trace.
@dendrite.on_connect / on_refresh / on_schedule(every_s=N)LifecycleHooks - not Signals: fired on registration, registry refresh, and a timer.
await dendrite.subscribe(SignalType.X, handler)Raw subscription. Returns a Subscription you can later unsubscribe.

RegistryStore reads

async methodDendrite.find_neurons(*, capability=None) -> list[NeuronRecord]

Return live (non-deregistered) Neurons on the namespace, optionally filtered to those advertising the given capability. Requires a registry_store.

async methodDendrite.registry_snapshot(*, capability=None, include_deregistered=False) -> list[NeuronRecord]

Point-in-time snapshot of the registry. Useful in on_connect / on_schedule handlers. Requires a registry_store.

Example

worker.py
import asyncio
from cosmonapse import Dendrite, SqliteRegistryStore, connect_synapse

async def main():
    synapse = await connect_synapse("cosmo://127.0.0.1:7070")

    dendrite = Dendrite(
        synapse        = synapse,
        registry_store = SqliteRegistryStore("/tmp/worker.db"),
        namespace      = "prod",
        heartbeat_s    = 5.0,
    )
    dendrite.attach_axon(axon)

    @dendrite.on_agent_output
    async def handle(sig):
        await dendrite.emit_final(
            trace_id=sig.trace_id,
            parent_id=sig.id,
            result=sig.payload["output"],
        )

    async with dendrite:
        await dendrite.dispatch_task(neuron="answerer", input={"q": "hi"})
        await asyncio.sleep(float("inf"))
    # The Dendrite never closes the synapse  -  you do:
    await synapse.close()

asyncio.run(main())

Have a feature in mind?

The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.