TS · 05

Dendrite - synapse-side connector

The Dendrite is the only component that touches the Synapse. It hosts attached Axons, owns REGISTER / HEARTBEAT / DEREGISTER, routes inbound TASKs, and exposes every orchestration primitive. Unlike Python, handler registration is plain method calls - there are no decorators in the TS port.

classDendrite

Hosts Axons and orchestrates. Synapse and (optionally) RegistryStore are passed in via the options object; the Dendrite never builds or closes them.

dendrite.ts
interface DendriteOptions {
  synapse:                Synapse;        // REQUIRED
  registryStore?:         RegistryStore;
  namespace?:             string;         // default "default"
  dendriteId?:            string;         // default "dendrite"
  heartbeatMs?:           number;         // default 30_000; 0 disables
  reregisterOnHeartbeat?: boolean;        // default true
  role?:                  DendriteRole;   // "orchestrator" (default) | "worker"
  autoBid?:               boolean;        // default true  -  default bidder for hosted Axons
  staleAfterMs?:          number;         // liveness sweep; default 3 heartbeat intervals
}

class Dendrite {
  constructor(opts: DendriteOptions);

  // ── Axon lifecycle ──────────────────────────────────────
  attachAxon(axon: Axon): void;
  addAxon(axon: Axon): Promise<void>;              // attach while running
  detachAxon(neuronId: string, opts?: { reason? }): Promise<void>;
  start(): Promise<void>;
  stop(reason?: string): Promise<void>;

  // ── Dispatch (orchestrator-role only) ────────────────────
  // Addressed (neuron) or capability-routed (capabilities); at least one required.
  dispatchTask(args: DispatchArgs & { finalize?: boolean }): Promise<Signal>;

  // Pathway-based dispatch. scope: "all" | "terminal".
  // finalize defaults true when scope is "terminal", so a stock worker's
  // AGENT_OUTPUT is promoted to FINAL (terminal-handler finalize).
  dispatch(args): Promise<Pathway>;
  dispatchAndWait(args & { timeoutMs?: number }): Promise<Signal>;
  dispatchAndSubscribe(args): Promise<Pathway>;
  dispatchOffer(args: {
    input; capabilities?; deadlineMs?;            // BID-collection window
    select?: "first_bid" | "lowest_cost" | "highest_confidence";
  }): Promise<Pathway>;
  observePathway(traceId: string): Promise<Pathway>;   // watch a peer's trace

  // ── Resilience & cancellation (orchestrator-role only) ──
  runWithRetry(args & { retry: RetryStrategy; timeoutMs? }): Promise<Signal>;
  emitStop(args: { traceId; rollback?; reason? }): Promise<Signal>;
  stopTrace(traceId: string, opts?: {
    rollback?; reason?; collectAcks?; timeoutMs?;
  }): Promise<Signal[]>;

  // Worker side: register your own bidder (suppresses the auto-bidder).
  onTaskOffer(fn: SignalHandler, filter?: HandlerFilter): SignalHandler;
  bid(offer: Signal, args: { neuron; cost; etaMs?; confidence? }): Promise<Signal>;

  // ── Interactive cognition (CLARIFICATION / PERMISSION) ───
  awaitDecision(request: Signal, opts?: { timeoutMs? }): Promise<Signal>;
  answerClarification(request: Signal, opts: { answer; meta? }): Promise<Signal>;

  // ── Engram  -  shared memory (RECALL / IMPRINT) ──────────
  attachEngram(engram: Engram): Promise<void>;
  recall(args: { engramId?; engramKind?; query; ... }): Promise<RecallResult>;
  imprint(args: { engramId?; engramKind?; op; entry; ... }): Promise<ImprintReceipt | null>;

  // ── Inbound handlers  -  method calls, NOT decorators ─────
  // One per SignalType, all accepting an optional filter:
  //   { neuron?, capability?, traceId? }

  // Lifecycle
  onTaskSignal(fn, filter?): SignalHandler;
  onAgentOutput(fn, filter?): SignalHandler;
  onFinal(fn, filter?): SignalHandler;
  onErrorSignal(fn, filter?): SignalHandler;

  // Routing / bidding
  onTaskOffer(fn, filter?): SignalHandler;   // suppresses the auto-bidder
  onBid(fn, filter?): SignalHandler;
  onTaskAwarded(fn, filter?): SignalHandler;
  onTaskDeclined(fn, filter?): SignalHandler;

  // Cognition
  onPlan / onThoughtDelta / onToolCall / onToolResult(fn, filter?);
  onMemoryAppend / onCritique / onEscalation / onConsensus / onContextSync(fn, filter?);

