Your Framework at the Edge.
Dendrite in the Middle.
Use whichever HTTP layer you already have: Flask, FastAPI, Express, or raw WSGI. A route handler receives the request, calls dispatch_and_wait on a shared Dendrite, and returns the Neuron's reply. The framework never touches the Synapse; the Dendrite never touches HTTP.
The Dendrite is the only Cosmonapse object your route handlers need. Create one per process at startup; reuse it for every request. The worker code is identical across all framework tabs - only the HTTP layer changes.
Start the worker once, before any framework server. It connects to the Synapse, registers under the id "worker", and processes every TASK dispatched to it. The framework examples below all dispatch to this same worker.
    # worker.py - start this before any framework server
    import asyncio
    import os

    from cosmonapse import Axon, Dendrite, Neuron, connect_synapse

    SYNAPSE_URL = os.environ.get("SYNAPSE_URL", "cosmo://127.0.0.1:7070")

    async def main():
        neuron = Neuron(
            source="huggingface",
            endpoint="https://router.huggingface.co",
            model="meta-llama/Llama-3.1-8B-Instruct",
            api_key=os.environ["HF_TOKEN"],
            use_chat_api=True,
            max_new_tokens=256,
        )
        axon = Axon(neuron_id="worker", neuron_fn=neuron,
                    capabilities=["text-generation", "chat"])
        synapse = await connect_synapse(SYNAPSE_URL)
        dendrite = Dendrite(synapse=synapse, namespace="api-demo", dendrite_id="worker")
        dendrite.attach_axon(axon)
        async with dendrite:
            print("worker ready")
            await asyncio.Event().wait()  # run until Ctrl-C

    asyncio.run(main())
The Dendrite API is identical in every tab. The only differences are the install command, how you manage the async lifecycle, and the framework-specific request/response wiring.
Flask is synchronous. We bridge to the async Cosmonapse SDK with a dedicated event loop running in a background thread - the same thread hosts the Dendrite for the whole process lifetime.
    # Install SDK + Flask
    $ pip install cosmonapse httpx flask
    $ export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx
The dev Synapse and the Neuron worker start first. The Flask app connects to the already-running bus on startup.
    # terminal 1 - the bus
    $ cosmo synapse start memory --namespace=api-demo

    # terminal 2 - the Neuron worker
    $ python worker.py

    # optional - Prism, the live browser view (http://127.0.0.1:7071)
    $ cosmo doppler --prism --url=cosmo://127.0.0.1:7070 -n api-demo

    # terminal 3 - Flask
    $ python flask_app.py
One Dendrite per process - not per request. The background loop keeps it alive. Route handlers call run_coroutine_threadsafe to dispatch from sync Flask code into the async loop.
    # flask_app.py
    import asyncio
    import os
    import threading

    from flask import Flask, jsonify, request
    from cosmonapse import Dendrite, connect_synapse

    SYNAPSE_URL = os.environ.get("SYNAPSE_URL", "cosmo://127.0.0.1:7070")

    app = Flask(__name__)

    # Flask is sync - run a dedicated asyncio loop in a background thread
    # and share one Dendrite for the process lifetime.
    _loop: asyncio.AbstractEventLoop
    _dendrite: Dendrite
    _ready = threading.Event()  # set once the Dendrite is connected

    async def _connect():
        global _dendrite
        synapse = await connect_synapse(SYNAPSE_URL)
        _dendrite = Dendrite(synapse=synapse, namespace="api-demo",
                             dendrite_id="flask-orchestrator")
        await _dendrite.__aenter__()

    def _start_loop():
        global _loop
        _loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_loop)
        _loop.run_until_complete(_connect())
        _ready.set()
        _loop.run_forever()

    threading.Thread(target=_start_loop, daemon=True).start()
    # Wait for the connection rather than sleeping for a fixed interval.
    # The 10-second limit is an arbitrary choice; tune it for your setup.
    if not _ready.wait(timeout=10):
        raise RuntimeError(f"could not connect to Synapse at {SYNAPSE_URL}")

    @app.post("/ask")
    def ask():
        # silent=True returns None instead of raising on a missing or malformed body
        body = request.get_json(silent=True)
        prompt = body.get("prompt", "") if isinstance(body, dict) else ""
        if not prompt:
            return jsonify({"error": "prompt required"}), 400
        future = asyncio.run_coroutine_threadsafe(
            _dendrite.dispatch_and_wait(
                neuron="worker",
                input={"prompt": prompt},
                timeout_s=30.0,
            ),
            _loop,
        )
        # Slightly longer than timeout_s so the Dendrite's own timeout fires first
        reply = future.result(timeout=32)
        return jsonify({"response": reply.payload["output"]["response"]})

    if __name__ == "__main__":
        app.run(port=5000)
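To try the sync-to-async bridge on its own, without a Synapse or Flask, the pattern can be reduced to the standard library. The sketch below is only an illustration: `LoopThread` and `fake_dispatch` are hypothetical names, and `fake_dispatch` merely stands in for `dispatch_and_wait`. It also shows one way to cancel the coroutine when the caller's timeout expires.

```python
import asyncio
import concurrent.futures
import threading


class LoopThread:
    """Runs one asyncio event loop in a daemon thread for the process lifetime."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        self._ready.wait()  # block until the loop is actually running

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon(self._ready.set)
        self._loop.run_forever()

    def call(self, coro, timeout_s: float):
        """Run a coroutine on the background loop and wait for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout=timeout_s)
        except concurrent.futures.TimeoutError:
            future.cancel()  # ask the loop to cancel the coroutine
            raise

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_dispatch(prompt: str, delay_s: float = 0.01) -> dict:
    """Hypothetical stand-in for dispatch_and_wait: echoes after a delay."""
    await asyncio.sleep(delay_s)
    return {"response": f"echo: {prompt}"}


if __name__ == "__main__":
    bridge = LoopThread()
    print(bridge.call(fake_dispatch("What is a Dendrite?"), timeout_s=2.0))
    bridge.stop()
```

In a route handler, catching the timeout and returning a 504 is one reasonable choice; the right status code depends on your API.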
Decorators register async callbacks on the Dendrite for specific Signal types. They fire on every matching Signal in the namespace - independently of dispatch_and_wait, which resolves its own future on the same Signal. Use decorators for side-effects: logging, metrics, clarification handling, and live progress feeds.
    # Attach these to your orchestrator Dendrite after connecting.
    # `orchestrator` is that Dendrite (`_dendrite` in flask_app.py).
    # They fire for every matching Signal in the namespace - perfect for
    # logging, metrics, clarification handling, and live progress feeds.

    # ── AGENT_OUTPUT ──────────────────────────────────────────────────────────
    # Fires when any worker finishes a TASK. Filter by neuron= or capability=.
    # dispatch_and_wait already resolves the caller's future on the same Signal;
    # use the decorator for side-effects (logging, metrics, webhooks).
    @orchestrator.on_agent_output
    async def _log_output(sig):
        print(f"[{sig.trace_id[:8]}] output from {sig.directed.id if sig.directed else '?'}")

    @orchestrator.on_agent_output(neuron="worker")  # narrow by id
    async def _worker_done(sig): ...

    @orchestrator.on_agent_output(capability="text-generation")  # narrow by capability
    async def _gen_done(sig): ...

    # ── CLARIFICATION ─────────────────────────────────────────────────────────
    # The Neuron doesn't have enough context and asks a follow-up question.
    # respond_to_clarification re-dispatches a TASK with the answer attached,
    # keeping the same trace_id so the workflow stays coherent.
    @orchestrator.on_clarification
    async def _handle_clarification(sig):
        question = sig.payload.get("question", "")
        print(f"[{sig.trace_id[:8]}] clarification: {question}")
        await orchestrator.respond_to_clarification(
            sig, answer="Please use plain English, no technical jargon."
        )

    # ── ERROR ──────────────────────────────────────────────────────────────────
    # A worker raised an exception. Log it or forward to your error tracker.
    @orchestrator.on_error_signal
    async def _handle_error(sig):
        print(f"[{sig.trace_id[:8]}] error: {sig.payload.get('message')}")

    # ── ESCALATION ────────────────────────────────────────────────────────────
    # The Neuron hit a problem it can't resolve alone and surfaces it to the
    # orchestrator (or a human). respond_to_escalation sends a follow-up TASK.
    @orchestrator.on_escalation
    async def _handle_escalation(sig):
        print(f"[{sig.trace_id[:8]}] escalated: {sig.payload.get('reason')}")
        await orchestrator.respond_to_escalation(
            sig, input={"override": "proceed with best effort"}
        )

    # ── THOUGHT_DELTA ─────────────────────────────────────────────────────────
    # Streamed reasoning tokens emitted before AGENT_OUTPUT. Pipe these to
    # an SSE endpoint or WebSocket to show live progress to the user.
    @orchestrator.on_thought_delta
    async def _stream_thought(sig):
        token = sig.payload.get("delta", "")
        print(token, end="", flush=True)

    # ── TOOL_CALL / TOOL_RESULT ───────────────────────────────────────────────
    # Emitted when the Neuron calls an external tool. Use for audit logs.
    @orchestrator.on_tool_call
    async def _on_tool_call(sig):
        print(f"  tool → {sig.payload.get('tool_name')}")

    @orchestrator.on_tool_result
    async def _on_tool_result(sig):
        print(f"  tool ← {sig.payload.get('tool_name')}")

    # ── REGISTER / HEARTBEAT ──────────────────────────────────────────────────
    # Workers announce themselves on startup and each heartbeat interval.
    # Use to maintain a live worker roster or drive health dashboards.
    @orchestrator.on_register_signal
    async def _on_register(sig):
        print(f"worker joined: {sig.directed.id if sig.directed else '?'} caps={sig.payload.get('capabilities')}")

    @orchestrator.on_heartbeat_signal
    async def _on_heartbeat(sig):
        print(f"heartbeat: {sig.directed.id if sig.directed else '?'}")

    # ── on_trace - every Signal for one workflow ──────────────────────────────
    # Subscribe to all Signal types for a specific trace_id in one call.
    # Useful for per-request audit logs or live progress feeds.
    # This part must run inside a coroutine, with new_trace_id and prompt in scope.
    trace_id = new_trace_id()

    @orchestrator.on_trace(trace_id)
    async def _trace_all(sig):
        print(f"  [{sig.type.value:20}] {sig.directed.id if sig.directed else '?'}")

    reply = await orchestrator.dispatch_and_wait(
        neuron="worker",
        input={"prompt": prompt},
        trace_id=trace_id,
    )
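The REGISTER / HEARTBEAT handlers above mention keeping a live worker roster. A minimal sketch of such a roster follows, assuming you record each worker id from those handlers. `WorkerRoster` is a hypothetical helper, not part of the SDK, and the 30-second staleness window is an arbitrary default that should be set relative to your actual heartbeat interval.

```python
import time
from typing import Dict, List, Optional


class WorkerRoster:
    """Tracks when each worker id was last seen and reports which are alive."""

    def __init__(self, stale_after_s: float = 30.0) -> None:
        # Assumption: a worker is considered gone after this many silent seconds
        self._stale_after_s = stale_after_s
        self._last_seen: Dict[str, float] = {}

    def seen(self, worker_id: str, now: Optional[float] = None) -> None:
        """Record a REGISTER or HEARTBEAT from worker_id."""
        self._last_seen[worker_id] = time.monotonic() if now is None else now

    def alive(self, now: Optional[float] = None) -> List[str]:
        """Return the sorted ids seen within the staleness window."""
        now = time.monotonic() if now is None else now
        return sorted(
            worker_id
            for worker_id, ts in self._last_seen.items()
            if now - ts <= self._stale_after_s
        )


roster = WorkerRoster()

# Hypothetical wiring inside the handlers shown earlier:
#   if sig.directed:
#       roster.seen(sig.directed.id)
```

The `now` parameter exists only to make the class easy to test with fixed timestamps; production callers can omit it.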
Send a prompt and get a response from the Neuron:
    $ curl -X POST http://127.0.0.1:5000/ask \
        -H "Content-Type: application/json" \
        -d '{"prompt": "What is a Dendrite?"}'
    {"response": "A Dendrite is the component that connects your application ..."}
Multiple workers. Replace neuron="worker" with a round-robin pick from a list and add more worker.py processes - the Dendrite routes to whichever id you name.
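A round-robin pick can be as small as the sketch below. It assumes each extra worker.py process registers under its own id; the ids shown are hypothetical, and `RoundRobin` is an illustrative helper rather than an SDK class. The lock matters because Flask may serve requests from several threads.

```python
import itertools
import threading
from typing import Iterable


class RoundRobin:
    """Thread-safe round-robin picker over a fixed list of worker ids."""

    def __init__(self, worker_ids: Iterable[str]) -> None:
        ids = list(worker_ids)
        if not ids:
            raise ValueError("at least one worker id is required")
        self._cycle = itertools.cycle(ids)
        self._lock = threading.Lock()

    def next(self) -> str:
        """Return the next worker id, wrapping around at the end of the list."""
        with self._lock:
            return next(self._cycle)


# Hypothetical ids: each worker.py process would need a distinct id
workers = RoundRobin(["worker-1", "worker-2", "worker-3"])
```

In the route handler, `neuron="worker"` would then become `neuron=workers.next()`.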
Streaming. Use a Pathway directly instead of dispatch_and_wait and stream tokens back to the client via a Flask Response(stream_with_context(…)).
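Flask streams from a plain (sync) generator, while tokens arrive on the async loop, so something has to carry items across the thread boundary. The sketch below shows one way to do that with a queue. It does not use the Pathway API, whose details are not covered here: `fake_token_stream` is a hypothetical stand-in for whatever async source yields tokens, and `iter_from_async` is an illustrative helper.

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


async def fake_token_stream(text: str):
    """Hypothetical stand-in for an async token source; not a Cosmonapse API."""
    for word in text.split():
        await asyncio.sleep(0)
        yield word + " "


def iter_from_async(agen, loop):
    """Yield items from an async generator that runs on `loop` in another thread."""
    q: queue.Queue = queue.Queue()

    async def pump():
        # Runs on the background loop and forwards every item to the queue
        try:
            async for item in agen:
                q.put(item)
        except Exception as exc:
            q.put(exc)  # re-raised in the consuming thread
        finally:
            q.put(_DONE)

    asyncio.run_coroutine_threadsafe(pump(), loop)
    while True:
        item = q.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    for token in iter_from_async(fake_token_stream("tokens arrive one by one"), loop):
        print(token, end="", flush=True)
    print()
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
```

The generator returned by `iter_from_async` is the kind of object you could hand to Flask's `Response(stream_with_context(...))`, using the same background loop that hosts the Dendrite.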
Auth. The Dendrite is just an object - wrap it in a class, inject it via Flask's app context (g), or keep it module-level. It's unaware of HTTP; add auth at the Flask layer as usual.
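As one illustration of auth living entirely in the HTTP layer, the check below validates a bearer token without touching the Dendrite. It is a sketch under assumptions: the function name and the bearer-token scheme are choices made for this example, not something the SDK prescribes.

```python
import hmac
from typing import Optional


def bearer_token_ok(authorization_header: Optional[str], expected_token: str) -> bool:
    """Return True only if the header is 'Bearer <token>' with the expected token."""
    if not authorization_header:
        return False
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return False
    # compare_digest avoids leaking the match position through timing
    return hmac.compare_digest(token.strip().encode(), expected_token.encode())
```

In Flask this could run in a `before_request` hook that reads `request.headers.get("Authorization")` and calls `abort(401)` when the check fails, so route handlers and the Dendrite stay unchanged.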
See it live in Prism.
cosmo doppler --prism opens a live, read-only view of every Signal on the bus as it fires. Run it against the dev Synapse above to watch this workflow animate.
