← Examples/Building an Orchestrator API
HTTP clientFlask / FastAPI / Express / WSGIDendriteSynapseNeuron

The Dendrite is the only Cosmonapse object your route handlers need. Create one per process at startup; reuse it for every request. The worker code is identical across all framework tabs - only the HTTP layer changes.

00 · Shared worker

Start this once before any framework server. It connects to the Synapse, registers under the id "worker", and processes every TASK dispatched to it. The framework examples below all dispatch to this same worker.

worker.py
# worker.py  -  start this before any framework server
import asyncio, os
from cosmonapse import Axon, Dendrite, Neuron, connect_synapse

SYNAPSE_URL = os.environ.get("SYNAPSE_URL", "cosmo://127.0.0.1:7070")

async def main():
    neuron = Neuron(
        source="huggingface",
        endpoint="https://router.huggingface.co",
        model="meta-llama/Llama-3.1-8B-Instruct",
        api_key=os.environ["HF_TOKEN"],
        use_chat_api=True, max_new_tokens=256,
    )
    axon     = Axon(neuron_id="worker", neuron_fn=neuron,
                    capabilities=["text-generation", "chat"])
    synapse  = await connect_synapse(SYNAPSE_URL)
    dendrite = Dendrite(synapse=synapse, namespace="api-demo", dendrite_id="worker")
    dendrite.attach_axon(axon)
    async with dendrite:
        print("worker ready")
        await asyncio.Event().wait()   # run until Ctrl-C

asyncio.run(main())
Pick your framework

The Dendrite API is identical in every tab. The only differences are the install command, how you manage the async lifecycle, and the framework-specific request/response wiring.

01 · Install

Flask is synchronous. We bridge to the async Cosmonapse SDK with a dedicated event loop running in a background thread - the same thread hosts the Dendrite for the whole process lifetime.

# Install SDK + Flask
pip install cosmonapse httpx flask

$ export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx
02 · Start the bus and worker

The dev Synapse and the Neuron worker start first. The Flask app connects to the already-running bus on startup.

# terminal 1  -  the bus
$ cosmo synapse start memory --namespace=api-demo

# terminal 2  -  the Neuron worker
$ python worker.py

# optional  -  Prism, the live browser view (http://127.0.0.1:7071)
$ cosmo doppler --prism --url=cosmo://127.0.0.1:7070 -n api-demo

# terminal 3  -  Flask
$ python flask_app.py
03 · The Flask app

One Dendrite per process - not per request. The background loop keeps it alive. Route handlers call run_coroutine_threadsafe to dispatch from sync Flask code into the async loop.

flask_app.py
# flask_app.py
import asyncio, threading, time, os
from flask import Flask, jsonify, request
from cosmonapse import Dendrite, connect_synapse

SYNAPSE_URL = os.environ.get("SYNAPSE_URL", "cosmo://127.0.0.1:7070")
app = Flask(__name__)

# Flask is sync  -  run a dedicated asyncio loop in a background thread
# and share one Dendrite for the process lifetime.
_loop: asyncio.AbstractEventLoop
_dendrite: Dendrite

async def _connect():
    global _dendrite
    synapse   = await connect_synapse(SYNAPSE_URL)
    _dendrite = Dendrite(synapse=synapse, namespace="api-demo",
                         dendrite_id="flask-orchestrator")
    await _dendrite.__aenter__()

def _start_loop():
    global _loop
    _loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_loop)
    _loop.run_until_complete(_connect())
    _loop.run_forever()

threading.Thread(target=_start_loop, daemon=True).start()
time.sleep(0.5)   # give the loop time to connect

@app.post("/ask")
def ask():
    prompt = request.get_json().get("prompt", "")
    if not prompt:
        return jsonify({"error": "prompt required"}), 400
    future = asyncio.run_coroutine_threadsafe(
        _dendrite.dispatch_and_wait(
            neuron="worker", input={"prompt": prompt}, timeout_s=30.0,
        ), _loop,
    )
    reply = future.result(timeout=32)
    return jsonify({"response": reply.payload["output"]["response"]})

if __name__ == "__main__":
    app.run(port=5000)
04 · Decorators

Decorators register async callbacks on the Dendrite for specific Signal types. They fire on every matching Signal in the namespace - independently of dispatch_and_wait, which resolves its own future on the same Signal. Use decorators for side-effects: logging, metrics, clarification handling, and live progress feeds.

decorators.py
# Attach these to your orchestrator Dendrite after connecting.
# They fire for every matching Signal in the namespace  -  perfect for
# logging, metrics, clarification handling, and live progress feeds.