  // Interactive cognition (see awaitDecision)
  onClarification(fn, filter?): SignalHandler;
  onPermission(fn, filter?): SignalHandler;
  onClarificationAnswer(fn, filter?): SignalHandler;
  onPermissionDecision(fn, filter?): SignalHandler;

  // Engram
  onRecallSignal / onImprintSignal(fn, filter?);    // requests on the bus
  onRecalled / onImprinted(fn, filter?);            // responses (observability)

  // Agent management + discovery
  onRegister / onDeregister / onHeartbeat(fn, filter?);
  onDiscover(fn): SignalHandler;

  // Generic escape hatches
  onSignal(type: SignalType, fn, filter?): SignalHandler;
  onTrace(traceId: string, ...types: SignalType[]): (fn) => SignalHandler;
  subscribe(type: SignalType, handler: MessageHandler, opts?): Promise<Subscription>;

  // Lifecycle hooks (inherited)
  onConnect(fn): ConnectHook;    // after this Dendrite registers
  onRefresh(fn): RefreshHook;    // heartbeat / register / deregister
  onSchedule(everyMs, fn): ScheduleHook;

  // ── Cognition emitters ────────────────────────────────────
  emitFinal(args: { traceId; parentId; result; meta? }): Promise<Signal>;
  emitError(args: { traceId; parentId?; code; message; recoverable?; meta? }): Promise<Signal>;
  // + emitPlan / emitThoughtDelta / emitToolCall / emitToolResult /
  //   emitMemoryAppend / emitCritique / emitEscalation / emitConsensus / emitContextSync
  emit(signal: Signal): Promise<void>;   // throws DendriteProtocolError off-list

  // ── Registry reads (require registryStore) ──────────────
  findNeurons(opts?: { capability?: string }): Promise<NeuronRecord[]>;
  registrySnapshot(opts?: ListOptions): Promise<NeuronRecord[]>;
}

Constructor options

OptionTypeDescription
synapseSynapseRequired. An already-connected adapter. The Dendrite never calls connect() on it.
registryStore?RegistryStoreWhen set, the Dendrite mirrors its own Axons and auto-subscribes to REGISTER / DEREGISTER / HEARTBEAT to track the namespace view.
namespace?stringSubject namespace. Default "default".
dendriteId?stringIdentifier embedded as neuron in outbound FINAL / ERROR signals. Default "dendrite".
heartbeatMs?numberHeartbeat interval in milliseconds. Default 30_000. Pass 0 to disable.
reregisterOnHeartbeat?booleanRe-emit REGISTER on each heartbeat tick so late joiners catch up. Default true.

Inbound handlers

Call an on* method with a handler; it returns the same handler. Every registration takes an optional filter { neuron?, capability?, traceId? }. For any other type, use onSignal(type, fn, filter?), or subscribe() for a raw Subscription.

RegistrationFires on
dendrite.onTaskSignal(fn)Every TASK on the namespace.
dendrite.onAgentOutput(fn)Every AGENT_OUTPUT.
dendrite.onFinal(fn)Every FINAL - workflow conclusion.
dendrite.onErrorSignal(fn)Every ERROR.
dendrite.onTaskOffer(fn)Every TASK_OFFER; registering one suppresses the default auto-bidder.
dendrite.onBid(fn) / onTaskAwarded(fn) / onTaskDeclined(fn)The bidding flow - market observability.
dendrite.onPlan(fn) / onThoughtDelta(fn) / onToolCall(fn) / onToolResult(fn)The cognition stream.
dendrite.onMemoryAppend(fn) / onCritique(fn) / onEscalation(fn) / onConsensus(fn) / onContextSync(fn)The remaining cognition / coordination types.
dendrite.onClarification(fn)Every CLARIFICATION. Reply with answerClarification (discrete) or respondToClarification (re-dispatch).
dendrite.onPermission(fn)Every PERMISSION request. Reply with grantPermission / denyPermission / respondToPermission.
dendrite.onClarificationAnswer(fn) / onPermissionDecision(fn)The discrete answers - the handler counterparts of awaitDecision().
dendrite.onRecallSignal(fn) / onImprintSignal(fn)Engram requests crossing the bus.
dendrite.onRecalled(fn) / onImprinted(fn)Engram responses (observability; EngramClient owns correlation).
dendrite.onRegister(fn) / onDeregister(fn) / onHeartbeat(fn)REGISTER (incl. heartbeat re-registers) / DEREGISTER / HEARTBEAT.
dendrite.onDiscover(fn)Every DISCOVER - answer with your hosted Axons (cosmo registry list uses this).
dendrite.onSignal(type, fn, filter?)Any SignalType - the generic form with the same filters.
dendrite.onTrace(traceId, ...types)(fn)Every (or the selected) type on one trace.
dendrite.onConnect(fn) / onRefresh(fn) / onSchedule(everyMs, fn)Lifecycle hooks - not Signals: registration, registry refresh, timer.
await dendrite.subscribe(type, fn)Raw subscription. Returns a Subscription.

