Topology

Retrieve, write, run.

Three pipeline Neurons (coder, files, runner) across two workers, one VectorEngram for the style docs, all on a single trace_id. A fourth Neuron, the librarian, indexes the docs beforehand. The steps below build each box, then the pipeline that connects them.

topology
code_pipeline(request)   -   one trace_id

  orchestrator
       |
       v
   coder  ------->  files  ------->  runner
 (worker-rag)    (worker-tools)   (worker-tools)
    |  RECALL     MCP filesystem    subprocess
    v             write_file        python fib.py 10
 VectorEngram "code-docs"
 house-style.md + review-checklist.md
00 · Install

Python 3.11+ and Node 18+ are required: the filesystem MCP server runs via npx, and mcp is the client library that Neuron(source="mcp") uses.

# Python 3.11+. The filesystem MCP server runs via npx, so Node 18+
# is required. mcp is the client library Neuron(source="mcp") uses.
$ pip install cosmonapse httpx python-dotenv mcp

# Put a real token in cosmonapse-examples/.env (or export it as below):
# the coder calls Llama-3.1.
$ export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx
01 · Reuse the retrieval stack

Import Example 10, don't rebuild it.

The VectorEngram and the embed / chunk_text helpers are imported unchanged from 11-rag, which is added to sys.path. The coding agent adds new Neurons on top of an unchanged retrieval backend; that reuse is the whole point.

rag_codegen.py
# This example reuses the VectorEngram + embeddings from Example 10
# verbatim - one import path away. Nothing about retrieval is re-built.
import asyncio   # the runner uses asyncio subprocesses
import sys
from pathlib import Path

_HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(_HERE.parent / "11-rag"))   # VectorEngram, embeddings

from cosmonapse import Axon, Dendrite, EngramBinding, Neuron, new_trace_id
from embeddings import chunk_text, embed             # from 11-rag
from vector_engram import VectorEngram               # from 11-rag

NAMESPACE = "rag-mcp"
GEN_MODEL = "meta-llama/Llama-3.1-8B-Instruct"
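OUT_DIR = "generated"
# Also used below but not shown on this page:
# TOP_K, CODER_SYSTEM, _hf_token() and extract_code().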
02 · The librarian

Index the house style.

Same shape as the RAG ingester, single index: chunk, embed, and imprint the team's style guide and review checklist into code-docs. These are the rules the model will follow at generation time without ever being trained on them.

rag_codegen.py
# librarian: index the team's reference docs into the code-docs engram,
# so the coder can recall house rules the model was never trained on.
async def librarian_neuron(input, context, *, imprint):
    doc_id = input["doc_id"]
    chunks = chunk_text(input["text"])
    vectors = await embed(chunks, api_key=_hf_token())
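    for i, (chunk, vec) in enumerate(zip(chunks, vectors)):
        # merge_key "<doc_id>:<i>" makes re-indexing overwrite a chunk rather
        # than duplicate it. Only the final upsert waits for an ack (up to 2s).
        await imprint("docs", op="upsert",
                      entry={"doc_id": doc_id, "chunk_index": i,
                             "text": chunk, "embedding": vec},
                      merge_key=f"{doc_id}:{i}",
                      await_ack=(i == len(chunks) - 1), deadline_ms=2000)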
    return {"doc_id": doc_id, "chunks": len(chunks)}
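The merge_key is what makes re-running the librarian safe, assuming the engram's upsert replaces any entry stored under the same key (the usual meaning of "upsert"). ToyIndex and index_doc below are hypothetical in-memory stand-ins, not VectorEngram:

```python
from typing import Any


class ToyIndex:
    """Hypothetical stand-in for upsert-by-merge_key semantics.

    Entries are stored under their merge_key, so writing the same key twice
    replaces the old entry instead of adding a duplicate.
    """

    def __init__(self) -> None:
        self._entries: dict[str, dict[str, Any]] = {}

    def upsert(self, entry: dict[str, Any], merge_key: str) -> None:
        self._entries[merge_key] = entry

    def __len__(self) -> int:
        return len(self._entries)


def index_doc(index: ToyIndex, doc_id: str, chunks: list[str]) -> int:
    # Same key scheme as the librarian: "<doc_id>:<chunk_index>".
    for i, chunk in enumerate(chunks):
        index.upsert({"doc_id": doc_id, "chunk_index": i, "text": chunk},
                     merge_key=f"{doc_id}:{i}")
    return len(chunks)
```

One consequence of keying on chunk position: if a document shrinks from three chunks to two, the third entry from the old version is left behind in this toy. Whether VectorEngram behaves the same way is not covered here; clearing a document's old chunks before re-indexing would sidestep the question.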
03 · The coder

Rules from retrieval, not training.

The coder recalls the most relevant style chunks for each request, drops them into the system context, and asks Llama-3.1 for exactly one fenced script. extract_code() pulls the block out of the reply. Swap the indexed docs and the generated code changes - no prompt edits.
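Conceptually, recall with top_k ranks the stored chunks by how close their embeddings are to the request's embedding and keeps the best k. The page does not say which similarity VectorEngram uses; the sketch below assumes cosine similarity, and cosine, top_k and build_context are hypothetical helpers over plain dicts:

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def top_k(query: list[float], entries: list[dict], k: int) -> list[dict]:
    """Return the k entries whose 'embedding' is most similar to query."""
    ranked = sorted(entries, key=lambda e: cosine(query, e["embedding"]),
                    reverse=True)
    return ranked[:k]


def build_context(hits: list[dict]) -> str:
    # Same joining scheme as the coder: chunks separated by a '---' rule.
    return "\n\n---\n\n".join(h["text"] for h in hits)
```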

rag_codegen.py
# coder: recall the style docs for THIS request, put them in-context,
# and have Llama write ONE small script that follows rules from retrieval.
def make_coder_neuron():
    llm = Neuron(source="huggingface", endpoint="https://router.huggingface.co",
                 model=GEN_MODEL, api_key=_hf_token(), use_chat_api=True,
                 max_new_tokens=1024, temperature=0.1)

    async def coder_neuron(input, context, *, recall):
        request = input["request"]
        qvec = (await embed([request], api_key=_hf_token()))[0]
        result = await recall("docs", query={"embedding": qvec, "top_k": TOP_K},
                              deadline_ms=2000)
        guide = "\n\n---\n\n".join(h.entry["text"] for h in result.hits)
        sources = [f"{h.entry['doc_id']}#{h.entry['chunk_index']}" for h in result.hits]

        messages = [
            {"role": "system", "content": CODER_SYSTEM},
            {"role": "user", "content": f"House-style context:\n\n{guide}\n\nRequest: {request}"},
        ]
        out = await llm({"messages": messages}, [])
        code = extract_code(out["response"])          # pull the fenced block
        if code is None:
            return {"__error__": True, "message": "coder produced no usable code"}
        return {"filename": input["filename"], "code": code, "sources": sources}

    return coder_neuron
04 · MCP as a Neuron

An MCP server is just another source.

Neuron(source="mcp", ...) wraps a stdio MCP server as a pure Neuron, here sandboxed to the project folder. The pipeline writes generated/<name>.py through it with no custom tool-calling code, and other community MCP servers plug in the same way.

rag_codegen.py
# files: the STANDARD MCP filesystem server, wrapped as a Neuron and
# sandboxed to this folder. Neuron(source="mcp", ...) speaks MCP for you -
# no bespoke tool-calling glue. The orchestrator dispatches to it like any
# other Neuron; the input names the MCP tool and its arguments.
# worker_b is the worker-tools Dendrite created in build_codegen (step 06).
worker_b.attach_axon(Axon(
    neuron_id="files",
    neuron_fn=Neuron(source="mcp", server="filesystem", args=[str(_HERE)]),
    capabilities=["mcp", "filesystem"],
))
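Under the hood, an MCP client calls a server tool with a JSON-RPC tools/call request, and the stdio transport sends each message as one newline-terminated line. The sketch below builds such a request for write_file. It assumes Neuron(source="mcp") maps the pipeline's {"tool": ..., "arguments": ...} input onto this shape, which is how that input reads but is not confirmed here, and it omits the initialize handshake a real session starts with. tools_call_message is a hypothetical helper:

```python
import json


def tools_call_message(request_id: int, tool: str, arguments: dict) -> str:
    """Build one MCP tools/call request as a newline-terminated JSON line."""
    msg = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }
    # json.dumps escapes newlines inside strings, so the whole message fits
    # on one line, as the stdio transport requires.
    return json.dumps(msg) + "\n"
```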
05 · The runner

Execute, capture, report.

A plain Neuron that runs the generated file in a subprocess, captures stdout and stderr, and returns them with the exit code. It closes the loop: the agent doesn't just write code, it shows the code runs.

rag_codegen.py
# runner: execute the generated script in a subprocess with a 10s
# timeout, capture stdout/stderr + exit code.
#
# NOTE: this runs LLM-generated code on your machine. Fine for this toy;
# review generated/ before trusting the pattern with anything real.
async def runner_neuron(input, context):
    path = Path(input["path"])
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(path), *input.get("argv", []),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(_HERE))
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()   # reap the killed process so it doesn't linger
        return {"exit_code": -1, "stdout": "", "stderr": "timeout after 10s"}
    return {"exit_code": proc.returncode,
            "stdout": stdout.decode(errors="replace"),
            "stderr": stderr.decode(errors="replace")}

Note: the runner executes model-generated code on your machine. It is fine for this toy - review generated/ before reusing the pattern.
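To check the runner's contract without the rest of the topology, here is a standalone sketch with the same return shape. run_script is a hypothetical name; the timeout is a parameter so the timeout path can be exercised quickly, and the killed process is reaped with proc.wait():

```python
import asyncio
import sys
from pathlib import Path


async def run_script(path: Path, argv: list[str], timeout_s: float = 10.0) -> dict:
    """Run a Python script in a subprocess and capture its result."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(path), *argv,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(),
                                                timeout=timeout_s)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the killed child
        return {"exit_code": -1, "stdout": "",
                "stderr": f"timeout after {timeout_s}s"}
    return {"exit_code": proc.returncode,
            "stdout": stdout.decode(errors="replace"),
            "stderr": stderr.decode(errors="replace")}
```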

06 · Wire the topology

Two workers, one engram host.

A separate docs-host Dendrite holds the code-docs engram. worker-rag hosts the librarian and coder (both bound to it as "docs"); worker-tools hosts the MCP files Neuron and the runner. The codegen-api orchestrator dispatches all three pipeline stages.

rag_codegen.py
# build_codegen: 1 engram host + 2 workers + orchestrator.
def build_codegen(synapse):
    docs = VectorEngram(engram_id="code-docs", engram_kind="semantic")

    host = Dendrite(synapse=synapse, namespace=NAMESPACE,
                    dendrite_id="docs-host", role="worker")
    host.attach_engram(docs)
    bind_docs = [EngramBinding(name="docs", directed_id="code-docs")]

    worker_a = Dendrite(synapse=synapse, namespace=NAMESPACE,
                        dendrite_id="worker-rag", role="worker")
    worker_a.attach_axon(Axon(neuron_id="librarian", neuron_fn=librarian_neuron,
        capabilities=["docs-ingest"], engrams=bind_docs))
    worker_a.attach_axon(Axon(neuron_id="coder", neuron_fn=make_coder_neuron(),
        capabilities=["codegen"], engrams=bind_docs))

    worker_b = Dendrite(synapse=synapse, namespace=NAMESPACE,
                        dendrite_id="worker-tools", role="worker")
    worker_b.attach_axon(Axon(neuron_id="files",
        neuron_fn=Neuron(source="mcp", server="filesystem", args=[str(_HERE)]),
        capabilities=["mcp", "filesystem"]))
    worker_b.attach_axon(Axon(neuron_id="runner", neuron_fn=runner_neuron,
        capabilities=["execute"]))

    orchestrator = Dendrite(synapse=synapse, namespace=NAMESPACE,
                            dendrite_id="codegen-api", role="orchestrator")
    return [host, worker_a, worker_b, orchestrator], orchestrator
07 · The pipeline

coder → files → runner, one trace.

Three dispatch_and_wait calls threaded by trace_id / parent_id: generate the code, persist it through the MCP server, then run it. One coherent trace from request to exit code.

rag_codegen.py
# coder -> files(write_file) -> runner, all on one trace_id.
async def code_pipeline(orchestrator, request, filename, argv=None, timeout_s=90.0):
    tid = new_trace_id()

    # 1. RAG-grounded generation.
    r = await orchestrator.dispatch_and_wait(neuron="coder",
        input={"request": request, "filename": filename},
        trace_id=tid, timeout_s=timeout_s)
    if r.type.value == "ERROR":
        raise RuntimeError(f"coder failed: {r.payload.get('message')}")
    gen = r.payload["output"]

    # 2. Persist through the MCP filesystem server.
    rel_path = f"{OUT_DIR}/{gen['filename']}"
    r = await orchestrator.dispatch_and_wait(neuron="files",
        input={"tool": "write_file",
               "arguments": {"path": str(_HERE / rel_path), "content": gen["code"]}},
        trace_id=tid, parent_id=r.id, timeout_s=30.0)
    if r.type.value == "ERROR":
        raise RuntimeError(f"write_file failed: {r.payload.get('message')}")

    # 3. Run it (the runner resolves rel_path against its cwd, _HERE).
    r = await orchestrator.dispatch_and_wait(neuron="runner",
        input={"path": rel_path, "argv": argv or []},
        trace_id=tid, parent_id=r.id, timeout_s=30.0)
    if r.type.value == "ERROR":
        raise RuntimeError(f"runner failed: {r.payload.get('message')}")
    return {"filename": rel_path, "code": gen["code"],
            "sources": gen["sources"], "run": r.payload["output"]}
08 · Run it

One command.

terminal
# Indexes the style docs, then asks the pipeline to write, save (via the
# MCP server) and run a tiny Fibonacci CLI.
$ python demo.py
indexed house-style          3 chunks
indexed review-checklist     2 chunks

request: Code a small command-line tool that prints the first N Fibonacci numbers.
wrote generated/fib.py  (grounded on: house-style#0, review-checklist#1)
--- run: python fib.py 10 ---
0 1 1 2 3 5 8 13 21 34
exit code: 0

# Follow the retrieve -> write -> run trace (this needs a dev synapse;
# see "Watch it in Prism" below).
$ cosmo doppler -n rag-mcp
Watch it in Prism

See the Signals fire in the browser.

cosmo doppler --prism opens a live, read-only view of every Signal on the bus - REGISTER, TASK, AGENT_OUTPUT, FINAL - as the workflow runs. The demo runs in-process on a MemorySynapse, which Prism can't attach to, so start a dev synapse and point the code at it.

terminal
# This demo runs in-process on a MemorySynapse, which Prism can't attach to.
# To watch it live, start a dev synapse and point the code at it:

# terminal 1  -  the bus
$ cosmo synapse start memory --namespace=rag-mcp

# terminal 2  -  Prism, the live browser view (http://127.0.0.1:7071)
$ cosmo doppler --prism --url=cosmo://127.0.0.1:7070 -n rag-mcp

# in the code  -  swap one line (inside your async entry point):
# synapse = MemorySynapse()
synapse = await connect_synapse("cosmo://127.0.0.1:7070")
Figure: Prism at http://127.0.0.1:7071 (-n rag-mcp) rendering every Signal on the bus as it fires: REGISTER, TASK, AGENT_OUTPUT, FINAL.