Five Production Patterns for Context Engines in AI Applications
Moving from prototype to production means solving session isolation, multi-tenancy, contradiction handling, importance signals, and startup latency. Here are the five patterns that handle all of them.
From prototype to production
Getting a context engine working in a demo takes 20 lines. Getting it right in production is a different problem. You need session isolation so Agent A's memories don't contaminate Agent B's context. You need importance signals derived from real engagement data, not guesses. You need contradiction handling for when new facts supersede old ones. You need multi-tenant isolation for SaaS products. And you need startup performance that doesn't add 5 seconds of latency to every cold start.
These five patterns cover the production concerns that come up in every serious Feather DB deployment.
Pattern 1: Session isolation via namespaces
The most basic multi-tenant mistake is sharing a single DB instance across all users or sessions. Memories bleed across boundaries and retrieval becomes noisy. Feather DB supports namespace partitioning either through separate DB files or by encoding the namespace in metadata and filtering at query time.
The cleanest approach: one .feather file per tenant, opened on demand and closed after inactivity.
```python
import threading
from pathlib import Path

import feather_db as fdb

MEMORY_DIR = Path("/var/data/agent-memories")
MEMORY_DIR.mkdir(parents=True, exist_ok=True)  # make sure the store directory exists

_lock = threading.Lock()
_open_dbs: dict[str, fdb.DB] = {}


def get_db(tenant_id: str, dim: int = 768) -> fdb.DB:
    """Get or open a per-tenant DB instance."""
    if tenant_id not in _open_dbs:  # fast path: no lock once the DB is open
        with _lock:
            if tenant_id not in _open_dbs:  # double-checked locking
                path = MEMORY_DIR / f"{tenant_id}.feather"
                _open_dbs[tenant_id] = fdb.DB.open(str(path), dim=dim)
    return _open_dbs[tenant_id]


# Usage: each tenant is fully isolated.
# embed() and make_meta() are your application's own helpers
# (an embedding-model call and an fdb.Metadata builder).
db_user_a = get_db("user_a")
db_user_b = get_db("user_b")

# Memories added to user_a never appear in user_b queries
db_user_a.add(id=1, vec=embed("User A prefers dark mode"), meta=make_meta("User A prefers dark mode"))
results = db_user_b.context_chain(embed("user preferences"), k=5)  # won't return user A's data
```
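The get_db() helper opens stores on demand but never closes them. One way to add the "closed after inactivity" half is an idle-evicting cache. This is a generic sketch, not Feather DB API: IdleEvictingCache, evict_idle() and the opener/closer callables are hypothetical names, and the code makes no assumption about which teardown call your Feather DB version provides.

```python
import threading
import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class IdleEvictingCache(Generic[T]):
    """Open handles on demand; close handles unused for more than idle_seconds."""

    def __init__(
        self,
        opener: Callable[[str], T],
        closer: Callable[[T], None],
        idle_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._opener = opener
        self._closer = closer
        self._idle = idle_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: dict[str, tuple[T, float]] = {}  # key -> (handle, last use)

    def get(self, key: str) -> T:
        with self._lock:
            entry = self._entries.get(key)
            handle = entry[0] if entry else self._opener(key)  # open only on a miss
            self._entries[key] = (handle, self._clock())  # refresh last-use time
            return handle

    def evict_idle(self) -> list[str]:
        """Close and drop idle handles; call this periodically from a timer."""
        now = self._clock()
        with self._lock:
            stale = [k for k, (_, last) in self._entries.items() if now - last > self._idle]
            for key in stale:
                handle, _ = self._entries.pop(key)
                self._closer(handle)
        return stale
```

In a service you might build it with `opener=lambda tid: fdb.DB.open(str(MEMORY_DIR / f"{tid}.feather"), dim=768)` and call `evict_idle()` from a background timer. For `closer`, pass whatever save or close call your Feather DB version exposes (an assumption here), or `lambda db: None` to simply drop the reference. The sketch calls the opener while holding the lock, which keeps it simple but serializes cold opens across tenants.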
Within a single tenant's store, you can further isolate sessions by encoding a session prefix in node IDs or by using metadata attributes as filters. For most use cases, per-tenant files are sufficient and add no query-time filtering overhead, since isolation comes from which file you open.
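A minimal sketch of the node-ID option, assuming node IDs are non-negative integers: the upper bits carry a session number and the lower bits a per-session counter. The bit widths are illustrative and keep every ID below 2**31, the same range Pattern 3 uses; make_node_id(), session_of() and filter_to_session() are hypothetical helpers, not Feather DB API.

```python
SESSION_BITS = 12  # up to 4096 sessions per tenant store
LOCAL_BITS = 19    # up to 524288 nodes per session; 12 + 19 = 31 bits total


def make_node_id(session_num: int, local_id: int) -> int:
    """Pack a session number and a per-session counter into one node ID."""
    if not 0 <= session_num < (1 << SESSION_BITS):
        raise ValueError("session_num out of range")
    if not 0 <= local_id < (1 << LOCAL_BITS):
        raise ValueError("local_id out of range")
    return (session_num << LOCAL_BITS) | local_id


def session_of(node_id: int) -> int:
    """Recover the session number from a packed node ID."""
    return node_id >> LOCAL_BITS


def filter_to_session(node_ids: list[int], session_num: int) -> list[int]:
    """Keep only IDs that belong to the given session, preserving order."""
    return [nid for nid in node_ids if session_of(nid) == session_num]
```

After retrieval, pass your results' node IDs (however your code reads them off the result objects) through filter_to_session() to keep only the current session's memories.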
Pattern 2: Importance-weighted ingestion from engagement signals
Setting importance=0.5 for every node is leaving signal on the table. In production, you have engagement data: message likes, explicit confirmations, repeated questions, correction events. These signals should drive importance weights at ingest time.
```python
from dataclasses import dataclass

import feather_db as fdb


@dataclass
class EngagementSignals:
    was_liked: bool = False
    was_copied: bool = False
    was_corrected: bool = False
    repetition_count: int = 0  # times user asked same question
    explicit_save: bool = False


def compute_importance(signals: EngagementSignals) -> float:
    """Map engagement signals to an importance weight in [0, 1]."""
    score = 0.4  # baseline
    if signals.explicit_save:
        score += 0.4  # user explicitly bookmarked it
    if signals.was_liked:
        score += 0.15
    if signals.was_copied:
        score += 0.1
    if signals.was_corrected:
        score -= 0.2  # model was wrong, deprioritize
    if signals.repetition_count > 1:
        # Each repeat beyond the first ask adds 0.1, capped at +0.2
        score += min(0.1 * (signals.repetition_count - 1), 0.2)
    return max(0.0, min(1.0, score))  # clamp to [0, 1]


def store_with_signals(
    db: fdb.DB,
    node_id: int,
    text: str,
    signals: EngagementSignals,
) -> None:
    importance = compute_importance(signals)
    meta = fdb.Metadata(importance=importance)
    meta.set_attribute("text", text)
    meta.set_attribute("importance_reason", str(signals))  # keep the inputs for auditing
    db.add(id=node_id, vec=embed(text), meta=meta)  # embed() is your embedding helper


# A user explicitly saved a response: high importance
# (db is a per-tenant store, e.g. from get_db() in Pattern 1)
store_with_signals(
    db, node_id=42,
    text="Optimal batch size for embedding API is 96 texts per call",
    signals=EngagementSignals(explicit_save=True, was_copied=True),
)  # importance = 0.4 + 0.4 + 0.1 = 0.9
```
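compute_importance() expects signals that are already aggregated per message. A minimal sketch of that aggregation step, assuming your analytics pipeline emits per-message event kinds such as "like" or "save" and keeps a log of the user's questions that includes the current ask. The event names and aggregate_signals() are hypothetical; the function returns keyword arguments so the result can be passed straight to EngagementSignals(**kwargs).

```python
from collections import Counter
from typing import Iterable

# Hypothetical event kinds mapped to EngagementSignals field names
SIGNAL_EVENTS = {
    "like": "was_liked",
    "copy": "was_copied",
    "correction": "was_corrected",
    "save": "explicit_save",
}


def _normalize(question: str) -> str:
    """Lowercase and collapse whitespace so trivial variants count as repeats."""
    return " ".join(question.lower().split())


def aggregate_signals(
    event_kinds: Iterable[str],
    question_log: Iterable[str],
    question: str,
) -> dict:
    """Collapse one message's raw events into EngagementSignals keyword arguments."""
    kwargs: dict = {field: False for field in SIGNAL_EVENTS.values()}
    for kind in event_kinds:
        field = SIGNAL_EVENTS.get(kind)
        if field is not None:  # ignore event kinds that don't map to a signal
            kwargs[field] = True
    # Exact match after normalization; paraphrased repeats are not detected
    asked = Counter(_normalize(q) for q in question_log)
    kwargs["repetition_count"] = asked[_normalize(question)]
    return kwargs


signals_kwargs = aggregate_signals(
    event_kinds=["save", "copy"],
    question_log=["What batch size should I use?", "what batch size  should I use?"],
    question="What batch size should I use?",
)
# EngagementSignals(**signals_kwargs) can then go to store_with_signals()
```

Exact matching is the simplest choice; catching paraphrased repeats would need an embedding-similarity check instead.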
Pattern 3: Contradiction handling with supersedes edges
Facts change. A user who preferred Python 3.10 may have migrated to 3.12. A tech stack preference stated in January may be obsolete by June. Without explicit contradiction handling, old and new facts coexist in the store and both surface at retrieval time, giving the model conflicting signals.
Feather DB's supersedes edge type handles this. When you store an updated fact, link it to the old fact with a supersedes edge. A traversal that starts from the new fact will surface the supersession relationship; the old fact can be de-weighted or excluded.
```python
import time

import feather_db as fdb


def update_fact(
    db: fdb.DB,
    old_node_id: int,
    new_text: str,
    new_importance: float = 0.75,
) -> int:
    """Store an updated fact and mark it as superseding the old one."""
    # Millisecond timestamp folded into 31 bits. Two updates in the same
    # millisecond (or a wrap-around) can collide, so use a real ID allocator in production.
    new_node_id = int(time.time() * 1000) % (2**31)

    meta = fdb.Metadata(importance=new_importance)
    meta.set_attribute("text", new_text)
    meta.set_attribute("supersedes_id", str(old_node_id))
    db.add(id=new_node_id, vec=embed(new_text), meta=meta)  # embed() is your embedding helper

    # Create the supersedes edge so traversal can detect the relationship
    db.link(new_node_id, old_node_id, edge_type="supersedes")

    # Feather DB doesn't support in-place edits, so old_node_id keeps its stored
    # importance and can still rank in top-k. Track superseded state via the
    # supersedes_id attribute above and filter in your retrieval layer.
    return new_node_id


# User updates their Python version preference
old_id = 100  # "User uses Python 3.10"
new_id = update_fact(
    db, old_id,
    new_text="User migrated to Python 3.12, uses match-case and tomllib",
)
# Retrieval for "python version" can now surface new_id, and the supersedes edge
# makes the relationship explicit in context_chain output. old_id may still
# appear until your retrieval layer filters it out.
```
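The retrieval-layer filter that update_fact() relies on could look like this sketch. It assumes you have already converted context_chain results into plain dicts with an "id" key plus the "supersedes_id" attribute that update_fact() stores; drop_superseded() and that dict shape are hypothetical. Because the old fact can land in top-k without the newer fact that supersedes it, the function also accepts a set of IDs you persist as superseded.

```python
from typing import Iterable


def drop_superseded(results: list[dict], known_superseded: Iterable[int] = ()) -> list[dict]:
    """Remove results whose node has been superseded by a newer fact."""
    superseded = set(known_superseded)
    # Any result that supersedes another marks its target as stale
    for r in results:
        target = r.get("supersedes_id")
        if target:
            superseded.add(int(target))  # update_fact() stores the ID as a string
    return [r for r in results if r["id"] not in superseded]


hits = [
    {"id": 100, "text": "User uses Python 3.10"},
    {"id": 555, "text": "User migrated to Python 3.12", "supersedes_id": "100"},
    {"id": 7, "text": "User prefers dark mode"},
]
fresh = drop_superseded(hits)  # keeps ids 555 and 7
```

Collecting every supersedes_id before filtering also handles chains of updates, since each newer fact marks its predecessor.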
Pattern 4: Multi-tenant architecture with per-agent stores
SaaS products built on context engines typically have three levels of memory: global (product-wide knowledge), team (shared context for a group), and individual (per-user memories). Pattern 4 handles the three-level merge.
```python
import feather_db as fdb


class ContextLayer:
    """Three-layer memory. Retrieval precedence: individual > team > global."""

    def __init__(self, user_id: str, team_id: str, dim: int = 768):
        # get_db() is the per-tenant opener from Pattern 1
        self.global_db = get_db("__global__", dim)
        self.team_db = get_db(f"team_{team_id}", dim)
        self.user_db = get_db(f"user_{user_id}", dim)
        self.dim = dim

    def search(self, query_vec, k: int = 5) -> list:
        """Search all layers, deduplicate by text, rank by weighted score."""
        global_results = self.global_db.context_chain(
            query_vec, k=k, hops=1, time_weight=0.1  # global knowledge decays slowly
        )
        team_results = self.team_db.context_chain(
            query_vec, k=k, hops=2, time_weight=0.25
        )
        user_results = self.user_db.context_chain(
            query_vec, k=k, hops=2, time_weight=0.35  # personal memory most time-sensitive
        )

        # Weight each layer's scores. User context gets the largest multiplier, and
        # because sort() is stable and user results come first, it also wins exact ties.
        merged = (
            [(r, "user", 1.0) for r in user_results]
            + [(r, "team", 0.9) for r in team_results]
            + [(r, "global", 0.8) for r in global_results]
        )
        merged.sort(key=lambda x: x[0].score * x[2], reverse=True)

        seen_texts = set()
        unique = []
        for result, _layer, _weight in merged:
            text = result.meta.get_attribute("text") if result.meta else ""
            if text:
                if text in seen_texts:
                    continue  # same fact already taken from a higher-ranked result
                seen_texts.add(text)
            unique.append(result)  # results without text are kept, not silently dropped
            if len(unique) >= k:
                break
        return unique

    def add_user(self, node_id: int, vec, meta: fdb.Metadata) -> None:
        self.user_db.add(id=node_id, vec=vec, meta=meta)

    def add_team(self, node_id: int, vec, meta: fdb.Metadata) -> None:
        self.team_db.add(id=node_id, vec=vec, meta=meta)
```
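The layer multipliers determine how much better a lower-priority layer's raw score must be before it outranks the user layer. A small sketch of that arithmetic, using the same 1.0 / 0.9 / 0.8 weights as search(); weighted_score() and min_raw_score_to_outrank() are hypothetical helpers for reasoning about the weights.

```python
LAYER_WEIGHTS = {"user": 1.0, "team": 0.9, "global": 0.8}


def weighted_score(raw_score: float, layer: str) -> float:
    """Score used for ranking in the merged list."""
    return raw_score * LAYER_WEIGHTS[layer]


def min_raw_score_to_outrank(target_score: float, target_layer: str, challenger_layer: str) -> float:
    """Raw score the challenger must exceed to rank above the target after weighting."""
    return weighted_score(target_score, target_layer) / LAYER_WEIGHTS[challenger_layer]


# A user memory scoring 0.70 vs. a global memory scoring 0.85
print(weighted_score(0.70, "user"))    # 0.7
print(weighted_score(0.85, "global"))  # about 0.68, so the user memory ranks first
print(min_raw_score_to_outrank(0.70, "user", "global"))  # about 0.875
```

With these weights a global result needs a raw score 25% higher than a user result to outrank it, and a team result needs about 11% more. If product-wide knowledge is getting drowned out, narrowing the gap between multipliers is the lever.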
Pattern 5: Warm startup with parallel HNSW load
Feather DB v0.15+ loads HNSW indexes in parallel using multiple threads, achieving 4.7× faster load times for large stores. For production services that restart frequently (serverless functions, rolling deployments), startup latency matters.
```python
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path

import feather_db as fdb
from fastapi import FastAPI

MEMORY_DIR = Path("/var/data/agent-memories")


def warm_start_dbs(tenant_ids: list[str], dim: int = 768) -> dict[str, fdb.DB]:
    """Load multiple tenant DBs concurrently at startup."""

    def load_one(tenant_id: str) -> tuple[str, fdb.DB]:
        t0 = time.perf_counter()
        db = fdb.DB.open(str(MEMORY_DIR / f"{tenant_id}.feather"), dim=dim)
        elapsed = time.perf_counter() - t0
        print(f"  Loaded {tenant_id}: {elapsed * 1000:.1f}ms")
        return tenant_id, db

    # Feather DB's parallel HNSW load (v0.15+) speeds up each individual open;
    # this pool additionally overlaps opens across tenants. How much that helps
    # depends on whether the native loader releases the GIL and on disk throughput.
    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(load_one, tenant_ids))
    return dict(results)


@asynccontextmanager
async def lifespan(app: FastAPI):
    active_tenants = fetch_active_tenant_ids()  # your own helper, e.g. a query on your app DB
    app.state.dbs = warm_start_dbs(active_tenants)
    print(f"Warmed {len(active_tenants)} tenant stores")
    yield
    # cleanup on shutdown if needed


app = FastAPI(lifespan=lifespan)


@app.get("/query/{tenant_id}")
def query(tenant_id: str, q: str):
    # Plain `def` so FastAPI runs this blocking search in its threadpool
    # instead of stalling the event loop.
    db = app.state.dbs.get(tenant_id)
    if db is None:  # explicit None check; don't rely on the DB object's truthiness
        db = get_db(tenant_id)  # cold load for new tenants (Pattern 1)
    results = db.context_chain(embed(q), k=5, hops=2)  # embed() is your embedding helper
    return [
        {"text": r.meta.get_attribute("text"), "score": r.score}
        for r in results
        if r.meta
    ]
```
For a store with 100K vectors, the parallel HNSW load completes in under 200ms on a standard VM. Sequential loading of the same store takes approximately 940ms. At 10 concurrent tenant loads, the parallel approach finishes in roughly the time of a single sequential load.
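To check numbers like these on your own hardware, a small timing harness helps. This is a generic sketch: compare_load_times() is a hypothetical helper that takes any loader callable (for example a lambda wrapping fdb.DB.open) and times a sequential pass against a ThreadPoolExecutor pass over the same tenant IDs.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def compare_load_times(
    loader: Callable[[str], object],
    tenant_ids: Iterable[str],
    max_workers: int = 8,
) -> dict[str, float]:
    """Time loading every tenant one after another, then concurrently."""
    ids = list(tenant_ids)

    t0 = time.perf_counter()
    for tenant_id in ids:
        loader(tenant_id)  # handles are discarded; only timing matters here
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(loader, ids))  # list() waits for every load to finish
    concurrent = time.perf_counter() - t0

    return {"sequential_s": sequential, "concurrent_s": concurrent}
```

The second pass can benefit from the OS page cache warmed by the first, which flatters the concurrent number; for a fair comparison, run each mode in a fresh process or against cold storage.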
Summary
These five patterns — namespace isolation, engagement-driven importance, supersedes-based contradiction handling, three-layer multi-tenant architecture, and parallel warm startup — cover the majority of production concerns that come up when deploying context engines at scale. None of them require changes to the core Feather DB API; they're all patterns built on top of the primitives that are already there.
Install: pip install feather-db · GitHub: github.com/feather-store/feather