Resilience & cancellation

Retry fits the request/reply shape only - the Dendrite owns the full dispatch → wait → close arc and can transparently re-dispatch. The streaming shapes hand the live Pathway to the caller, so retry there would orphan their subscriptions. A new-trace retry broadcasts STOP on the abandoned trace first, so a stalled worker can’t outlive the retry. Full parity with the Python surface.

async methoddendrite.runWithRetry(args & { retry: RetryStrategy, timeoutMs? }): Promise<Signal>

Dispatch and wait, retrying per the RetryStrategy until a non-retryable outcome or attempts are exhausted. Resolves with the terminal Signal (FINAL / AGENT_OUTPUT / CLARIFICATION / PERMISSION, or a final ERROR); rejects with the last error when every attempt timed out.

async methoddendrite.dispatchAndWait(args & { retry?: RetryStrategy }): Promise<Signal>

The request/reply sugar also accepts retry. When supplied it delegates to the same loop as runWithRetry; when omitted it is a single dispatch + wait.

async methoddendrite.emitStop({ traceId, rollback?, reason? }): Promise<Signal>

Broadcast a STOP for a whole trace (orchestrator-gated). Best-effort and idempotent: a peer that never saw it simply isn't stopped. rollback replays each hosted Engram's per-trace saga journal in reverse. Resolves with the emitted STOP Signal.

async methoddendrite.stopTrace(traceId, { rollback?, reason?, collectAcks?, timeoutMs? }): Promise<Signal[]>

Thin wrapper over emitStop. With collectAcks: true it opens a short-lived STOPPED subscription and resolves with the acks seen within timeoutMs (best effort).

retry.ts
import { Dendrite, type RetryStrategy } from "@cosmonapse/sdk";

// Declarative retry policy for the request/reply shape.
interface RetryStrategy {
  maxAttempts?:     number;   // total tries incl. the first (>= 1). Default 3
  timeoutMs?:       number;   // per-attempt terminal timeout. Default 30_000
  backoffMs?:       (attempt: number) => number;  // default 0
  retryOn?:         (outcome: Signal | Error) => boolean;  // default defaultRetryOn
  newTrace?:        boolean;  // fresh trace + STOP the abandoned one. Default true
  rollbackOnRetry?: boolean;  // also roll back its Engram writes. Default false
  onRetry?:         (attempt: number, outcome: Signal | Error) => void;
  reason?:          string;   // carried on the preemptive STOP. Default "retry"
}

// defaultRetryOn retries on a TimeoutError, a Pathway that closed before a
// terminal, or an ERROR flagged recoverable. FINAL / AGENT_OUTPUT /
// CLARIFICATION / PERMISSION are never retried.
const sig = await orch.runWithRetry({
  neuron: "flaky-worker", input: { q: "hi" },
  retry: {
    maxAttempts: 5, timeoutMs: 10_000,
    backoffMs: (a) => 1000 * 2 ** a,  // 1s, 2s, 4s, ...
    rollbackOnRetry: true,            // undo each abandoned attempt's Engram writes
  },
});

// Or cancel a whole workflow yourself (and roll back its Engram writes):
await orch.stopTrace(traceId, { rollback: true, reason: "superseded" });

Example

worker.ts
import { Axon, Dendrite, MemorySynapse } from "@cosmonapse/sdk";

const synapse = new MemorySynapse();
await synapse.connect();

const dendrite = new Dendrite({
  synapse,
  namespace: "demo",
  heartbeatMs: 5_000,
});

dendrite.attachAxon(
  new Axon({ neuronId: "answerer", neuronFn: async (i) => ({ answer: i.q }) }),
);

dendrite.onAgentOutput(async (sig) => {
  await dendrite.emitFinal({
    traceId: sig.trace_id,
    parentId: sig.id,
    result: sig.payload.output,
  });
});

await dendrite.start();
await dendrite.dispatchTask({ neuron: "answerer", input: { q: "hi" } });

// … on shutdown  -  the Dendrite never closes the synapse, you do:
await dendrite.stop();
await synapse.close();

Have a feature in mind?

The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.