Engram reference.
The cosmonapse.engram subsystem - shared memory for Neurons, serviced over RECALL / IMPRINT signals. Verified against packages/python-sdk/cosmonapse/engram and ENGRAM_DESIGN.md.
Shared memory for Neurons, addressed as a first-class participant.
An Engram is a storage wrapper - the second persistent surface in Cosmonapse after the RegistryStore, and optional in the same way. It is a synapse-side participant with its own envelope category (like a Dendrite), not a Neuron: it never produces AGENT_OUTPUT. Neurons reach it only through Signals - RECALL / IMPRINT - that ride inside the containing TASK trace.
Overview - the mental model
One Engram wraps one backend (sqlite, postgres, a vector store, an object store - anything that holds bytes and answers queries) and owns its own schema. A namespace may run zero, one, or many Engrams, each serving a distinct memory purpose: one for working context, one for vectors, one for blobs, one for relational records.
The intended default is addressed routing. A recall says “I want the vector Engram” by engram_id, or “I want a semantic Engram” by engram_kind, and the matching Engram answers. Fan-out across several Engrams of the same kind is opt-in via recall_mode. This is closer to how TASK routes by neuron than to how TASK_OFFER auctions over bids.
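The routing rule above can be sketched as a small pure function. This is an illustration, not the Cortex's code: `Registered` and `resolve_targets` are hypothetical names, and the choice of which Engram is addressed under "first" (here, registration order) is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Registered:
    """Hypothetical registry row holding only the two routing fields."""
    engram_id: str
    engram_kind: str


def resolve_targets(registry, *, engram_id=None, engram_kind=None, recall_mode="first"):
    """Return the engram_ids a RECALL would be delivered to (illustrative only)."""
    if engram_id is None and engram_kind is None:
        raise ValueError("a recall needs engram_id or engram_kind")
    # An explicit engram_id wins over engram_kind and never fans out.
    if engram_id is not None:
        return [e.engram_id for e in registry if e.engram_id == engram_id]
    same_kind = [e.engram_id for e in registry if e.engram_kind == engram_kind]
    # Assumption: without opting in, only one Engram of the kind is addressed.
    if recall_mode == "first":
        return same_kind[:1]
    return same_kind  # "merge" / "all" opt in to fan-out


registry = [
    Registered("pgvector-default", "semantic"),
    Registered("pgvector-archive", "semantic"),
    Registered("notes-default", "relational"),
]
```

Keeping the decision in one function makes the precedence easy to test in isolation from any transport.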
Five invariants
| Invariant | What it means |
|---|---|
| Part of the trace | A RECALL / IMPRINT emitted mid-task inherits the TASK’s trace_id; the parent_id chain proves causation. Doppler, cost rollup, and deadlines apply to Engram I/O too. |
| Storage is plural | No single “the memory.” Multiple Engrams coexist, one per purpose. |
| Engrams are black boxes | The protocol sees opaque keys, queries, and results. The schema is the Engram’s business. |
| Event-driven only | Engrams never expose a direct method to Neurons. Everything is a Signal. |
| Backends are pluggable | SqliteEngram, PostgresEngram, a future PgVectorEngram or S3Engram all conform to the same Engram ABC. |
engram_kind - conventional values
engram_kind is a routing label. Conventional values are relational, semantic, keyvalue, blob, timeseries, and context. Engrams subscribe by kind; the Cortex routes by capability the same way it routes Tasks.
Installation
Engram ships in the base package. InMemoryEngram and SqliteEngram need no extra dependencies; PostgresEngram lazy-imports asyncpg, installed via the postgres extra.
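
    # Engram ships inside the base cosmonapse package - InMemoryEngram,
    # SqliteEngram and PostgresEngram all included.
    $ pip install cosmonapse

    # PostgresEngram also needs the asyncpg driver, which comes with the postgres extra.
    $ pip install "cosmonapse[postgres]"
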
Top-level imports
The entire public surface is re-exported from cosmonapse.engram. The new_engram_id ULID helper lives on the package root next to the other ID helpers.

    # Everything public is re-exported from cosmonapse.engram.
    from cosmonapse.engram import (
        # Core types
        Engram,            # ABC every backend implements
        EngramBinding,     # declarative wiring stored on an Axon
        EngramClient,      # caller-side correlation table (one per Dendrite)
        Hit,               # one search result
        RecallResult,      # what recall() returns
        ImprintReceipt,    # what imprint() returns
        # Backends
        InMemoryEngram,    # dict-backed; default for dev/tests
        SqliteEngram,      # stdlib sqlite3, single-file DB
        PostgresEngram,    # asyncpg pool; for real deployments
        # Errors
        EngramError,
        EngramTimeout,
        EngramCancelled,
        EngramNotBound,
        EngramOverloaded,
    )

    # eng_… ULID helper lives on the package root
    from cosmonapse import new_engram_id

EngramBinding - declarative wiring
An EngramBinding is how an Axon declares which Engrams its Neuron may address. The Axon stores a list of them at construction, so the Neuron references memory by a stable local name (e.g. "notes") rather than a deployment-specific engram_id. The Axon enforces this whitelist - a Neuron cannot touch an Engram it was not wired to.
cosmonapse.engram.EngramBinding
One Engram wired into an Axon under a local name. At least one of directed_id (the engram_id) or directed_type (the engram_kind) must be set.

    @dataclass(frozen=True)
    class EngramBinding:
        name: str
        directed_id: str | None = None         # the engram_id (directed.id on the wire)
        directed_type: str | None = None       # the engram_kind (directed.type)
        default_deadline_ms: int | None = None
        default_recall_mode: str = "first"     # "first" | "merge" | "all"

Fields
| Field | Type | Description |
|---|---|---|
| name | str | Local handle the Neuron passes to recall() / imprint(). Unique per Axon. |
| directed_id | str | None | Explicit target (the engram_id; becomes directed.id on the wire). Preferred. One of directed_id / directed_type is required. |
| directed_type | str | None | Slot routing (the engram_kind; becomes directed.type) - deployment owns the concrete implementation behind a kind. |
| default_deadline_ms | int | None | Per-binding default SLA applied when a call omits deadline_ms. |
| default_recall_mode | str | One of "first" · "merge" · "all". Defaults to "first". Validated at construction. |
Example

    from cosmonapse import Axon
    from cosmonapse.engram import EngramBinding

    axon = Axon(
        neuron_id="summariser",
        neuron_fn=summariser,
        engrams=[
            # Addressed routing - Neuron says recall("notes", ...)
            EngramBinding(name="notes", directed_id="notes-default"),
            # Slot routing - deployment owns the concrete vector store
            EngramBinding(
                name="memory",
                directed_type="semantic",
                default_deadline_ms=250,
                default_recall_mode="merge",
            ),
        ],
    )

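To make the construction-time checks concrete, here is a minimal stand-in for the binding and the per-Axon whitelist. `BindingSketch` and `index_bindings` are hypothetical; the real EngramBinding and Axon may validate differently or raise other exception types.

```python
from dataclasses import dataclass
from typing import Optional

_RECALL_MODES = ("first", "merge", "all")


@dataclass(frozen=True)
class BindingSketch:
    """Stand-in for EngramBinding, mirroring the fields documented above."""
    name: str
    directed_id: Optional[str] = None
    directed_type: Optional[str] = None
    default_deadline_ms: Optional[int] = None
    default_recall_mode: str = "first"

    def __post_init__(self):
        # At least one routing target must be present.
        if self.directed_id is None and self.directed_type is None:
            raise ValueError(f"binding {self.name!r} needs directed_id or directed_type")
        if self.default_recall_mode not in _RECALL_MODES:
            raise ValueError(f"unknown recall mode {self.default_recall_mode!r}")


def index_bindings(bindings):
    """Build the name -> binding whitelist, rejecting duplicate names."""
    table = {}
    for binding in bindings:
        if binding.name in table:
            raise ValueError(f"duplicate binding name {binding.name!r}")
        table[binding.name] = binding
    return table
```

A lookup that misses this table is the situation in which a Neuron-side helper would raise EngramNotBound.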
recall() & imprint() - the Neuron-side helpers
When a neuron_fn declares recall and/or imprint keyword-only parameters, the Axon injects bound async helpers for the current trace. They resolve the binding name, build the RECALL / IMPRINT envelope, and correlate the response - all scoped to the containing TASK’s trace_id and parent_id. If no Engrams are wired, the helpers raise EngramNotBound.
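One way an Axon could detect which helpers a neuron_fn asks for is signature inspection. This is a sketch of that idea using the standard library only; `wanted_helpers` is a hypothetical name and the actual detection logic in the SDK is not shown on this page.

```python
import inspect

HELPER_NAMES = ("recall", "imprint")


def wanted_helpers(neuron_fn):
    """Names of the helpers neuron_fn requests via keyword-only parameters."""
    params = inspect.signature(neuron_fn).parameters
    return [
        name
        for name in HELPER_NAMES
        if name in params and params[name].kind is inspect.Parameter.KEYWORD_ONLY
    ]


async def plain(input, context):
    return input


async def reader(input, context, *, recall):
    return await recall("notes", query={})


async def summariser(input, context, *, recall, imprint):
    return None


def positional(input, recall):
    # "recall" is positional here, so it does not count as a request.
    return input
```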
recall(name, *, query, …) -> RecallResult
Emit RECALL to the bound Engram, await the response per recall_mode, and return a RecallResult. Returns an empty (falsy) result on a miss - it does not raise.

    async def recall(
        name: str,                        # EngramBinding.name on this Axon
        *,
        query: dict,                      # opaque to the protocol; the Engram reads it
        filters: dict | None = None,
        context_ref: str | None = None,
        deadline_ms: int | None = None,
        recall_mode: str | None = None,   # overrides binding default
        min_confidence: float | None = None,
        meta: dict | None = None,
    ) -> RecallResult

| Parameter | Type | Description |
|---|---|---|
| name | str | Binding name wired on the Axon. Raises EngramNotBound if unknown. |
| query | dict | Opaque to the protocol; the Engram interprets it (text, vector, SQL-ish filter, …). |
| filters | dict | None | Structured narrowing applied alongside the query (e.g. tags, namespaces). |
| context_ref | str | None | Opaque pointer threaded to the Engram for context-scoped reads. |
| deadline_ms | int | None | Best-effort SLA. On "first", an elapsed deadline raises EngramTimeout; on "merge"/"all" it resolves with whatever arrived. |
| recall_mode | str | None | Overrides the binding default. first = first responder; merge = accumulate & sort by score; all = stream every responder. |
| min_confidence | float | None | Drop hits below this score (backend-enforced where supported). |
| meta | dict | None | Free-form envelope metadata (e.g. broadcast). |
imprint(name, *, op, entry, …) -> ImprintReceipt | None
Emit IMPRINT to the bound Engram. Fire-and-forget by default (returns None once on the wire); with await_ack=True it awaits IMPRINTED and returns a receipt.

    async def imprint(
        name: str,
        *,
        op: str,                          # add | append | merge | upsert | delete
        entry: dict,
        merge_key: str | None = None,     # required for merge / upsert
        await_ack: bool = False,          # True → await IMPRINTED, return receipt
        deadline_ms: int | None = None,
        meta: dict | None = None,
    ) -> ImprintReceipt | None

| Parameter | Type | Description |
|---|---|---|
| op | str | One of add · append · merge · upsert · delete. See semantics below. |
| entry | dict | Opaque body; the Engram validates it against its declared schema. |
| merge_key | str | None | Required for merge and upsert; the key the record is located by. |
| await_ack | bool | False (default) returns once the envelope is published. True awaits IMPRINTED and returns a receipt. |
| deadline_ms | int | None | With await_ack=True, raises EngramTimeout if no ack arrives in time. |
| meta | dict | None | Free-form metadata. meta.broadcast = true opts into multi-receiver writes. |
op semantics
| op | Behaviour |
|---|---|
| add | Insert; fail if the id already exists. |
| append | Append to a sequence/log keyed by merge_key (auto-creates one if absent). |
| merge | Locate by merge_key, deep-merge entry into the existing record. |
| upsert | Replace if merge_key matches, otherwise insert. |
| delete | Remove by id or merge_key. |
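The five ops can be illustrated with a toy in-memory store. This is a sketch under stated assumptions, not how any bundled backend stores data: `OpStore` and `deep_merge` are hypothetical, "append" is read as auto-creating the sequence when it is absent, and "merge" on a missing record is assumed to create it.

```python
import copy


def deep_merge(base, patch):
    """Recursively merge patch into a copy of base; patch wins on conflicts."""
    out = copy.deepcopy(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = deep_merge(out[key], value)
        else:
            out[key] = copy.deepcopy(value)
    return out


class OpStore:
    """Toy store: records keyed by id or merge_key, sequences keyed by merge_key."""

    def __init__(self):
        self.records = {}
        self.sequences = {}

    def apply(self, op, entry, merge_key=None):
        if op == "add":
            key = entry["id"]
            if key in self.records:
                raise KeyError(f"id {key!r} already exists")
            self.records[key] = dict(entry)
        elif op == "append":
            self.sequences.setdefault(merge_key, []).append(dict(entry))
        elif op in ("merge", "upsert"):
            if merge_key is None:
                raise ValueError(f"{op} requires merge_key")
            if op == "merge":
                existing = self.records.get(merge_key, {})
                self.records[merge_key] = deep_merge(existing, entry)
            else:
                self.records[merge_key] = dict(entry)  # replace or insert
        elif op == "delete":
            key = merge_key if merge_key is not None else entry.get("id")
            self.records.pop(key, None)
        else:
            raise ValueError(f"unknown op {op!r}")
```

The difference between merge and upsert is the part most worth testing: merge keeps fields the patch does not mention, upsert discards them.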
End-to-end inside a Neuron

    # Injected into the Neuron when neuron_fn declares the kwargs.
    async def summariser(input, context, *, recall, imprint):
        # READ - returns a RecallResult (iterable of Hit)
        prior = await recall(
            "notes",                      # binding name (must be wired on the Axon)
            query={"text": input["topic"]},
            filters={"tags": ["kept"]},
            deadline_ms=200,
            recall_mode="first",
            min_confidence=0.5,
        )

        note = f"summary of {input['topic']} ({len(prior)} priors)"

        # WRITE - fire-and-forget by default (await_ack=False)
        await imprint(
            "notes",
            op="append",                  # add | append | merge | upsert | delete
            entry={"content": note, "tags": ["kept"]},
            merge_key=input["topic"],
        )

        # WRITE + receipt - await the IMPRINTED ack
        receipt = await imprint(
            "notes",
            op="upsert",
            entry={"content": note},
            merge_key=input["topic"],
            await_ack=True,
            deadline_ms=500,
        )
        assert receipt.ok

        return {"summary": note, "prior": [h.entry for h in prior]}

Engram - the backend ABC
Every backend implements this exact interface. The conformance suite in tests/test_engram.py runs against any Engram and is the single source of truth for correct behaviour. Subclasses set engram_id, engram_kind, and capabilities on construction. All read/write methods are async; backends wrapping sync libraries (sqlite3) dispatch to a threadpool.
cosmonapse.engram.Engram
Storage wrapper - one backend per instance. recall() must return an empty list on a miss rather than raising.

    class Engram(ABC):
        # Set by the backend at construction time.
        engram_id: str
        engram_kind: str
        capabilities: list[str]
        version: str | None = None

        # ── Lifecycle ──────────────────────────────────────────
        async def connect(self) -> None: ...   # open pool / file handle
        async def close(self) -> None: ...     # release resources

        # ── Read / write ───────────────────────────────────────
        async def recall(
            self,
            query: dict,
            *,
            filters: dict | None = None,
            context_ref: str | None = None,
            deadline_ms: int | None = None,
            min_confidence: float | None = None,
        ) -> list[Hit]: ...                    # empty list on a miss - never raise

        async def imprint(
            self,
            op: str,
            entry: dict,
            *,
            merge_key: str | None = None,
            imprint_id: str | None = None,     # use for idempotency
        ) -> ImprintReceipt: ...

        # ── Optional capability negotiation (default: serve all) ─
        async def can_serve(self, query: dict) -> bool:
            return True

Members
| Member | Kind | Description |
|---|---|---|
| engram_id | attr: str | Stable address other processes route to. |
| engram_kind | attr: str | Routing label (relational, semantic, …). |
| capabilities | attr: list[str] | Query features advertised in REGISTER (e.g. vector_search, tags). |
| version | attr: str | None | Optional backend version surfaced to callers. |
| connect() | async, abstract | Open backend resources (DB pool, file handle). |
| close() | async, abstract | Release backend resources. |
| recall(query, …) | async, abstract | Return matching Hits. Empty list on a miss - never raise. |
| imprint(op, entry, …) | async, abstract | Write. Use imprint_id for idempotency (no-op on re-delivery). |
| can_serve(query) | async, optional | Return False to decline a query (e.g. BM25 engram asked for vectors). Default serves all. |
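The imprint_id contract (a re-delivered write is a no-op) can be sketched with a receipt cache. `IdempotentWrites` is a hypothetical, synchronous toy; a real backend would persist the seen ids alongside the data, and the receipt here is a plain dict rather than an ImprintReceipt.

```python
class IdempotentWrites:
    """Remembers the receipt for each imprint_id so re-delivery is a no-op."""

    def __init__(self):
        self.rows = []
        self._receipts = {}

    def imprint(self, op, entry, *, imprint_id=None):
        if imprint_id is not None and imprint_id in self._receipts:
            # Seen before: hand back the original receipt, write nothing.
            return self._receipts[imprint_id]
        self.rows.append((op, dict(entry)))
        receipt = {"op": op, "version": len(self.rows)}
        if imprint_id is not None:
            self._receipts[imprint_id] = receipt
        return receipt
```

Returning the original receipt, instead of an error, lets a caller that retried after a lost ack proceed as if the first attempt had succeeded.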
Implementing a custom backend

    import redis.asyncio as redis  # redis-py's asyncio client

    from cosmonapse.engram import Engram, Hit, ImprintReceipt


    class RedisEngram(Engram):
        def __init__(self, url: str):
            self.engram_id = "redis-default"
            self.engram_kind = "keyvalue"
            self.capabilities = ["substring", "tags"]
            self._url = url

        async def connect(self):
            self._r = await redis.from_url(self._url)

        async def close(self):
            await self._r.aclose()

        async def recall(self, query, **kw) -> list[Hit]:
            raw = await self._r.get(query["key"])
            # Empty list on a miss - never raise.
            return [Hit(id=query["key"], entry={"value": raw})] if raw else []

        async def imprint(self, op, entry, **kw) -> ImprintReceipt:
            # Minimal example: every op is handled as a plain SET.
            await self._r.set(entry["key"], entry["value"])
            return ImprintReceipt(engram_id=self.engram_id, op=op, id=entry["key"])

Hit · RecallResult · ImprintReceipt
The three caller-facing return types. RecallResult is iterable, sized, and truth-testable, so a Neuron can write for h in result, len(result), or if result: directly; an empty result is falsy. ImprintReceipt.ok is True when error is None.

    from dataclasses import dataclass, field

    @dataclass(frozen=True)
    class Hit:
        id: str
        entry: dict
        score: float = 1.0   # cosine in [0,1] for semantic; 1.0 for relational

    @dataclass(frozen=True)
    class RecallResult:
        hits: list[Hit] = field(default_factory=list)   # dataclasses reject a bare [] default
        engram_ids: tuple[str, ...] = ()
        truncated: bool = False
        took_ms: int | None = None
        # Iterable + truth-testable: 'for h in result', 'len(result)', 'if result:'

    @dataclass(frozen=True)
    class ImprintReceipt:
        engram_id: str
        op: str
        id: str | None = None
        version: int | None = None
        took_ms: int | None = None
        error: str | None = None

        @property
        def ok(self) -> bool:
            return self.error is None

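The iteration, length, and truthiness behaviour described above comes down to three special methods. The classes below are stand-ins written for illustration (`HitSketch`, `RecallResultSketch`); the SDK's own implementation is not reproduced here.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class HitSketch:
    id: str
    entry: dict
    score: float = 1.0


@dataclass(frozen=True)
class RecallResultSketch:
    hits: list = field(default_factory=list)
    truncated: bool = False

    def __iter__(self):
        return iter(self.hits)    # for h in result

    def __len__(self):
        return len(self.hits)     # len(result)

    def __bool__(self):
        return bool(self.hits)    # if result:  (an empty result is falsy)
```

With these in place a miss needs no special casing: the loop body simply never runs.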
recall_mode → what RecallResult contains
| mode | Resolution | hits |
|---|---|---|
| first | Resolves on the first RECALLED. Timeout → EngramTimeout. | That responder’s hits. |
| merge | Accumulates until the deadline, then resolves. | All responders, sorted by score desc. |
| all | Accumulates until the deadline; caller iterates the stream. | Every responder’s hits, with engram_ids populated. |
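The three resolution strategies in the table can be sketched with asyncio alone. Everything here is hypothetical: `resolve_recall`, `respond`, and `RecallTimeout` stand in for the caller-side machinery and EngramTimeout, hits are plain (id, score) tuples, and "all" returns a list where the real mode is described as a stream.

```python
import asyncio


class RecallTimeout(Exception):
    """Stand-in for EngramTimeout."""


async def resolve_recall(responses, *, mode="first", deadline_s=0.2):
    """responses: awaitables yielding (engram_id, [(hit_id, score), ...])."""
    tasks = [asyncio.ensure_future(r) for r in responses]
    try:
        if mode == "first":
            done, _ = await asyncio.wait(
                tasks, timeout=deadline_s, return_when=asyncio.FIRST_COMPLETED
            )
            if not done:
                raise RecallTimeout("no RECALLED before the deadline")
            engram_id, hits = next(iter(done)).result()
            return {"engram_ids": (engram_id,), "hits": hits}
        # "merge" / "all": keep whatever arrived by the deadline.
        done, _ = await asyncio.wait(tasks, timeout=deadline_s)
        arrived = [t.result() for t in tasks if t in done]
        hits = [h for _, batch in arrived for h in batch]
        if mode == "merge":
            hits.sort(key=lambda h: h[1], reverse=True)  # score descending
        return {"engram_ids": tuple(e for e, _ in arrived), "hits": hits}
    finally:
        for t in tasks:
            t.cancel()  # has no effect on tasks that already finished


async def respond(engram_id, hits, delay_s):
    """Simulated Engram that answers after delay_s seconds."""
    await asyncio.sleep(delay_s)
    return engram_id, hits
```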
Bundled backends
Three backends ship in the box. All take keyword-only arguments and default version to "0.0.1". Call await connect() before attach_engram.
InMemoryEngram(*, engram_id='engram-memory', engram_kind='keyvalue', capabilities=None, version='0.0.1')
Dict-backed. No dependencies. Resets on process exit. Default capabilities: ['substring', 'tags', 'merge_key']. Ideal for tests and the dev synapse.
SqliteEngram(*, path=':memory:', engram_id='engram-sqlite', engram_kind='relational', capabilities=None, version='0.0.1')
Single-file sqlite3 via a threadpool. Default capabilities: ['substring', 'tags', 'merge_key', 'time_range']. Pass a path to persist across restarts.
PostgresEngram(*, dsn, engram_id='engram-postgres', engram_kind='relational', capabilities=None, version='0.0.1', min_size=1, max_size=5, pool_kwargs=None)
asyncpg connection pool; the driver is lazy-imported. Default capabilities add 'jsonb'. dsn is required; min_size / max_size / pool_kwargs tune the pool.
Constructing them

    from cosmonapse.engram import InMemoryEngram, SqliteEngram, PostgresEngram

    # Dict-backed. Zero deps. Resets on process exit. Use in tests.
    mem = InMemoryEngram(
        engram_id="engram-memory",
        engram_kind="keyvalue",
        capabilities=["substring", "tags", "merge_key"],
    )

    # Single-file sqlite3 via threadpool. Survives restarts.
    sql = SqliteEngram(
        path="./memory.db",               # ":memory:" by default
        engram_id="engram-sqlite",
        engram_kind="relational",
    )

    # asyncpg pool. Production. Driver is lazy-imported.
    pg = PostgresEngram(
        dsn="postgresql://user:pw@localhost/cosmo",
        engram_id="engram-postgres",
        engram_kind="relational",
        min_size=1,
        max_size=5,
    )

    await sql.connect()   # open resources before attaching

Mounting on a Dendrite
An Engram is mounted on a hosting Dendrite with attach_engram(engram). From then on, that Dendrite subscribes to RECALL / IMPRINT addressed to the Engram’s engram_id or matching its engram_kind, and dispatches them to the instance. The Engram still owns its backend lifecycle.
Dendrite.attach_engram(engram: Engram) -> None
Mount an Engram. Indexes it by engram_id and engram_kind. Raises if an Engram with the same engram_id is already hosted.
Dendrite.detach_engram(engram_id: str) -> None
Remove a hosted Engram, closing its backend and unsubscribing routing.
Dendrite.engrams -> dict[str, Engram]
A copy of the engram_id → Engram map currently hosted on this Dendrite.

    from cosmonapse import Dendrite
    from cosmonapse.engram import SqliteEngram
    from cosmonapse.synapse.memory import InMemorySynapse

    synapse = InMemorySynapse()

    # A Dendrite that *hosts* the Engram and services RECALL / IMPRINT.
    host = Dendrite(synapse=synapse)
    notes = SqliteEngram(path="notes.db", engram_id="notes-default")
    await notes.connect()
    host.attach_engram(notes)   # index by engram_id + engram_kind
    await host.start()

    # Inspect / tear down
    host.engrams                               # {"notes-default": notes}
    await host.detach_engram("notes-default")  # closes backend, unsubscribes

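The bookkeeping behind attach, detach, and the engrams view can be pictured as two small indexes. `HostIndex` below is a hypothetical toy, not the Dendrite's implementation: it leaves out subscriptions and backend lifecycle, and raising ValueError on a duplicate id is an assumption about the exception type.

```python
class HostIndex:
    """Toy version of the id and kind indexes a hosting Dendrite keeps."""

    def __init__(self):
        self._by_id = {}      # engram_id -> (engram_kind, engram)
        self._by_kind = {}    # engram_kind -> [engram_id, ...]

    def attach(self, engram_id, engram_kind, engram):
        if engram_id in self._by_id:
            raise ValueError(f"engram_id {engram_id!r} is already hosted")
        self._by_id[engram_id] = (engram_kind, engram)
        self._by_kind.setdefault(engram_kind, []).append(engram_id)

    def detach(self, engram_id):
        engram_kind, engram = self._by_id.pop(engram_id)
        self._by_kind[engram_kind].remove(engram_id)
        return engram

    def lookup(self, *, engram_id=None, engram_kind=None):
        """Hosted Engrams a signal applies to, by id if given, else by kind."""
        if engram_id is not None:
            slot = self._by_id.get(engram_id)
            return [slot[1]] if slot else []
        return [self._by_id[i][1] for i in self._by_kind.get(engram_kind, [])]

    @property
    def engrams(self):
        # A fresh dict each time, so callers cannot mutate the index.
        return {i: engram for i, (_, engram) in self._by_id.items()}
```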
EngramClient - caller-side correlation
EngramClient is the caller-side bridge - one instance per Dendrite. The Axon’s helpers and the Dendrite both call into it; only the Dendrite touches the Synapse. It builds envelopes, registers pending futures keyed by envelope id, resolves them when a matching RECALLED / IMPRINTED arrives (matched by parent_id), enforces deadlines, and cancels in-flight calls with EngramCancelled when the trace terminates. You rarely construct it yourself.
cosmonapse.engram.EngramClient
One per Dendrite. Owns the pending-future table for RECALL/IMPRINT and resolves responses by parent_id.

    class EngramClient:
        # One per Dendrite. The Dendrite constructs it and drives delivery;
        # the Axon's recall/imprint helpers call into it. You rarely touch
        # it directly - it is the caller-side correlation table.
        def __init__(self, dendrite: Dendrite): ...

        async def recall(self, *, query, trace_id, parent_id, ...) -> RecallResult
        async def imprint(self, *, op, entry, trace_id, parent_id, ...) -> ImprintReceipt | None

        async def _deliver(self, sig: Signal) -> None     # match RECALLED/IMPRINTED by parent_id
        def cancel_trace(self, trace_id: str) -> None     # on FINAL/ERROR for the trace
        def cancel_all(self) -> None                      # on Dendrite shutdown

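The correlation idea (a pending future per outgoing envelope, resolved when a response names that envelope as its parent_id) can be sketched as follows. `PendingTable` and `CallCancelled` are hypothetical stand-ins for the client's internal table and EngramCancelled; deadlines are omitted to keep the sketch short.

```python
import asyncio


class CallCancelled(Exception):
    """Stand-in for EngramCancelled."""


class PendingTable:
    def __init__(self):
        self._pending = {}  # envelope id -> (trace_id, future)

    def register(self, envelope_id, trace_id):
        """Record an outgoing RECALL/IMPRINT and return the future to await."""
        future = asyncio.get_running_loop().create_future()
        self._pending[envelope_id] = (trace_id, future)
        return future

    def deliver(self, response):
        """Resolve the call whose envelope id equals the response's parent_id."""
        slot = self._pending.pop(response["parent_id"], None)
        if slot is None:
            return False  # late or unrelated response; nothing is waiting
        _, future = slot
        if not future.done():
            future.set_result(response["payload"])
        return True

    def cancel_trace(self, trace_id):
        """Fail every in-flight call that belongs to a terminated trace."""
        for envelope_id, (owner, future) in list(self._pending.items()):
            if owner == trace_id:
                del self._pending[envelope_id]
                if not future.done():
                    future.set_exception(CallCancelled(trace_id))
```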
Wire signals - RECALL / RECALLED / IMPRINT / IMPRINTED
Engrams add four signal types to SYNAPSE_TYPES. Axons cannot produce them directly - they go through the hosting Dendrite, the same as MEMORY_APPEND. Routing precedence: engram_id beats engram_kind. Entry ids use the eng_ ULID prefix. Engrams piggyback on REGISTER (role: "engram"), HEARTBEAT, and DISCOVER - so the RegistryStore already tracks them; no second registry.
RECALL · request

    // RECALL - emitted by a hosting Dendrite on the Neuron's behalf
    {
      "type": "RECALL",
      "trace_id": "trc_01JV…",             // inherited from the containing TASK
      "parent_id": "evt_01JV…",
      "payload": {
        "engram_id": "pgvector-default",   // OR engram_kind - id wins
        "engram_kind": "semantic",
        "query": { "text": "eviction cause" },
        "filters": { "tags": ["k8s"] },
        "deadline_ms": 250,
        "recall_mode": "first"             // "first" | "merge" | "all"
      }
    }

RECALLED · response
parent_id MUST point at the RECALL. Multiple Engrams may respond; the Cortex merges or picks per recall_mode.

    // RECALLED - one per responding Engram; parent_id → the RECALL
    {
      "type": "RECALLED",
      "payload": {
        "engram_id": "pgvector-default",
        "hits": [
          { "id": "eng_01JV…", "score": 0.91, "entry": { } },
          { "id": "eng_01JV…", "score": 0.74, "entry": { } }
        ],
        "truncated": false,
        "took_ms": 38
      }
    }

IMPRINT · write & IMPRINTED · ack

    // IMPRINT - addressed write (broadcast is opt-in via meta.broadcast)
    {
      "type": "IMPRINT",
      "payload": {
        "engram_id": "ctx-default",
        "op": "append",                    // add|append|merge|upsert|delete
        "entry": {
          "id": "eng_01JV…",
          "content": "Eviction triggered by memory pressure.",
          "tags": ["k8s", "eviction"]
        },
        "merge_key": "incident:42"         // required for merge / upsert
      }
    }

    // IMPRINTED - ack; parent_id → the IMPRINT
    {
      "type": "IMPRINTED",
      "payload": {
        "engram_id": "ctx-default",
        "op": "append",
        "id": "eng_01JV…",
        "version": 3,
        "took_ms": 12
      }
    }

MEMORY_APPEND is now a convenience macro that compiles to IMPRINT { op: "append" } - kept for back-compat, but prefer IMPRINT. CONTEXT_SYNC is unchanged: a transient broadcast, not a storage op.
Errors
All Engram exceptions subclass EngramError. Backpressure (EngramOverloaded) surfaces as an error on the IMPRINTED receipt rather than a separate ERROR signal - so a shed write does not terminate the parent TASK.
| Exception | Raised when |
|---|---|
| EngramError | Base class for everything below. |
| EngramTimeout | A RECALL / IMPRINT deadline elapses with no response. |
| EngramCancelled | The containing TASK terminates mid-call (FINAL/ERROR on the trace, or Dendrite shutdown). |
| EngramNotBound | A Neuron asks for a binding name the Axon was not constructed with. |
| EngramOverloaded | A backend sheds load. Reported on the IMPRINTED receipt’s error field. |
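Because a shed write comes back as a receipt with error set, not as a raised exception, a Neuron that cares has to inspect the receipt. One possible pattern is a small retry wrapper; `imprint_with_retry` is a hypothetical helper, the receipt is modelled as a dict with an "error" key for self-containment, and the backoff values are arbitrary.

```python
import asyncio


async def imprint_with_retry(imprint, *, attempts=3, base_delay_s=0.01, **call):
    """Retry an acknowledged write while the receipt carries an error."""
    receipt = None
    for attempt in range(attempts):
        receipt = await imprint(await_ack=True, **call)
        if receipt["error"] is None:
            return receipt
        if attempt < attempts - 1:
            # Exponential backoff before the next try.
            await asyncio.sleep(base_delay_s * (2 ** attempt))
    return receipt  # still failing; the caller decides what to do
```

Returning the last failing receipt, without raising, keeps the decision with the Neuron, in line with the note above that a shed write should not terminate the parent TASK.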
ID helpers
Engram entry ids are sortable ULIDs with an eng_ prefix. Generate them with new_engram_id() from the package root.

    from cosmonapse import new_engram_id

    new_engram_id()   # 'eng_01JVZ8K3M2…' - sortable ULID, eng_ prefix

For the protocol-level design rationale - routing precedence, broadcast semantics, and the full envelope grammar - see the envelope spec and the Python SDK’s Dendrite reference.
Have a feature in mind?
The protocol, SDKs, and CLI are still pre-1.0. If something here is missing, ambiguous, or wrong - open an issue and propose a change. Every breaking change is debated in DECISIONS.md first.