Feather DB + LangGraph: Agent Memory Across Graph Runs
LangGraph checkpoints let you replay a run. Feather DB gives your graph semantic memory — find relevant past context by meaning, not position. Here's how to wire them together with FeatherMemoryNode as a first-class subgraph node.
Tutorial · LangGraph 0.2+ · Feather DB v0.16.0 · June 2026
The Gap in LangGraph's Persistence Model
LangGraph ships with a solid persistence story. The MemorySaver checkpointer serializes your graph's full state dict after every node execution. You get replay: given a thread_id, you can resume an interrupted run or rewind to any checkpoint. That's useful for debugging and for long-running workflows that must survive restarts.
What checkpoints don't give you is semantic recall across runs. A checkpoint is a snapshot of a specific run's state. It doesn't let you ask: "what did this agent learn about pricing strategy across the last 40 conversations?" You can't query a checkpoint by meaning. You can only replay it by position.
The gap looks like this:
LangGraph checkpoint store
thread_id=abc123 → [state_t0, state_t1, state_t2, ...] ← replay by position
thread_id=def456 → [state_t0, state_t1, ...]
What's missing:
"find everything relevant to 'pricing objections'" → ??? across all threads, all time
Feather DB fills that gap. It sits alongside LangGraph's checkpointer — not replacing it — and adds a semantic memory layer that persists across runs, users, and sessions. The two systems are complementary: checkpoints for replay, Feather for recall.
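To make the distinction concrete, here is a minimal sketch in plain Python. It calls neither LangGraph nor Feather DB: the checkpoint store is a hypothetical dict keyed by thread_id, and the three-dimensional vectors are hand-made stand-ins for real embeddings.

```python
import math

# Hypothetical checkpoint store: thread_id -> ordered list of state snapshots.
checkpoints = {
    "abc123": [
        {"step": 0, "note": "greeting"},
        {"step": 1, "note": "pricing objection raised"},
    ],
    "def456": [{"step": 0, "note": "onboarding question"}],
}

def replay(thread_id: str, position: int) -> dict:
    """Positional lookup: the caller must already know the thread and the step."""
    return checkpoints[thread_id][position]

# Hypothetical semantic store: (text, vector) pairs pooled across all threads.
memories = [
    ("Customer pushed back on enterprise pricing", [0.9, 0.1, 0.0]),
    ("User asked how to reset a password", [0.0, 0.2, 0.9]),
    ("Discount approved after pricing objection", [0.8, 0.3, 0.1]),
]

def cosine(a, b) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def recall(query_vec, k: int = 2) -> list:
    """Meaning-based lookup: rank every memory by similarity to the query."""
    ranked = sorted(memories, key=lambda m: cosine(query_vec, m[1]), reverse=True)
    return [text for text, _ in ranked[:k]]

by_position = replay("abc123", 1)      # needs a thread_id and an index
by_meaning = recall([1.0, 0.2, 0.0])   # needs only a query vector
```

The positional lookup cannot answer a question unless you already know where the answer lives. The similarity lookup ranks memories from every thread against the query, which is the role Feather DB plays in the rest of this tutorial.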
What Feather Adds: Semantic Memory, Not Replay
Feather DB is an embedded vector database with adaptive decay scoring. Every insight your agent produces can be stored as a vector. At the start of the next run, a semantic search surfaces the most relevant past context — regardless of which thread generated it or how long ago it was stored.
Three properties make this useful in a LangGraph context:
- Adaptive decay. Memories that get retrieved repeatedly stay sharp. Memories that stop being relevant fade. No manual curation — the retrieval pattern becomes the memory signal.
- Metadata filters. Scope memory per user, per session, or per topic with filter_attributes. One .feather file can serve many tenants safely.
- Fast cold start. Parallel HNSW load (FEATHER_LOAD_THREADS=8) brings a 40K-vector index online in under 50ms, fast enough for serverless node execution.
Integration Pattern: FeatherMemoryNode
The cleanest integration pattern treats Feather DB as two nodes in your StateGraph: a read node at the top of the graph and a write node at the bottom. Together they form a closed memory loop around every run.
┌──────────────────────────────────────────────┐
│                  StateGraph                  │
│                                              │
│  [memory_read] ─→ [agent] ─→ [memory_write]  │
│        ↑                            │        │
│        └──────── Feather DB ────────┘        │
├──────────────────────────────────────────────┤
│  LangGraph MemorySaver (checkpoints)         │
│  thread_id: replay by position               │
├──────────────────────────────────────────────┤
│  agent.feather (semantic recall)             │
└──────────────────────────────────────────────┘
The state carries a memory_context field that memory_read populates. Every downstream node can read it. memory_write stores the final agent output back to Feather, closing the loop.
Complete Working Example
Install
pip install feather-db langgraph langchain-openai
State definition
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class AgentState(TypedDict):
    # User input for this run
    user_query: str
    # Feather DB populates this at the start of each run
    memory_context: str
    # The agent's final response
    response: str
    # Metadata for scoping memory (user_id, session_id, etc.)
    user_id: str
Feather DB setup
import os
import feather_db as fdb
import numpy as np
from openai import OpenAI
# Parallel HNSW load — 48ms cold start on 40K vectors (v0.16.0)
os.environ["FEATHER_LOAD_THREADS"] = "8"
openai_client = OpenAI()
def embed(text: str) -> np.ndarray:
    resp = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return np.array(resp.data[0].embedding, dtype=np.float32)
# One file for all agent memory — scoped per user via metadata filters
db = fdb.DB.open("agent_memory.feather", dim=1536)
Memory read node
def memory_read_node(state: AgentState) -> dict:
    """Retrieve semantically relevant past context at the start of each run."""
    query_vec = embed(state["user_query"])
    user_id = state.get("user_id", "default")

    # Scope to this user's memories with metadata filter
    results = db.search(
        query_vec,
        k=5,
        filter_attributes={"user_id": user_id}
    )
    if not results:
        return {"memory_context": ""}

    # Format retrieved memories into a context block
    context_parts = []
    for i, r in enumerate(results, 1):
        text = r.metadata.get_attribute("text")
        score = r.score
        context_parts.append(f"[Memory {i} | relevance={score:.3f}]\n{text}")
    memory_context = "\n\n".join(context_parts)
    return {"memory_context": memory_context}
Agent node
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def agent_node(state: AgentState) -> dict:
    """Core agent reasoning: receives past context from Feather."""
    system_prompt = "You are a helpful assistant with access to relevant past context."
    messages = [{"role": "system", "content": system_prompt}]

    # Inject semantic memory from Feather if available
    if state.get("memory_context"):
        messages.append({
            "role": "system",
            "content": f"Relevant past context:\n\n{state['memory_context']}"
        })
    messages.append({"role": "user", "content": state["user_query"]})

    response = llm.invoke(messages)
    return {"response": response.content}
Memory write node
import time

_next_id = int(time.time() * 1000)  # simple monotonic ID

def memory_write_node(state: AgentState) -> dict:
    """Store the agent's response as a new memory in Feather DB."""
    global _next_id
    response_text = state["response"]
    user_id = state.get("user_id", "default")
    query = state["user_query"]

    # Store the (query, response) pair as a memory unit
    memory_text = f"Q: {query}\nA: {response_text}"
    vec = embed(memory_text)

    meta = fdb.Metadata(importance=0.7)
    meta.set_attribute("text", memory_text)
    meta.set_attribute("user_id", user_id)
    meta.set_attribute("kind", "agent_turn")
    meta.set_attribute("timestamp", str(int(time.time())))

    _next_id += 1
    db.add(id=_next_id, vec=vec, metadata=meta)
    db.save()
    return {}  # no state update; the write is a side effect
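The module-level counter above is fine for a single-threaded process. If the write node can run from several threads at once, the unguarded increment can race. One possible guard is a small lock-protected allocator. This is a sketch, not part of Feather DB: IdAllocator and its allocate() method are hypothetical names, and it does nothing to prevent collisions between separate processes.

```python
import threading
import time
from typing import Optional

class IdAllocator:
    """Hands out strictly increasing integer IDs, safe to call from many threads."""

    def __init__(self, start: Optional[int] = None):
        # Seed from the wall clock (milliseconds) unless an explicit start is given.
        self._next = int(time.time() * 1000) if start is None else start
        self._lock = threading.Lock()

    def allocate(self) -> int:
        # The lock makes increment-and-read a single atomic step.
        with self._lock:
            self._next += 1
            return self._next

# Demonstration: four threads each allocate 500 IDs from one allocator.
allocator = IdAllocator(start=1000)
ids = []

def worker():
    for _ in range(500):
        ids.append(allocator.allocate())

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

unique_count = len(set(ids))
```

In the write node, the global counter would then be replaced by a call such as allocator.allocate() when building the arguments for db.add().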
Wiring the graph
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
# Build the graph
builder = StateGraph(AgentState)
builder.add_node("memory_read", memory_read_node)
builder.add_node("agent", agent_node)
builder.add_node("memory_write", memory_write_node)
# Linear flow: read → agent → write
builder.set_entry_point("memory_read")
builder.add_edge("memory_read", "agent")
builder.add_edge("agent", "memory_write")
builder.add_edge("memory_write", END)
# LangGraph checkpointer for replay — runs alongside Feather
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
Running the graph
config = {
    "configurable": {
        "thread_id": "user-alice-session-1"  # LangGraph checkpoint key
    }
}

result = graph.invoke(
    {
        "user_query": "What's our current pricing strategy for enterprise deals?",
        "user_id": "alice",
        "memory_context": "",
        "response": ""
    },
    config=config
)
print(result["response"])
print(result["response"])
On the first run, memory_context will be empty. On subsequent runs — across different sessions, different thread_ids — Feather surfaces past turns that are semantically relevant to the new query. LangGraph's MemorySaver handles replay within a thread; Feather handles recall across threads.
Using Metadata Filters to Scope Memory Per User
A single .feather file can store memories for many users. The filter keeps retrieval scoped:
# Only Alice's memories
results = db.search(
    query_vec,
    k=5,
    filter_attributes={"user_id": "alice"}
)

# Scope to a specific session
results = db.search(
    query_vec,
    k=5,
    filter_attributes={"user_id": "alice", "session_id": "q3-planning"}
)

# Scope to a topic tag
results = db.search(
    query_vec,
    k=5,
    filter_attributes={"user_id": "alice", "kind": "pricing_insight"}
)
Filter attributes are exact-match conditions combined with AND. They act as a pre-filter on the candidate set before HNSW traversal and scoring, so they decide which memories are eligible without changing how eligible memories are ranked. Unfiltered searches pay no overhead: recall@10 stays at 97.2%.
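The exact-match AND semantics can be illustrated in plain Python. This sketch models only the behaviour described above, not Feather's internal implementation; matches(), prefilter(), and the record layout are hypothetical.

```python
def matches(attributes: dict, filter_attributes: dict) -> bool:
    """True only if every filter key is present with exactly the requested value."""
    return all(
        key in attributes and attributes[key] == value
        for key, value in filter_attributes.items()
    )

def prefilter(records: list, filter_attributes: dict) -> list:
    """Keep only the records eligible for scoring under the given filter."""
    return [r for r in records if matches(r["attributes"], filter_attributes)]

# Toy records standing in for stored memories and their metadata.
records = [
    {"id": 1, "attributes": {"user_id": "alice", "kind": "pricing_insight"}},
    {"id": 2, "attributes": {"user_id": "alice", "kind": "agent_turn"}},
    {"id": 3, "attributes": {"user_id": "bob", "kind": "pricing_insight"}},
]

alice_only = [r["id"] for r in prefilter(records, {"user_id": "alice"})]
alice_pricing = [
    r["id"]
    for r in prefilter(records, {"user_id": "alice", "kind": "pricing_insight"})
]
# None of the records carries a session_id, so a session filter matches nothing.
no_session = prefilter(records, {"user_id": "alice", "session_id": "q3-planning"})
```

One practical consequence under these semantics: a memory can only be found by an attribute that was set when it was written. The write node in this tutorial stores user_id and kind but no session_id, so a session-scoped search would need the write node to store that attribute as well.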
Adaptive Decay for Time-Sensitive State
Not all agent memory should age the same way. A short-term planning note from last Tuesday should fade faster than a core product insight from six months ago. Feather's decay formula handles this with per-query half_life control:
import feather_db as fdb

# Short-term plans: half-life of 7 days
# After 7 days without a recall, a plan's recency term has halved
# (the blended score drops by less, because similarity still counts)
short_term_cfg = fdb.ScoringConfig(half_life=7.0, weight=0.4, min=0.0)
recent_plans = db.search(
    query_vec,
    k=3,
    filter_attributes={"user_id": user_id, "kind": "short_term_plan"},
    scoring=short_term_cfg
)

# Long-term insights: half-life of 60 days
long_term_cfg = fdb.ScoringConfig(half_life=60.0, weight=0.2, min=0.0)
durable_insights = db.search(
    query_vec,
    k=5,
    filter_attributes={"user_id": user_id, "kind": "strategic_insight"},
    scoring=long_term_cfg
)
The decay formula from include/scoring.h:
stickiness = 1 + log(1 + recall_count)
effective_age = age_in_days / stickiness
recency = 0.5 ^ (effective_age / half_life_days)
final_score = ((1 - time_weight) × similarity + time_weight × recency) × importance
A short-term plan recalled 5 times has stickiness = 1 + ln(6) ≈ 2.79, so it ages at about 36% of the normal rate (1 / 2.79). It stays sharp during the window when it matters, then fades once retrieval stops reinforcing it. No manual expiration logic.
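The formula is easy to check by hand. The sketch below transcribes the four lines above into Python, assuming log is the natural logarithm (which is what yields the 2.79 figure). The function names and the sample similarity and importance values are illustrative and are not taken from include/scoring.h.

```python
import math

def stickiness(recall_count: int) -> float:
    """1 + ln(1 + recall_count): grows slowly with repeated retrieval."""
    return 1.0 + math.log(1.0 + recall_count)

def recency(age_days: float, half_life_days: float, recall_count: int = 0) -> float:
    """Exponential decay over the effective (stickiness-adjusted) age."""
    effective_age = age_days / stickiness(recall_count)
    return 0.5 ** (effective_age / half_life_days)

def final_score(similarity: float, age_days: float, half_life_days: float,
                time_weight: float, importance: float = 1.0,
                recall_count: int = 0) -> float:
    """Blend similarity with recency, then scale by importance."""
    r = recency(age_days, half_life_days, recall_count)
    return ((1.0 - time_weight) * similarity + time_weight * r) * importance

# A 7-day-old plan with a 7-day half-life, never recalled vs. recalled 5 times.
never_recalled = recency(age_days=7, half_life_days=7, recall_count=0)
recalled_5x = recency(age_days=7, half_life_days=7, recall_count=5)

# Full score with illustrative inputs: similarity 0.8, weight 0.4, importance 0.7.
score = final_score(similarity=0.8, age_days=7, half_life_days=7,
                    time_weight=0.4, importance=0.7)

print(f"stickiness(5)       = {stickiness(5):.2f}")
print(f"recency, 0 recalls  = {never_recalled:.2f}")
print(f"recency, 5 recalls  = {recalled_5x:.2f}")
print(f"final score         = {score:.3f}")
```

With these inputs the never-recalled plan's recency is exactly 0.5 after one half-life, while the plan recalled 5 times still has a recency of roughly 0.78. Only the recency term halves: because similarity carries 1 - time_weight of the blend, the final score falls by less than 50%.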
Combining LangGraph Checkpoints with Feather Recall
The two systems solve different problems. The right mental model:
| Capability | LangGraph MemorySaver | Feather DB |
|---|---|---|
| Replay a specific run | Yes — full state snapshot | No |
| Resume interrupted run | Yes — resume from checkpoint | No |
| Find relevant past context by meaning | No | Yes — semantic search |
| Memory across different thread_ids | No | Yes — cross-thread recall |
| Memory that evolves with use | No | Yes — adaptive decay + stickiness |
| Per-user / per-tenant isolation | Via thread_id convention | Via metadata filter_attributes |
In production you'll want both. Use MemorySaver (or a SqliteSaver / PostgresSaver) for checkpoint durability and run recovery. Use Feather for the semantic layer that makes each new run informed by everything the agent has learned before.
Production: Seeding Memory with add_batch()
If you're deploying an agent with a history of past conversations, don't loop over them with individual db.add() calls. Use add_batch(), which releases the GIL and builds the HNSW graph in parallel — 3.4× faster than sequential on a 4-core machine, 5–6× on 8 cores.
import os
import feather_db as fdb
import numpy as np

os.environ["FEATHER_LOAD_THREADS"] = "8"  # parallel cold-start load
db = fdb.DB.open("agent_memory.feather", dim=1536)

# Load historical conversations from your data store
history = load_conversation_history()  # your loader; returns list of dicts

# Embed all turns in one batch call to your embedding API
texts = [f"Q: {h['query']}\nA: {h['response']}" for h in history]
vecs_list = embed_batch(texts)  # your batched embed function
vecs = np.array(vecs_list, dtype=np.float32)

# Build metadata
metas = []
for h in history:
    m = fdb.Metadata(importance=0.7)
    m.set_attribute("text", f"Q: {h['query']}\nA: {h['response']}")
    m.set_attribute("user_id", h["user_id"])
    m.set_attribute("kind", "agent_turn")
    m.set_attribute("timestamp", str(h["timestamp"]))
    metas.append(m)

ids = list(range(len(history)))

# Single parallel call: 3.4× faster than a loop over db.add()
db.add_batch(ids, vecs, metas=metas)
db.save()
print(f"Seeded {len(history)} memories into agent_memory.feather")
At 50k turns × 1536-dim, add_batch() completes in ~10s on a 4-core machine. The subsequent DB.open() with FEATHER_LOAD_THREADS=8 loads that index in under 2s. Serverless cold start on a 40K-vector index: 48ms (v0.16.0 parallel HNSW load).
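The seeding script leaves embed_batch() to you. One way to write it is a chunking wrapper around whatever call embeds a list of strings. In the sketch below that call is passed in as embed_many, a hypothetical parameter, so the script's embed_batch(texts) would bind it first (for example with functools.partial). With the OpenAI client used earlier it could wrap openai_client.embeddings.create(model=..., input=chunk), since that endpoint accepts a list of inputs. The chunk size of 256 is an arbitrary assumption, not a documented limit.

```python
from typing import Callable, List, Sequence

Vector = List[float]

def embed_batch(
    texts: Sequence[str],
    embed_many: Callable[[List[str]], List[Vector]],
    chunk_size: int = 256,
) -> List[Vector]:
    """Embed texts in fixed-size chunks, preserving input order."""
    vectors: List[Vector] = []
    for start in range(0, len(texts), chunk_size):
        chunk = list(texts[start:start + chunk_size])
        result = embed_many(chunk)
        # Guard against a backend that silently drops or duplicates rows.
        if len(result) != len(chunk):
            raise ValueError(
                f"expected {len(chunk)} vectors, got {len(result)}"
            )
        vectors.extend(result)
    return vectors

# Demonstration with a fake embedder, so the sketch runs without network access.
calls: List[int] = []

def fake_embed_many(chunk: List[str]) -> List[Vector]:
    calls.append(len(chunk))  # record how many texts each request carried
    return [[float(len(t)), float(t.count("a"))] for t in chunk]

vectors = embed_batch(
    ["alpha", "beta", "gamma", "delta", "epsilon"],
    fake_embed_many,
    chunk_size=2,
)
```

Five texts with a chunk size of 2 produce three requests of sizes 2, 2, and 1, and the returned list lines up index-for-index with the input. That alignment is what lets the seeding script pair vecs with ids and metas.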
Production-Ready Graph
Here's the full pattern with persistent SQLite checkpointing for production durability. SqliteSaver ships in the separate langgraph-checkpoint-sqlite package. Batch seeding is a one-off step and is not repeated here; run the add_batch() script from the previous section first.
import os
import sqlite3
import time
import numpy as np
import feather_db as fdb
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver  # pip install langgraph-checkpoint-sqlite
from typing import TypedDict

# -- Config --
os.environ["FEATHER_LOAD_THREADS"] = "8"
FEATHER_PATH = "agent_memory.feather"
SQLITE_PATH = "checkpoints.sqlite"
DIM = 1536

openai_client = OpenAI()
llm = ChatOpenAI(model="gpt-4o", temperature=0)

def embed(text: str) -> np.ndarray:
    resp = openai_client.embeddings.create(model="text-embedding-3-small", input=text)
    return np.array(resp.data[0].embedding, dtype=np.float32)

db = fdb.DB.open(FEATHER_PATH, dim=DIM)
_id_counter = [int(time.time() * 1000)]

# -- State --
class AgentState(TypedDict):
    user_query: str
    user_id: str
    memory_context: str
    response: str

# -- Nodes --
def memory_read_node(state: AgentState) -> dict:
    vec = embed(state["user_query"])
    results = db.search(vec, k=5, filter_attributes={"user_id": state["user_id"]})
    if not results:
        return {"memory_context": ""}
    parts = [
        f"[Memory {i} | score={r.score:.3f}]\n{r.metadata.get_attribute('text')}"
        for i, r in enumerate(results, 1)
    ]
    return {"memory_context": "\n\n".join(parts)}

def agent_node(state: AgentState) -> dict:
    msgs = [{"role": "system", "content": "You are a helpful assistant."}]
    if state.get("memory_context"):
        msgs.append({
            "role": "system",
            "content": f"Relevant past context:\n\n{state['memory_context']}"
        })
    msgs.append({"role": "user", "content": state["user_query"]})
    return {"response": llm.invoke(msgs).content}

def memory_write_node(state: AgentState) -> dict:
    text = f"Q: {state['user_query']}\nA: {state['response']}"
    vec = embed(text)
    meta = fdb.Metadata(importance=0.7)
    meta.set_attribute("text", text)
    meta.set_attribute("user_id", state["user_id"])
    meta.set_attribute("kind", "agent_turn")
    meta.set_attribute("timestamp", str(int(time.time())))
    _id_counter[0] += 1
    db.add(id=_id_counter[0], vec=vec, metadata=meta)
    db.save()
    return {}

# -- Graph --
builder = StateGraph(AgentState)
builder.add_node("memory_read", memory_read_node)
builder.add_node("agent", agent_node)
builder.add_node("memory_write", memory_write_node)
builder.set_entry_point("memory_read")
builder.add_edge("memory_read", "agent")
builder.add_edge("agent", "memory_write")
builder.add_edge("memory_write", END)

# In LangGraph 0.2+, SqliteSaver.from_conn_string() is a context manager,
# so pass an explicit connection to get a long-lived checkpointer.
conn = sqlite3.connect(SQLITE_PATH, check_same_thread=False)
checkpointer = SqliteSaver(conn)
graph = builder.compile(checkpointer=checkpointer)

# -- Invoke --
result = graph.invoke(
    {"user_query": "Summarise our Q2 pricing decisions", "user_id": "alice",
     "memory_context": "", "response": ""},
    config={"configurable": {"thread_id": "alice-q2-review"}}
)
print(result["response"])
What You Get
With this pattern in place:
- Every graph run starts informed by semantically relevant past context — not just the last turn, but anything relevant across all prior runs.
- Memory that gets retrieved repeatedly stays sharp via adaptive decay. Memory that stops being relevant fades — no manual cleanup.
- Short-term plans age out in 7 days. Strategic insights persist for 60. You set the half-life per query.
- LangGraph checkpoints still handle replay and run recovery. Feather handles the semantic layer that checkpoints can't.
- add_batch() seeds production history in a single parallel call. Parallel HNSW load keeps cold starts under 50ms.
The .feather file lives alongside your graph. No infrastructure to provision. The agent's accumulated knowledge ships with it.
Feather DB — github.com/feather-store/feather · pip install feather-db