TS · 06

Pathway - per-trace event handle

A Pathway is the client-side observation surface for one logical workflow, identified by its trace_id. It supports three consumption shapes on the same primitive - sequential wait(), reactive on() callbacks, and for await iteration. Opened by dispatch() family calls in the originator role, or by observePathway(traceId) in the observer role. With scope: "terminal" only FINAL / ERROR / CLARIFICATION / PERMISSION are delivered, and the dispatch is tagged with finalize so a stock worker’s AGENT_OUTPUT is promoted to FINAL.

classPathway

Auto-closes on FINAL / ERROR. close() is idempotent; a pending wait() rejects with PathwayClosedError when the Pathway closes first.

pathway.ts
// A per-trace event handle. Open via dendrite.dispatch(...) or
// dendrite.observePathway(traceId). Three consumption shapes on the
// same primitive  -  pick whichever fits the workflow.
class Pathway {
  get traceId(): string;
  get parentId(): string | null;
  get role(): "originator" | "observer";
  get scope(): "all" | "terminal";
  get closed(): boolean;

  // Shape 1  -  sequential / request-reply
  wait(timeoutMs?: number): Promise<Signal>;
  waitFor(type: SignalType, timeoutMs?: number): Promise<Signal>;

  // Shape 2  -  reactive callbacks (trace-scoped)
  on(type: SignalType, fn: PathwaySignalHandler): PathwaySignalHandler;

  // Shape 3  -  async iteration
  [Symbol.asyncIterator](): AsyncIterator<Signal>;

  // Lifecycle  -  auto-closes on FINAL / ERROR; close() is idempotent.
  // A wait() pending at close rejects with PathwayClosedError.
  close(): Promise<void>;
}

// Exported sets:
PATHWAY_TYPES;    // every type a Pathway observes
TERMINAL_TYPES;   // FINAL / ERROR  -  auto-close triggers

Example - the three shapes

shapes.ts
import { SignalType, connectSynapse, Dendrite } from "@cosmonapse/sdk";

const synapse = await connectSynapse("cosmo://127.0.0.1:7070");
const orch = new Dendrite({ synapse, namespace: "prod" });
await orch.start();

// Shape 1  -  sequential
const sig = await orch.dispatchAndWait({
  capabilities: ["summarize"], input: { text: "..." }, timeoutMs: 5_000,
});

// Shape 2  -  reactive
const pw = await orch.dispatchAndSubscribe({
  capabilities: ["plan"], input: { goal: "..." },
});
pw.on(SignalType.PLAN, async (s) => console.log(s.payload.steps));

// Shape 3  -  streaming
const stream = await orch.dispatch({ neuron: "agent", input: {} });
for await (const s of stream) {
  if (s.type === SignalType.AGENT_OUTPUT) break;
}
await stream.close();

// Observer role  -  watch a trace another peer started (no TASK emitted)
const watcher = await orch.observePathway("trc_01J...");
console.log(watcher.role);   // "observer"

Have a feature in mind?

The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.