# ── AGENT_OUTPUT ──────────────────────────────────────────────────────────
# Fires when any worker finishes a TASK. Filter by neuron= or capability=.
# dispatch_and_wait already resolves the caller's future on the same Signal;
# use the decorator for side-effects (logging, metrics, webhooks).
@orchestrator.on_agent_output
async def _log_output(sig):
    print(f"[{sig.trace_id[:8]}] output from {sig.directed.id if sig.directed else '?'}")

@orchestrator.on_agent_output(neuron="worker")            # narrow by id
async def _worker_done(sig): ...

@orchestrator.on_agent_output(capability="text-generation")  # narrow by capability
async def _gen_done(sig): ...


# ── CLARIFICATION ─────────────────────────────────────────────────────────
# The Neuron doesn't have enough context and asks a follow-up question.
# respond_to_clarification re-dispatches a TASK with the answer attached,
# keeping the same trace_id so the workflow stays coherent.
@orchestrator.on_clarification
async def _handle_clarification(sig):
    question = sig.payload.get("question", "")
    print(f"[{sig.trace_id[:8]}] clarification: {question}")
    await orchestrator.respond_to_clarification(
        sig, answer="Please use plain English, no technical jargon."
    )


# ── ERROR ──────────────────────────────────────────────────────────────────
# A worker raised an exception. Log it or forward to your error tracker.
@orchestrator.on_error_signal
async def _handle_error(sig):
    print(f"[{sig.trace_id[:8]}] error: {sig.payload.get('message')}")


# ── ESCALATION ────────────────────────────────────────────────────────────
# The Neuron hit a problem it can't resolve alone and surfaces it to the
# orchestrator (or a human). respond_to_escalation sends a follow-up TASK.
@orchestrator.on_escalation
async def _handle_escalation(sig):
    print(f"[{sig.trace_id[:8]}] escalated: {sig.payload.get('reason')}")
    await orchestrator.respond_to_escalation(
        sig, input={"override": "proceed with best effort"}
    )


# ── THOUGHT_DELTA ─────────────────────────────────────────────────────────
# Streamed reasoning tokens emitted before AGENT_OUTPUT. Pipe these to
# an SSE endpoint or WebSocket to show live progress to the user.
@orchestrator.on_thought_delta
async def _stream_thought(sig):
    token = sig.payload.get("delta", "")
    print(token, end="", flush=True)


# ── TOOL_CALL / TOOL_RESULT ───────────────────────────────────────────────
# Emitted when the Neuron calls an external tool. Use for audit logs.
@orchestrator.on_tool_call
async def _on_tool_call(sig):
    print(f"  tool → {sig.payload.get('tool_name')}")

@orchestrator.on_tool_result
async def _on_tool_result(sig):
    print(f"  tool ← {sig.payload.get('tool_name')}")


# ── REGISTER / HEARTBEAT ──────────────────────────────────────────────────
# Workers announce themselves on startup and each heartbeat interval.
# Use to maintain a live worker roster or drive health dashboards.
@orchestrator.on_register_signal
async def _on_register(sig):
    print(f"worker joined: {sig.directed.id if sig.directed else '?'}  caps={sig.payload.get('capabilities')}")

@orchestrator.on_heartbeat_signal
async def _on_heartbeat(sig):
    print(f"heartbeat: {sig.directed.id if sig.directed else '?'}")


# ── on_trace  -  every Signal for one workflow ──────────────────────────────
# Subscribe to all Signal types for a specific trace_id in one call.
# Useful for per-request audit logs or live progress feeds.
trace_id = new_trace_id()

@orchestrator.on_trace(trace_id)
async def _trace_all(sig):
    print(f"  [{sig.type.value:20}] {sig.directed.id if sig.directed else '?'}")

reply = await orchestrator.dispatch_and_wait(
    neuron="worker", input={"prompt": prompt}, trace_id=trace_id,
)
05 · Try it

Send a prompt and get a response from the Neuron:

$ curl -X POST http://127.0.0.1:5000/ask \
       -H "Content-Type: application/json" \
       -d '{"prompt": "What is a Dendrite?"}'

{"response": "A Dendrite is the component that connects your application ..."}
Extend the pattern

Multiple workers. Replace neuron="worker" with a round-robin pick from a list and add more worker.py processes - the Dendrite routes to whichever id you name.

Streaming. Use a Pathway directly instead of dispatch_and_wait and stream tokens back to the client via a Flask Response(stream_with_context(…)).

Auth. The Dendrite is just an object - wrap it in a class, inject it via Flask's app context (g), or keep it module-level. It's unaware of HTTP; add auth at the Flask layer as usual.

Watch it

See it live in Prism.

cosmo doppler --prism opens a live, read-only view of every Signal on the bus as it fires. Run it against the dev synapse above to watch this workflow animate.

http://127.0.0.1:7071 · -n api-demo
Prism showing Signals animating in the api-demo namespace
Prism renders every Signal on the bus as it fires — REGISTER, TASK, AGENT_OUTPUT, FINAL.