Topology

One worker, two failure modes.

A single worker hosts a crashable ingester and a flaky generator; the orchestrator drives them and owns the retry / STOP control. The VectorEngram keeps a per-trace saga journal that makes rollback possible.

topology
Three scenarios, fully in-process on a MemorySynapse

  orchestrator "api"  ---dispatch--->  worker
       |                                  |  ingester  (writes chunks, can crash)
       |  run_with_retry / stop_trace     |  generator (flaky: stalls, then answers)
       v                                  v
  RetryStrategy                       engram-host
  (STOP + re-dispatch)                VectorEngram "retry-vectors" + saga journal
00 · Install

No token, no broker, no network. Everything runs in-process on a MemorySynapse, so the only dependency is Cosmonapse itself.

# Fully offline  -  no HF_TOKEN, no network, no broker. Deterministic
# local embeddings and a fake LLM, so the focus is control flow.
$ pip install cosmonapse
$ python demo.py
01 · Offline backend

Real Engram, fake embeddings.

We reuse the actual VectorEngram from Example 10 - including its saga journal - but feed it a deterministic 16-dim embedding so there is no API to call. The resilience behavior is identical; only the embedding quality is throwaway.

resilient.py
# Reuse the VectorEngram from Example 10, but run OFFLINE: a deterministic
# 16-dim embedding and a local chunker, so the demo needs no token/network.
import sys
from pathlib import Path

_HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(_HERE.parent / "11-rag"))   # the VectorEngram backend

from cosmonapse import Axon, Dendrite, EngramBinding
from vector_engram import VectorEngram              # cosine + per-trace saga journal

def fake_embed(text) -> list[float]:
    vec = [0.0] * 16
    for ch in text.lower():
        vec[ord(ch) % 16] += 1.0
    norm = sum(v * v for v in vec) ** 0.5 or 1.0
    return [v / norm for v in vec]
02 · A crashable ingester

Partial writes on purpose.

The ingester writes chunks one at a time under the TASK's trace. The fail_after knob makes it raise mid-stream, leaving the index in a half-written state - exactly the situation rollback exists to clean up.

resilient.py
# ingester: imprints each chunk under the TASK's trace. With fail_after
# set it raises partway through, leaving a PARTIAL write the orchestrator
# can roll back. await_ack=True makes each write durable before the next.
async def ingest_neuron(input, context, *, imprint):
    doc_id = input["doc_id"]
    chunks = chunk_text(input["text"])
    fail_after = input.get("fail_after")
    written = 0
    for i, chunk in enumerate(chunks):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError(f"ingest of {doc_id!r} crashed after {written} chunks")
        await imprint("vectors", op="upsert",
                      entry={"doc_id": doc_id, "chunk_index": i, "text": chunk,
                             "embedding": fake_embed(chunk)},
                      merge_key=f"{doc_id}:{i}", await_ack=True, deadline_ms=2000)
        written += 1
    return {"doc_id": doc_id, "chunks": written}
03 · A flaky generator

Stalls first, answers later.

This fake LLM hangs on its first stall_first calls, then succeeds. Because every retry is a fresh dispatch, a process-wide call counter models a flaky upstream that recovers after a couple of tries.

resilient.py
# A fake LLM that stalls on its first stall_first calls, then answers.
# Each retry is a FRESH dispatch -> a fresh call here, so a stuck attempt
# times out, gets STOPped, and the next attempt runs anew.
def make_flaky_generator(stall_first, *, stall_s=5.0):
    state = {"calls": 0}

    async def generator(input, context):
        state["calls"] += 1
        n = state["calls"]
        if n <= stall_first:
            await asyncio.sleep(stall_s)            # stalls past the retry timeout
        return {"answer": f"answer to {input['question']!r}", "attempt": n}

    generator.state = state                          # exposed for the demo printout
    return generator
04 · Wire the topology

Engram host, worker, orchestrator.

Identical wiring to the bigger examples, just smaller: one engram host, one worker with both Axons, one orchestrator. The generator is passed in so each scenario can supply a differently-flaky one.

resilient.py
# build: engram-host + worker (2 Axons) + orchestrator.
def build(synapse, *, generator):
    vectors = VectorEngram(engram_id="retry-vectors", engram_kind="semantic")

    host = Dendrite(synapse=synapse, namespace=NAMESPACE,
                    dendrite_id="engram-host", role="worker")
    host.attach_engram(vectors)

    bind = EngramBinding(name="vectors", directed_id="retry-vectors")
    worker = Dendrite(synapse=synapse, namespace=NAMESPACE,
                      dendrite_id="worker", role="worker")
    worker.attach_axon(Axon(neuron_id="ingester", neuron_fn=ingest_neuron,
        capabilities=["ingest"], engrams=[bind]))
    worker.attach_axon(Axon(neuron_id="generator", neuron_fn=generator,
        capabilities=["generate"]))

    orchestrator = Dendrite(synapse=synapse, namespace=NAMESPACE,
                            dendrite_id="api", role="orchestrator")
    return [host, worker, orchestrator], orchestrator, vectors
