Python SDK reference.
cosmonapse Python package - verified against packages/python-sdk. If something here disagrees with the code, the code wins.
Dendrite - synapse-side connector
The Dendrite is the only thing that touches the Synapse. It hosts attached Axons, owns REGISTER / HEARTBEAT / DEREGISTER, routes inbound TASKs to the matching Axon, and publishes the Axon’s returned Signal. Every Dendrite can also orchestrate - there is no separate Cortex class in v0.2.
cosmonapse.Dendrite — Hosts Axons and exposes every orchestration primitive. Synapse and (optionally) RegistryStore are passed in; the Dendrite never builds or closes them.
class Dendrite(LifecycleHooks): def __init__( self, *, synapse: Synapse, # REQUIRED registry_store: RegistryStore | None = None, namespace: str = "default", dendrite_id: str = "dendrite", heartbeat_s: float = 30.0, reregister_on_heartbeat: bool = True, role: str = "orchestrator", # or "worker" auto_bid: bool = True, # default bidder for hosted Axons stale_after_s: float | None = None, # liveness sweep; default 3 heartbeats ) -> None: ... # Aggregate of every attached Axon's capabilities (sorted, deduped). @property def capabilities(self) -> list[str]: ... @property def role(self) -> str: ... # "orchestrator" | "worker" # No Dendrite.connect() - build the synapse yourself, then pass it in: # synapse = await connect_synapse("cosmo://127.0.0.1:7070") # dendrite = Dendrite(synapse=synapse, ...) # ── Axon lifecycle ────────────────────────────────────────── def attach_axon(self, axon: Axon) -> None: ... async def detach_axon(self, neuron_id: str, *, reason=None) -> None: ... async def add_axon(self, axon: Axon) -> None: ... # attach while running async def start(self) -> None: ... async def stop(self, reason: str | None = None) -> None: ... async def __aenter__(self) -> "Dendrite": ... async def __aexit__(self, *exc) -> None: ... # ── Dispatch (orchestrator-role only) ─────────────────────── # Addressed (neuron=) or capability-routed (capabilities=); at least one required. async def dispatch_task( self, *, neuron: str | None = None, input: dict, trace_id: str | None = None, parent_id: str | None = None, context_ref: str | None = None, capabilities: list[str] | None = None, finalize: bool = False, # worker promotes AGENT_OUTPUT to FINAL meta: dict | None = None, ) -> Signal: ... # fire-and-forget; returns the emitted TASK # Returns a Pathway scoped to the trace. scope="all" or "terminal". 
async def dispatch( self, *, neuron: str | None = None, input: dict, capabilities: list[str] | None = None, scope: str = "all", finalize: bool | None = None, # default: True when scope="terminal" # ... trace_id, parent_id, context_ref, meta as above ... ) -> Pathway: ... # Sugar: dispatch, block until terminal Signal, close, return Signal. async def dispatch_and_wait( self, *, neuron=None, input, capabilities=None, timeout_s: float | None = 30.0, scope="all", finalize=None, **kw, ) -> Signal: ... # scope="terminal" waits for FINAL/ERROR only; finalize defaults True # there, so a stock worker's AGENT_OUTPUT is promoted to FINAL. # Sugar: dispatch, return the live Pathway (caller wires @pw.on(...)). async def dispatch_and_subscribe( self, *, neuron=None, input, capabilities=None, scope="all", **kw, ) -> Pathway: ... # Competitive bidding via TASK_OFFER / BID / TASK_AWARDED. async def dispatch_offer( self, *, input: dict, capabilities: list[str] | None = None, deadline_ms: int = 250, select: str = "first_bid", # lowest_cost | highest_confidence scope: str = "all", **kw, ) -> Pathway: ... # Watch a trace another peer started (no TASK emitted). async def observe_pathway(self, trace_id: str) -> Pathway: ... # ── Interactive cognition (CLARIFICATION / PERMISSION loop) ─ # Await the discrete CLARIFICATION_ANSWER / PERMISSION_DECISION # whose parent_id == request.id (op-pathway under the hood). async def await_decision(self, request: Signal, *, timeout_s=30.0) -> Signal: ... async def answer_clarification(self, request: Signal, *, answer, meta=None) -> Signal: ... # ── Engram - shared memory (RECALL / IMPRINT) ───────────── def attach_engram(self, engram: Engram) -> None: ... async def detach_engram(self, engram_id: str) -> None: ... async def recall(self, *, engram_id=None, engram_kind=None, query, **kw) -> RecallResult: ... async def imprint(self, *, engram_id=None, engram_kind=None, op, entry, **kw) -> ImprintReceipt | None: ... 
# Worker side: react to TASK_OFFER + emit a BID (bypasses role guard). def on_task_offer(self, fn=None, *, capability=None, trace_id=None): ... async def bid( self, offer: Signal, *, neuron: str, cost: float, eta_ms: int | None = None, confidence: float | None = None, ) -> Signal: ... # ── Cognition emitters (orchestrator-role; funnel through emit()) ── async def emit_final(self, *, trace_id, parent_id, result, meta=None) -> Signal: ... async def emit_error(self, *, trace_id, parent_id, code, message, recoverable=False, meta=None) -> Signal: ... async def emit_plan(self, *, trace_id, parent_id, steps, rationale=None, **kw) -> Signal: ... async def emit_thought_delta(self, *, trace_id, parent_id, delta, seq=None, **kw) -> Signal: ... async def emit_tool_call(self, *, trace_id, parent_id, tool, args, call_id=None, **kw) -> Signal: ... async def emit_tool_result(self, *, trace_id, parent_id, tool, result=None, error=None, **kw) -> Signal: ... async def emit_memory_append(self, *, trace_id, parent_id, key, value, **kw) -> Signal: ... async def emit_critique(self, *, trace_id, parent_id, target_event_id, issues, verdict, **kw) -> Signal: ... async def emit_escalation(self, *, trace_id, parent_id, reason, target=None, **kw) -> Signal: ... async def emit_consensus(self, *, trace_id, parent_id, members, verdict, votes=None, **kw) -> Signal: ... async def emit_context_sync(self, *, trace_id, parent_id, snapshot, version=None, **kw) -> Signal: ... async def emit(self, signal: Signal) -> None: ... # emit() enforces role guard (workers raise DendriteProtocolError) and # type guard (only SYNAPSE_TYPES accepted). bid() uses _publish to bypass. 
# ── Inbound handler decorators ────────────────────────────── # One per SignalType, all accepting the same optional filters: # (neuron=, capability=, trace_id=) # Lifecycle @dendrite.on_task_signal @dendrite.on_agent_output @dendrite.on_final @dendrite.on_error_signal # Routing / bidding @dendrite.on_task_offer # registering one suppresses the auto-bidder @dendrite.on_bid @dendrite.on_task_awarded @dendrite.on_task_declined # Cognition @dendrite.on_plan @dendrite.on_thought_delta @dendrite.on_tool_call @dendrite.on_tool_result @dendrite.on_memory_append @dendrite.on_critique @dendrite.on_escalation @dendrite.on_consensus @dendrite.on_context_sync # Interactive cognition (see await_decision) @dendrite.on_clarification @dendrite.on_permission @dendrite.on_clarification_answer @dendrite.on_permission_decision # Engram @dendrite.on_recall_signal # requests crossing the bus @dendrite.on_imprint_signal @dendrite.on_recalled # responses (observability) @dendrite.on_imprinted # Agent management + discovery @dendrite.on_register_signal @dendrite.on_deregister_signal @dendrite.on_heartbeat_signal @dendrite.on_discover # (on_error / on_register / on_deregister / on_heartbeat are # DEPRECATED short aliases - prefer the *_signal names) # Generic escape hatches @dendrite.on_signal(SignalType.X, neuron=..., capability=..., trace_id=...) @dendrite.on_trace(trace_id, *types) # every (or selected) type on one trace # Inherited from LifecycleHooks @dendrite.on_connect # after this Dendrite registers @dendrite.on_refresh # heartbeat / register / deregister @dendrite.on_schedule(every_s=N) # periodic background coroutine # subscribe() is a coroutine, NOT a decorator: async def subscribe(self, signal_type: SignalType, handler, *, queue_group=None) -> Subscription: ... # ── RegistryStore reads (require a registry_store) ────────── async def find_neurons(self, *, capability: str | None = None) -> list[NeuronRecord]: ... 
async def registry_snapshot(self, *, capability=None, include_deregistered=False) -> list[NeuronRecord]: ...
Constructor parameters
| Parameter | Type | Description |
|---|---|---|
| synapse | Synapse | Required. An already-connected synapse adapter. The Dendrite never calls connect() on it. |
| registry_store | RegistryStore | None | Optional. When supplied, the Dendrite mirrors attached Axons into the store and auto-subscribes to REGISTER / DEREGISTER / HEARTBEAT for the namespace. |
| namespace | str | Subject namespace. All published and subscribed subjects are scoped under cosmonapse.<namespace>.<TYPE>. Default "default". |
| dendrite_id | str | Identifier embedded as neuron in this Dendrite’s outbound FINAL / ERROR signals. Default "dendrite". |
| heartbeat_s | float | Interval between HEARTBEAT emissions in seconds. Default 30.0. Pass 0 to disable the loop. |
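| reregister_on_heartbeat | bool | Re-emit REGISTER alongside each HEARTBEAT so late-joining consumers discover Axons without a dedicated sync. Default True. |
| role | str | "orchestrator" or "worker". Dispatch and the cognition emitters are orchestrator-role only; emit() raises DendriteProtocolError on a worker. Default "orchestrator". |
| auto_bid | bool | Enable the default bidder for hosted Axons. Registering an on_task_offer handler suppresses it. Default True. |
| stale_after_s | float | None | Staleness threshold, in seconds, for the liveness sweep. Default None, which falls back to 3 heartbeats. |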
Axon lifecycle
Dendrite.attach_axon(axon: Axon) -> None — Register an Axon on this Dendrite. If the Dendrite is already started, the next start cycle emits REGISTER. Raises if neuron_id is already attached.
Dendrite.detach_axon(neuron_id: str, *, reason=None) -> None — Stop hosting the named Axon and emit DEREGISTER.
Dendrite.start() -> None — Wire subscriptions, emit REGISTER for every attached Axon, and start the heartbeat task plus any on_schedule coroutines.
Dendrite.stop(reason=None) -> None — Cancel background tasks, emit DEREGISTER for each Axon, and tear down subscriptions. The passed-in Synapse and RegistryStore are NOT closed - the caller owns them.
async with Dendrite as d: ... — Equivalent to start() on enter and stop() on exit, with exceptions propagated normally.
Orchestration primitives
Dendrite.dispatch_task(*, neuron=None, input, trace_id=None, parent_id=None, context_ref=None, capabilities=None, finalize=False, meta=None) -> Signal — Publish a TASK envelope, either addressed to a Neuron (neuron=) or routed by capability (capabilities=); at least one of the two is required. Auto-generates trace_id and id if omitted. Pass finalize=True to have the worker promote its AGENT_OUTPUT to FINAL. This is a fire-and-forget call - it does not consult the registry, so dispatching to an unknown neuron simply produces a TASK no Axon picks up. Returns the emitted Signal so the caller can correlate.
Dendrite.emit_final(*, trace_id, parent_id, result, meta=None) -> Signal — Publish a terminal FINAL envelope for a trace. Exactly one FINAL or ERROR is expected per trace; subsequent terminal envelopes for the same trace are dropped by well-behaved consumers.
Dendrite.emit_error(*, trace_id, parent_id, code, message, recoverable=False, meta=None) -> Signal — Publish a terminal ERROR envelope with a short machine code and a human-readable message. recoverable=True signals to the consumer that the task may be retried or re-routed.
Dendrite.emit(signal: Signal) -> None — Low-level escape hatch. Raises DendriteProtocolError when called on a worker-role Dendrite (role guard), or for any Signal whose type is not in SYNAPSE_TYPES (the allow-list defined in code, not just convention).
Resilience & cancellation
Retry is a declarative policy for the request/reply shape only - the Dendrite owns the full dispatch → wait → close arc and can transparently re-dispatch. The streaming shapes (dispatch / dispatch_and_subscribe) hand the live Pathway back to the caller, so retry there would orphan their subscriptions. A new-trace retry broadcasts STOP on the abandoned trace first, so a stalled worker (and its half-finished Engram writes) can’t outlive the retry.
Dendrite.run_with_retry(*, retry: RetryStrategy, neuron=None, input, capabilities=None, timeout_s=30.0, scope='all', finalize=None, **kw) -> Signal — Dispatch and wait, retrying per the RetryStrategy until a non-retryable outcome or attempts are exhausted. Returns the resolved Signal (FINAL / AGENT_OUTPUT / CLARIFICATION / PERMISSION, or a final ERROR); re-raises the last exception when every attempt timed out.
Dendrite.dispatch_and_wait(..., retry: RetryStrategy | None = None) -> Signal — The request/reply sugar also accepts retry=. When supplied it delegates to the same loop as run_with_retry; when omitted it is a single dispatch + wait.
Dendrite.emit_stop(*, trace_id, rollback=False, reason=None) -> Signal — Broadcast a STOP for a whole trace (orchestrator-gated). Best-effort and idempotent: STOP is fire-and-forget, so a peer that never saw it simply isn't stopped. rollback=True replays each hosted Engram's per-trace saga journal in reverse. Returns the emitted STOP Signal.
Dendrite.stop_trace(trace_id, *, rollback=False, reason=None, collect_acks=False, timeout_s=1.0) -> list[Signal] — Thin wrapper over emit_stop. With collect_acks=True it opens a short-lived STOPPED subscription and returns the acks (one per quiesced Dendrite) seen within timeout_s, best effort.
from cosmonapse import RetryStrategy, default_retry_on # Declarative retry policy for the request/reply shape. @dataclass(frozen=True) class RetryStrategy: max_attempts: int = 3 # total tries, incl. the first (>= 1) timeout_s: float = 30.0 # per-attempt terminal timeout backoff: Callable[[int], float] = ... # attempt -> secs; default 0 retry_on: Callable[[object], bool] = default_retry_on new_trace: bool = True # fresh trace + STOP the abandoned one rollback_on_retry: bool = False # also roll back its Engram writes on_retry: Callable[[int, object], None] | None = None reason: str = "retry" # default_retry_on retries on a timeout, on a Pathway that closed before a # terminal, or on an ERROR flagged recoverable. A FINAL / AGENT_OUTPUT / # CLARIFICATION / PERMISSION is never retried. sig = await orch.run_with_retry( neuron="flaky-worker", input={"q": "hi"}, retry=RetryStrategy( max_attempts=5, timeout_s=10.0, backoff=lambda a: 2 ** a, # 1s, 2s, 4s, ... rollback_on_retry=True, # undo each abandoned attempt's Engram writes ), ) # Or cancel a whole workflow yourself (and roll back its Engram writes): await orch.stop_trace(trace_id, rollback=True, reason="superseded")
Inbound handlers
Every SignalType has a named on_* decorator you apply to a coroutine, and all of them accept the same optional filters: neuron=, capability=, trace_id=. on_signal(SignalType.X, ...) is the generic escape hatch behind the named sugar, on_trace(trace_id, *types) registers one handler across a whole trace, and subscribe() is a coroutine - not a decorator - returning a raw Subscription.
| Registration | Fires on |
|---|---|
| @dendrite.on_task_signal | Every TASK on the namespace. |
| @dendrite.on_agent_output | Every AGENT_OUTPUT. |
| @dendrite.on_final | Every FINAL - workflow conclusion. |
| @dendrite.on_error_signal | Every ERROR. (@dendrite.on_error is a deprecated alias.) |
| @dendrite.on_task_offer | Every TASK_OFFER. Registering one suppresses the default auto-bidder. |
| @dendrite.on_bid / on_task_awarded / on_task_declined | The bidding flow - market observability. |
| @dendrite.on_plan / on_thought_delta / on_tool_call / on_tool_result | The cognition stream. |
| @dendrite.on_memory_append / on_critique / on_escalation / on_consensus / on_context_sync | The remaining cognition / coordination types. |
| @dendrite.on_clarification | Every CLARIFICATION. Reply with answer_clarification (discrete) or respond_to_clarification (re-dispatch). |
| @dendrite.on_permission | Every PERMISSION request. Reply with grant_permission / deny_permission / respond_to_permission. |
| @dendrite.on_clarification_answer / on_permission_decision | The discrete answers - the decorator counterparts of await_decision(). |
| @dendrite.on_recall_signal / on_imprint_signal | Engram requests crossing the bus. |
| @dendrite.on_recalled / on_imprinted | Engram responses (observability; EngramClient owns correlation). |
| @dendrite.on_register_signal | Every REGISTER, including re-registers attached to HEARTBEATs. (on_register is a deprecated alias.) |
| @dendrite.on_deregister_signal / on_heartbeat_signal | DEREGISTER / HEARTBEAT. (Short aliases deprecated.) |
| @dendrite.on_discover | Every DISCOVER - answer with your hosted Axons (cosmo registry list uses this). |
| @dendrite.on_signal(SignalType.X, neuron=…, capability=…, trace_id=…) | Any SignalType - the generic form with the same filters. |
| @dendrite.on_trace(trace_id, *types) | Every (or the selected) type on one trace. |
| @dendrite.on_connect / on_refresh / on_schedule(every_s=N) | LifecycleHooks - not Signals: fired on registration, registry refresh, and a timer. |
| await dendrite.subscribe(SignalType.X, handler) | Raw subscription. Returns a Subscription you can later unsubscribe. |
RegistryStore reads
Dendrite.find_neurons(*, capability=None) -> list[NeuronRecord] — Return live (non-deregistered) Neurons on the namespace, optionally filtered to those advertising the given capability. Requires a registry_store.
Dendrite.registry_snapshot(*, capability=None, include_deregistered=False) -> list[NeuronRecord] — Point-in-time snapshot of the registry. Useful in on_connect / on_schedule handlers. Requires a registry_store.
Example
import asyncio from cosmonapse import Dendrite, SqliteRegistryStore, connect_synapse async def main(): synapse = await connect_synapse("cosmo://127.0.0.1:7070") dendrite = Dendrite( synapse = synapse, registry_store = SqliteRegistryStore("/tmp/worker.db"), namespace = "prod", heartbeat_s = 5.0, ) dendrite.attach_axon(axon) @dendrite.on_agent_output async def handle(sig): await dendrite.emit_final( trace_id=sig.trace_id, parent_id=sig.id, result=sig.payload["output"], ) async with dendrite: await dendrite.dispatch_task(neuron="answerer", input={"q": "hi"}) await asyncio.sleep(float("inf")) # The Dendrite never closes the synapse - you do: await synapse.close() asyncio.run(main())
Have a feature in mind?
The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.