Stuck Stages, STOPs, and Saga Rollback.
Resilience layered on the RAG primitives from Example 10, running fully offline so the focus is control flow, not retrieval quality. It builds on the same dispatch_and_wait you already know and adds three things: retry on a fresh trace, cooperative STOP, and Engram saga rollback. We assemble the offline topology first, then exercise each pattern; every snippet is the real code from cosmonapse-examples/13-retry.
One worker, two failure modes.
A single worker hosts a crashable ingester and a flaky generator; the orchestrator drives them and owns the retry / STOP control. The VectorEngram keeps a per-trace saga journal that makes rollback possible.
Three scenarios, fully in-process on a MemorySynapse

  orchestrator "api"  ---dispatch--->  worker
        |                                |  ingester  (writes chunks, can crash)
        | run_with_retry / stop_trace    |  generator (flaky: stalls, then answers)
        v                                v
  RetryStrategy                        engram-host
  (STOP + re-dispatch)                 VectorEngram "retry-vectors" + saga journal
No token, no broker, no network. Everything runs in-process on a MemorySynapse, so the only dependency is Cosmonapse itself.
# Fully offline - no HF_TOKEN, no network, no broker. Deterministic
# local embeddings and a fake LLM, so the focus is control flow.
$ pip install cosmonapse
$ python demo.py
Real Engram, fake embeddings.
We reuse the actual VectorEngram from Example 10 - including its saga journal - but feed it a deterministic 16-dim embedding so there is no API to call. The resilience behavior is identical; only the embedding quality is throwaway.
# Reuse the VectorEngram from Example 10, but run OFFLINE: a deterministic
# 16-dim embedding and a local chunker, so the demo needs no token/network.
import sys
from pathlib import Path

_HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(_HERE.parent / "11-rag"))  # the VectorEngram backend

from cosmonapse import Axon, Dendrite, EngramBinding
from vector_engram import VectorEngram  # cosine + per-trace saga journal


def fake_embed(text) -> list[float]:
    vec = [0.0] * 16
    for ch in text.lower():
        vec[ord(ch) % 16] += 1.0
    norm = sum(v * v for v in vec) ** 0.5 or 1.0
    return [v / norm for v in vec]
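To see why a throwaway embedding is still enough to drive the demo, the sketch below copies fake_embed and checks the two properties the index relies on: the output is deterministic, and it is unit-length, so a plain dot product behaves as cosine similarity. The cosine helper is an illustrative addition for this sketch, not part of VectorEngram.

```python
def fake_embed(text) -> list[float]:
    # Same bucketed character-count embedding as in the demo.
    vec = [0.0] * 16
    for ch in text.lower():
        vec[ord(ch) % 16] += 1.0
    norm = sum(v * v for v in vec) ** 0.5 or 1.0
    return [v / norm for v in vec]


def cosine(a: list[float], b: list[float]) -> float:
    # Hypothetical helper: inputs are already unit-length, so dot == cosine.
    return sum(x * y for x, y in zip(a, b))


a = fake_embed("a Dendrite hosts Axons")
b = fake_embed("A DENDRITE HOSTS AXONS")  # case is folded by .lower()
c = fake_embed("zzzz")                    # a very different character mix

same = cosine(a, b)    # identical text up to case
other = cosine(a, c)   # unrelated text scores lower
```

Because the vector only counts characters per bucket, it says nothing about meaning; it merely gives the index stable, comparable numbers to store and rank.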
Partial writes on purpose.
The ingester writes chunks one at a time under the TASK's trace. The fail_after knob makes it raise mid-stream, leaving the index in a half-written state - exactly the situation rollback exists to clean up.
# ingester: imprints each chunk under the TASK's trace. With fail_after
# set it raises partway through, leaving a PARTIAL write the orchestrator
# can roll back. await_ack=True makes each write durable before the next.
async def ingest_neuron(input, context, *, imprint):
    doc_id = input["doc_id"]
    chunks = chunk_text(input["text"])
    fail_after = input.get("fail_after")
    written = 0
    for i, chunk in enumerate(chunks):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError(f"ingest of {doc_id!r} crashed after {written} chunks")
        await imprint("vectors", op="upsert",
                      entry={"doc_id": doc_id, "chunk_index": i,
                             "text": chunk, "embedding": fake_embed(chunk)},
                      merge_key=f"{doc_id}:{i}", await_ack=True, deadline_ms=2000)
        written += 1
    return {"doc_id": doc_id, "chunks": written}
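The half-written state can be reproduced without Cosmonapse at all. The sketch below runs the same loop shape against a plain dict. Everything around the loop is an assumption made for illustration: fake_imprint stands in for the injected imprint callable, and chunk_text is a hypothetical fixed-window chunker (the example's real chunker is not shown here and may split differently).

```python
import asyncio

index: dict[str, dict] = {}  # stand-in for the vector index


def chunk_text(text: str, max_words: int = 4) -> list[str]:
    # Hypothetical chunker: fixed windows of max_words words.
    words = text.split()
    return [" ".join(words[i:i + max_words])
            for i in range(0, len(words), max_words)]


async def fake_imprint(name, *, op, entry, merge_key, **_ignored):
    # Stand-in for imprint: an upsert keyed by merge_key.
    index[merge_key] = entry


async def ingest(input, *, imprint):
    # Same loop shape as ingest_neuron, minus embeddings and context.
    chunks = chunk_text(input["text"])
    fail_after = input.get("fail_after")
    written = 0
    for i, chunk in enumerate(chunks):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError(f"crashed after {written} chunks")
        await imprint("vectors", op="upsert",
                      entry={"chunk_index": i, "text": chunk},
                      merge_key=f"{input['doc_id']}:{i}", await_ack=True)
        written += 1
    return {"chunks": written}


async def main():
    text = ("one two three four five six seven eight "
            "nine ten eleven twelve thirteen")  # 13 words -> 4 chunks
    try:
        await ingest({"doc_id": "doc-1", "text": text, "fail_after": 3},
                     imprint=fake_imprint)
    except RuntimeError as exc:
        return str(exc)


error = asyncio.run(main())
# index now holds doc-1:0 .. doc-1:2; the fourth chunk was never written.
```

The per-chunk merge_key is what makes the writes idempotent upserts, and it is also what a rollback needs in order to address each write individually.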
Stalls first, answers later.
This fake LLM hangs on its first stall_first calls, then succeeds. Because every retry is a fresh dispatch, the call counter has to outlive individual calls: it lives in the generator's closure, so it models a flaky upstream that recovers after a couple of tries.
# A fake LLM that stalls on its first stall_first calls, then answers.
# Each retry is a FRESH dispatch -> a fresh call here, so a stuck attempt
# times out, gets STOPped, and the next attempt runs anew.
import asyncio


def make_flaky_generator(stall_first, *, stall_s=5.0):
    state = {"calls": 0}  # shared by every call to this generator

    async def generator(input, context):
        state["calls"] += 1
        n = state["calls"]
        if n <= stall_first:
            await asyncio.sleep(stall_s)  # stalls past the retry timeout
        return {"answer": f"answer to {input['question']!r}", "attempt": n}

    generator.state = state  # exposed for the demo printout
    return generator
Engram host, worker, orchestrator.
Identical wiring to the bigger examples, just smaller: one engram host, one worker with both Axons, one orchestrator. The generator is passed in so each scenario can supply a differently-flaky one.
# build: engram-host + worker (2 Axons) + orchestrator.
def build(synapse, *, generator):
    vectors = VectorEngram(engram_id="retry-vectors", engram_kind="semantic")
    host = Dendrite(synapse=synapse, namespace=NAMESPACE,
                    dendrite_id="engram-host", role="worker")
    host.attach_engram(vectors)

    bind = EngramBinding(name="vectors", directed_id="retry-vectors")
    worker = Dendrite(synapse=synapse, namespace=NAMESPACE,
                      dendrite_id="worker", role="worker")
    worker.attach_axon(Axon(neuron_id="ingester", neuron_fn=ingest_neuron,
                            capabilities=["ingest"], engrams=[bind]))
    worker.attach_axon(Axon(neuron_id="generator", neuron_fn=generator,
                            capabilities=["generate"]))

    orchestrator = Dendrite(synapse=synapse, namespace=NAMESPACE,
                            dendrite_id="api", role="orchestrator")
    return [host, worker, orchestrator], orchestrator, vectors
Retry a stuck stage on a fresh trace.
Each retry first STOPs the abandoned attempt, so a stalled worker can't keep running - or keep writing to an Engram - behind the retry. dispatch_and_wait(retry=...) takes the same strategy.
# A stage is "stuck" when it produces no terminal Signal within timeout_s
# (or returns a recoverable ERROR). run_with_retry STOPs the abandoned
# attempt, then re-dispatches on a fresh trace - so a stalled worker can't
# keep running, or keep writing to an Engram, behind the retry.
from cosmonapse import RetryStrategy


def on_retry(attempt, outcome):
    print(f" attempt {attempt} stuck -> STOP + re-dispatch on a fresh trace")


sig = await orch.run_with_retry(
    neuron="generator",
    input={"question": "what is a Dendrite?"},
    retry=RetryStrategy(max_attempts=3, timeout_s=0.5, on_retry=on_retry),
)
print(sig.payload["output"]["answer"])
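The control flow behind "time out, stop the stuck attempt, try again" can be sketched with plain asyncio. This is an in-process analogue under stated assumptions, not how run_with_retry is implemented: retry_with_timeout is a hypothetical helper, asyncio.wait_for's cancellation plays the role of the STOP, and the flaky generator is the one from this example with the unused context argument dropped.

```python
import asyncio


def make_flaky_generator(stall_first, *, stall_s=5.0):
    state = {"calls": 0}

    async def generator(input):
        state["calls"] += 1
        n = state["calls"]
        if n <= stall_first:
            await asyncio.sleep(stall_s)  # stalls past the retry timeout
        return {"answer": f"answer to {input['question']!r}", "attempt": n}

    generator.state = state
    return generator


async def retry_with_timeout(fn, input, *, max_attempts, timeout_s):
    # Hypothetical helper: every attempt is a fresh call, and wait_for
    # cancels the stuck one before the next attempt starts.
    for attempt in range(1, max_attempts + 1):
        try:
            return await asyncio.wait_for(fn(input), timeout=timeout_s)
        except asyncio.TimeoutError:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the timeout


async def main():
    # Scenario 1 analogue: first call stalls, second one answers.
    recovers = make_flaky_generator(stall_first=1, stall_s=1.0)
    ok = await retry_with_timeout(recovers, {"question": "what is a Dendrite?"},
                                  max_attempts=3, timeout_s=0.05)

    # Scenario 2 analogue: every call stalls, so the retry gives up.
    never = make_flaky_generator(stall_first=99, stall_s=1.0)
    try:
        await retry_with_timeout(never, {"question": "?"},
                                 max_attempts=2, timeout_s=0.05)
        exhausted = False
    except asyncio.TimeoutError:
        exhausted = True
    return ok, recovers.state["calls"], exhausted, never.state["calls"]


ok, calls, exhausted, never_calls = asyncio.run(main())
```

Cancelling before re-dispatching is the important part: without it, the abandoned attempt would keep running alongside its replacement.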
Cancel a workflow, and undo its writes.
stop_trace cancels every Dendrite working on a trace; rollback=True additionally replays each Engram's inverse-op journal. The commit point is success only, so a failed workflow stays rollback-able while a clean finish discards the journal.
# stop_trace: cooperative cancellation of a whole workflow by trace_id.
# Every Dendrite cancels its in-flight neuron work and engram I/O, acks STOPPED.
await orch.stop_trace(tid)

# rollback=True additionally replays each hosted Engram's per-trace
# inverse-op journal, undoing a half-finished write (saga rollback).
acks = await orch.stop_trace(tid, rollback=True, collect_acks=True, timeout_s=0.5)
compensated = sum(a.payload.get("compensated", 0) for a in acks)

# Saga commit point is SUCCESS (FINAL) only: an ERROR leaves the journal in
# place so a failed workflow can still be rolled back; a successful FINAL, a
# plain stop_trace, or a retry's preemptive STOP all discard the journal.
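An inverse-op journal is easy to picture in miniature. The sketch below is a hypothetical in-memory store, not VectorEngram's actual implementation: JournaledIndex and its method names are invented for illustration. Each write records how to undo itself under its trace, commit discards that record, and rollback replays it newest-first.

```python
from collections import defaultdict


class JournaledIndex:
    """Hypothetical in-memory index with a per-trace inverse-op journal."""

    def __init__(self):
        self.rows: dict[str, dict] = {}
        self.journal: dict[str, list] = defaultdict(list)

    def upsert(self, trace_id: str, key: str, entry: dict) -> None:
        # Record how to undo this write before applying it.
        if key in self.rows:
            self.journal[trace_id].append(("restore", key, self.rows[key]))
        else:
            self.journal[trace_id].append(("delete", key, None))
        self.rows[key] = entry

    def commit(self, trace_id: str) -> None:
        # Success: the writes are final, so the undo log is discarded.
        self.journal.pop(trace_id, None)

    def rollback(self, trace_id: str) -> int:
        # Replay inverse ops newest-first; return how many were compensated.
        ops = self.journal.pop(trace_id, [])
        for op, key, previous in reversed(ops):
            if op == "delete":
                self.rows.pop(key, None)
            else:
                self.rows[key] = previous
        return len(ops)


index = JournaledIndex()
index.upsert("trace-ok", "doc-0:0", {"text": "kept"})
index.commit("trace-ok")                 # clean finish: journal dropped

for i in range(3):                       # a crashed ingest wrote 3 chunks
    index.upsert("trace-bad", f"doc-1:{i}", {"text": f"chunk {i}"})
size_after_crash = len(index.rows)       # committed row + 3 partial writes

compensated = index.rollback("trace-bad")
size_after_rollback = len(index.rows)    # only the committed row remains
late = index.rollback("trace-ok")        # already committed: nothing to undo
```

Keying the journal by trace is what lets one workflow be undone without touching rows written by another, and it is why discarding the journal doubles as the commit.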
Three scenarios.
#  Scenario                              Shows
1  Generator hangs on its first attempt  retry times out, STOPs the stuck attempt, succeeds on re-dispatch
2  Generator always hangs                retry exhausts max_attempts and raises TimeoutError (each STOPped)
3  Ingester crashes after 3 chunks       stop_trace(rollback=True) replays the journal; the index goes to 0
No token, no broker.
$ python demo.py
================================================================
1. retry survives a stalled stage
================================================================
attempt 1 stuck (TimeoutError) -> STOP + re-dispatch on a fresh trace
-> answer on attempt 2: answer to 'what is a Dendrite?'
generator was invoked 2 times total
================================================================
3. roll back a half-finished ingest
================================================================
index size before ingest: 0
index size after crash: 3 (partial write)
stop_trace(rollback=True): 3 inverse ops replayed
index size after rollback: 0 (clean)
See the Signals fire in the browser.
cosmo doppler --prism opens a live, read-only view of every Signal on the bus - REGISTER, TASK, AGENT_OUTPUT, FINAL - as the workflow runs. The demo runs in-process on a MemorySynapse, which Prism can't attach to, so start a dev synapse and point the code at it.
# This demo runs in-process on a MemorySynapse, which Prism can't attach to.
# To watch it live, start a dev synapse and point the code at it:

# terminal 1 - the bus
$ cosmo synapse start memory --namespace=retry-demo

# terminal 2 - Prism, the live browser view (http://127.0.0.1:7071)
$ cosmo doppler --prism --url=cosmo://127.0.0.1:7070 -n retry-demo

# in the code - swap one line:
# synapse = MemorySynapse()
synapse = await connect_synapse("cosmo://127.0.0.1:7070")