05 · run_with_retry

Retry a stuck stage on a fresh trace.

Each retry first STOPs the abandoned attempt, so a stalled worker can't keep running - or keep writing to an Engram - behind the retry. dispatch_and_wait(retry=...) takes the same strategy.

demo.py
# A stage is "stuck" when it produces no terminal Signal within timeout_s
# (or returns a recoverable ERROR). run_with_retry STOPs the abandoned
# attempt, then re-dispatches on a fresh trace - so a stalled worker can't
# keep running, or keep writing to an Engram, behind the retry.
from cosmonapse import RetryStrategy

def on_retry(attempt, outcome):
    print(f"   attempt {attempt} stuck -> STOP + re-dispatch on a fresh trace")

sig = await orch.run_with_retry(
    neuron="generator",
    input={"question": "what is a Dendrite?"},
    retry=RetryStrategy(max_attempts=3, timeout_s=0.5, on_retry=on_retry),
)
print(sig.payload["output"]["answer"])
06 · stop_trace + rollback

Cancel a workflow, and undo its writes.

stop_trace cancels every Dendrite working on a trace; rollback=True additionally replays each Engram's inverse-op journal. The commit point is success only, so a failed workflow stays rollback-able while a clean finish discards the journal.

demo.py
# stop_trace: cooperative cancellation of a whole workflow by trace_id.
# Every Dendrite cancels its in-flight neuron work and engram I/O, acks STOPPED.
await orch.stop_trace(tid)

# rollback=True additionally replays each hosted Engram's per-trace
# inverse-op journal, undoing a half-finished write (saga rollback).
acks = await orch.stop_trace(tid, rollback=True, collect_acks=True, timeout_s=0.5)
compensated = sum(a.payload.get("compensated", 0) for a in acks)

# Saga commit point is SUCCESS (FINAL) only: an ERROR leaves the journal in
# place so a failed workflow can still be rolled back; a successful FINAL, a
# plain stop_trace, or a retry's preemptive STOP all discard the journal.
07 · What demo.py walks through

Three scenarios.

scenarios
#  Scenario                                Shows
1  Generator hangs on its first attempt     retry times out, STOPs the stuck
                                            attempt, succeeds on re-dispatch
2  Generator always hangs                   retry exhausts max_attempts and
                                            raises TimeoutError (each STOPped)
3  Ingester crashes after 3 chunks          stop_trace(rollback=True) replays
                                            the journal; the index goes to 0
08 · Run it

No token, no broker.

terminal
$ python demo.py
================================================================
1. retry survives a stalled stage
================================================================
   attempt 1 stuck (TimeoutError) -> STOP + re-dispatch on a fresh trace
   -> answer on attempt 2: answer to 'what is a Dendrite?'
   generator was invoked 2 times total

================================================================
3. roll back a half-finished ingest
================================================================
   index size before ingest: 0
   index size after crash: 3  (partial write)
   stop_trace(rollback=True): 3 inverse ops replayed
   index size after rollback: 0  (clean)
Watch it in Prism

See the Signals fire in the browser.

cosmo doppler --prism opens a live, read-only view of every Signal on the bus - REGISTER, TASK, AGENT_OUTPUT, FINAL - as the workflow runs. The demo runs in-process on a MemorySynapse, which Prism can't attach to, so start a dev synapse and point the code at it.

terminal
# This demo runs in-process on a MemorySynapse, which Prism can't attach to.
# To watch it live, start a dev synapse and point the code at it:

# terminal 1  -  the bus
$ cosmo synapse start memory --namespace=retry-demo

# terminal 2  -  Prism, the live browser view (http://127.0.0.1:7071)
$ cosmo doppler --prism --url=cosmo://127.0.0.1:7070 -n retry-demo

# in the code  -  swap one line:
# synapse = MemorySynapse()
synapse = await connect_synapse("cosmo://127.0.0.1:7070")
http://127.0.0.1:7071 · -n retry-demo
Prism showing Signals animating in the retry-demo namespace
Prism renders every Signal on the bus as it fires — REGISTER, TASK, AGENT_OUTPUT, FINAL.