Feather DB + CrewAI: Persistent Memory for Multi-Agent Systems
CrewAI runs are stateless by default — each kickoff wipes agent memory. Here's how to wire Feather DB as a persistent memory layer between runs using a custom FeatherMemoryTool, per-role namespaces, decay-aware search, and add_batch() for bulk post-run ingestion.
The problem: CrewAI is stateless by default
CrewAI makes it easy to compose multi-agent pipelines. You define a crew, assign tasks, call crew.kickoff(), and agents collaborate to produce an output. The problem is that every kickoff() starts from scratch. The researcher agent that spent three minutes gathering competitive intelligence last Tuesday has no memory of it today. The writer that learned your brand voice in run 1 treats run 2 as a first encounter.
CrewAI has a built-in memory flag (memory=True) that uses an in-process ChromaDB instance. It works within a single session, but it resets on process restart, doesn't support per-agent namespace isolation, and has no decay model — old facts from run 1 get equal weight to fresh findings from run 100.
Feather DB solves all three: it persists to a .feather file between runs, isolates each agent role into its own namespace, and applies half-life decay so recent findings outrank stale ones automatically.
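To make the decay idea concrete, here is a minimal sketch of half-life weighting. It assumes a score of the form similarity × 0.5^(age / half_life). The article does not spell out Feather DB's exact scoring formula, so `decayed_score` and its parameters are illustrative names, not library API.

```python
def decayed_score(similarity: float, age_days: float, half_life_days: float) -> float:
    """Scale a similarity score by exponential half-life decay (illustrative only)."""
    if half_life_days <= 0:
        raise ValueError("half_life_days must be positive")
    # After one half-life the weight is 0.5, after two it is 0.25, and so on.
    return similarity * 0.5 ** (age_days / half_life_days)


# A fresh, slightly weaker match can outrank an older, closer match.
old_match = decayed_score(similarity=0.90, age_days=14, half_life_days=7)   # two half-lives
fresh_match = decayed_score(similarity=0.80, age_days=0, half_life_days=7)  # no decay yet
print(f"old={old_match:.3f} fresh={fresh_match:.3f}")
```

Under this assumed formula, a shorter half-life pushes old memories down faster, which is the lever the per-role half_life values later in this guide adjust.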
Setup
pip install feather-db crewai crewai-tools openai
This guide uses OpenAI text-embedding-3-small (1536-dim) for embeddings and gpt-4o as the crew LLM. Any embedding model that produces a fixed-dim float32 vector works — just set dim accordingly when opening the DB.
Step 1: Initialize Feather DB and the embed function
import os
import feather_db as fdb
from openai import OpenAI

os.environ["FEATHER_LOAD_THREADS"] = "8"  # v0.16.0: 48ms cold load
openai_client = OpenAI()

# One shared .feather file for the whole crew system.
# All agent roles write to the same file, isolated by namespace.
db = fdb.DB.open("crew_memory.feather", dim=1536)

def embed(text: str) -> list:
    """Embed text using OpenAI text-embedding-3-small."""
    response = openai_client.embeddings.create(
        input=[text],
        model="text-embedding-3-small"
    )
    return response.data[0].embedding
Setting FEATHER_LOAD_THREADS=8 before the first DB.open() activates the parallel HNSW graph reconstruction introduced in v0.16.0. On a standard 4-core machine, a 10k-vector crew memory file loads in ~48ms instead of ~230ms — fast enough that it doesn't add perceptible latency to kickoff().
Step 2: Build FeatherMemoryTool for CrewAI
CrewAI tools are subclasses of BaseTool. The FeatherMemoryTool described here is implemented as two tools, FeatherSearchTool and FeatherAddTool, paired per agent role. Each tool takes a fixed namespace at construction, so the researcher's tools always read from and write to "researcher", the writer's tools use "writer", and so on.
from crewai.tools import BaseTool
from pydantic import Field
from datetime import datetime
from typing import Optional

class FeatherSearchTool(BaseTool):
    """Search persistent Feather DB memory for relevant context."""
    name: str = "search_memory"
    description: str = (
        "Search this agent's persistent memory for context relevant to the query. "
        "Call this at the START of any task to recall prior findings before doing new research."
    )
    namespace: str = Field(default="default")
    k: int = Field(default=6)
    half_life: int = Field(default=7)  # 7-day half_life for daily research tasks

    def _run(self, query: str) -> str:
        vec = embed(query)
        results = db.context_chain(
            vec,
            k=self.k,
            namespace=self.namespace,
            max_depth=2,
            half_life=self.half_life
        )
        if not results:
            return "No relevant memories found. This appears to be a fresh topic."
        lines = [f"Retrieved {len(results)} memories from namespace '{self.namespace}':"]
        for i, mem in enumerate(results, 1):
            mem_type = mem.meta.get_attribute("type") or "fact"
            importance = mem.meta.get_attribute("importance") or 1.0
            created = mem.meta.get_attribute("created_at") or "unknown"
            lines.append(
                f"{i}. [{mem_type}] (importance={importance}, stored={created[:10]}) {mem.text}"
            )
        return "\n".join(lines)

class FeatherAddTool(BaseTool):
    """Store a new finding into persistent Feather DB memory."""
    name: str = "save_to_memory"
    description: str = (
        "Save an important finding, decision, or fact to persistent memory. "
        "Call this at the END of a task for each key finding. "
        "Use memory_type='finding' for research results, 'decision' for choices made, "
        "'summary' for synthesized conclusions."
    )
    namespace: str = Field(default="default")

    def _run(
        self,
        text: str,
        memory_type: str = "finding",
        importance: float = 1.0,
        entity: str = "general",
    ) -> str:
        vec = embed(text)
        mem = db.add(
            vec,
            text=text,
            namespace=self.namespace,
            entity=entity
        )
        mem.meta.set_attribute("type", memory_type)
        mem.meta.set_attribute("importance", importance)
        mem.meta.set_attribute("created_at", datetime.utcnow().isoformat())
        mem.meta.set_attribute("entity", entity)
        return f"Saved to namespace '{self.namespace}' (id={mem.id}): {text[:100]}..."
Step 3: Namespace per agent role
The key architectural decision: one namespace per agent role, not per run. This means the researcher accumulates findings across every crew run. The writer's tone decisions persist. The analyst's data interpretations carry forward. Runs build on each other instead of restarting from zero.
from crewai import Agent, Task, Crew, Process
# --- Tool instances: each agent gets its own scoped pair ---
researcher_search = FeatherSearchTool(namespace="researcher", k=6, half_life=7)
researcher_add = FeatherAddTool(namespace="researcher")
analyst_search = FeatherSearchTool(namespace="analyst", k=5, half_life=14)
analyst_add = FeatherAddTool(namespace="analyst")
writer_search = FeatherSearchTool(namespace="writer", k=4, half_life=30)
writer_add = FeatherAddTool(namespace="writer")
# half_life notes:
# researcher: 7 days — research findings become stale quickly
# analyst: 14 days — data interpretations hold a bit longer
# writer: 30 days — tone/style decisions are more durable
# --- Agents ---
researcher = Agent(
role="Senior Research Analyst",
goal=(
"Gather comprehensive, up-to-date intelligence on the assigned topic. "
"ALWAYS search memory first — avoid repeating research you've already done."
),
backstory=(
"You are a meticulous research analyst who builds a persistent knowledge base. "
"Before every research session, you check your memory for prior findings. "
"After every session, you save key findings with importance scores."
),
tools=[researcher_search, researcher_add],
verbose=True,
memory=False # disable CrewAI's built-in memory — Feather DB handles this
)
analyst = Agent(
role="Data Analyst",
goal=(
"Analyze research findings and identify patterns, risks, and opportunities. "
"Build on prior analysis stored in memory rather than starting fresh."
),
backstory=(
"You are a systematic analyst who tracks analytical conclusions over time. "
"You search memory for prior interpretations before forming new ones."
),
tools=[analyst_search, analyst_add],
verbose=True,
memory=False
)
writer = Agent(
role="Content Strategist",
goal=(
"Synthesize research and analysis into clear, actionable deliverables. "
"Recall your prior writing style decisions and audience insights from memory."
),
backstory=(
"You are a precise communicator who maintains consistent voice across runs. "
"Your memory holds tone decisions, audience insights, and approved structures."
),
tools=[writer_search, writer_add],
verbose=True,
memory=False
)
Step 4: Define tasks with memory instructions baked in
def build_tasks(topic: str) -> list:
    """Build task list for a given research topic."""
    research_task = Task(
        description=f"""
        Research the following topic thoroughly: {topic}
        STEP 1 - Search memory first:
        Call search_memory with the topic to see what you already know.
        If you have recent findings (< 7 days old), identify gaps only.
        Do NOT re-research what is already in memory.
        STEP 2 - Fill gaps:
        Research only the aspects not already covered in memory.
        Focus on recent developments, statistics, and expert positions.
        STEP 3 - Save findings:
        For each key finding, call save_to_memory:
        - memory_type='finding'
        - importance=2.0 for headline findings, 1.0 for supporting details
        - entity=
        """,
        expected_output=(
            "A structured research summary with 5-8 key findings. "
            "Clearly mark which findings came from memory vs new research."
        ),
        agent=researcher
    )
    analysis_task = Task(
        description=f"""
        Analyze the research findings about: {topic}
        STEP 1 - Search your memory:
        Call search_memory for prior analytical conclusions on this topic.
        Note any patterns or interpretations you've previously identified.
        STEP 2 - Analyze:
        Identify: key trends, contradictions, risks, and opportunities.
        Build on prior analytical work rather than duplicating it.
        STEP 3 - Save conclusions:
        Save each significant conclusion with save_to_memory:
        - memory_type='decision' for interpretive conclusions
        - importance=1.5 for significant patterns
        """,
        expected_output=(
            "An analytical report with trends, risks, and 3 strategic recommendations. "
            "Note which conclusions build on prior analysis."
        ),
        agent=analyst,
        context=[research_task]
    )
    writing_task = Task(
        description=f"""
        Write a concise briefing document on: {topic}
        STEP 1 - Search memory for style context:
        Call search_memory with 'writing style audience format briefing' to recall
        any tone decisions, audience specs, or approved formats from prior runs.
        STEP 2 - Write the briefing:
        - Executive Summary (2-3 sentences)
        - Key Findings (3-5 bullets)
        - Strategic Implications (2-3 bullets)
        - Recommended Actions (3 bullets)
        STEP 3 - Save meta-decisions:
        If you make significant style or structure decisions, save them with:
        - memory_type='decision', importance=1.2, entity='writing-meta'
        """,
        expected_output="A polished briefing document, ready to share with stakeholders.",
        agent=writer,
        context=[research_task, analysis_task]
    )
    return [research_task, analysis_task, writing_task]
Step 5: The crew kickoff with post-run memory flush
def run_research_crew(topic: str) -> str:
    """
    Kick off the research crew and flush any unsaved findings to Feather DB.
    """
    print(f"\n{'='*60}")
    print(f"CREW RUN: {topic}")
    print(f"Memory counts before run:")
    print(f" researcher: {db.count(namespace='researcher')} memories")
    print(f" analyst: {db.count(namespace='analyst')} memories")
    print(f" writer: {db.count(namespace='writer')} memories")
    print(f"{'='*60}\n")
    crew = Crew(
        agents=[researcher, analyst, writer],
        tasks=build_tasks(topic),
        process=Process.sequential,
        verbose=True
    )
    result = crew.kickoff()
    # Persist to disk: agents may have added memories during the run
    db.save()
    print(f"\nMemory counts after run:")
    print(f" researcher: {db.count(namespace='researcher')} memories")
    print(f" analyst: {db.count(namespace='analyst')} memories")
    print(f" writer: {db.count(namespace='writer')} memories")
    return result.raw
# Run 1 — agents research from scratch and build memory
output1 = run_research_crew("AI memory architectures for production agents in 2026")
print(output1)
print("\n" + "="*60)
print("RUN 2 — agents recall prior findings and build on them")
print("="*60 + "\n")
# Run 2 — same topic: agents recall prior findings, fill gaps only
output2 = run_research_crew("AI memory architectures: new developments and competitive landscape")
Using add_batch() for bulk memory ingestion after a crew run
Sometimes a crew run produces a large structured output — a research report, a competitive analysis, a dataset of findings — that you want to ingest wholesale into memory rather than having agents call save_to_memory one fact at a time. add_batch() handles this in a single parallel call.
import numpy as np
from datetime import datetime
def ingest_report_to_memory(
    findings: list[dict],
    namespace: str,
    entity: str = "bulk-import"
) -> int:
    """
    Bulk-ingest a list of findings into Feather DB after a crew run.
    Each finding: {"text": str, "type": str, "importance": float}
    Uses add_batch() for 3.4x faster ingestion vs sequential add().
    Returns the number of memories added.
    """
    if not findings:
        return 0
    texts = [f["text"] for f in findings]
    types = [f.get("type", "finding") for f in findings]
    importances = [float(f.get("importance", 1.0)) for f in findings]
    # Batch embed: one API call for all texts
    response = openai_client.embeddings.create(
        input=texts,
        model="text-embedding-3-small"
    )
    vecs = np.array([r.embedding for r in response.data], dtype=np.float32)
    # Build metadata objects
    metas = []
    now = datetime.utcnow().isoformat()
    for i, (t, imp) in enumerate(zip(types, importances)):
        m = fdb.Metadata(importance=imp)
        m.set_attribute("type", t)
        m.set_attribute("created_at", now)
        m.set_attribute("entity", entity)
        m.set_attribute("text", texts[i])
        metas.append(m)
    # Parallel batch insert: GIL released during HNSW construction.
    # IDs are offset from the file-wide count rather than the per-namespace count,
    # so they do not overlap with IDs already used in other namespaces. This assumes
    # IDs are unique across the whole file and that count() includes deleted entries.
    # NOTE: add_batch() is not passed the namespace here; confirm how your Feather DB
    # version assigns batch-inserted entries to a namespace.
    start_id = db.count()
    ids = list(range(start_id, start_id + len(texts)))
    db.add_batch(ids, vecs, metas=metas)
    db.save()
    return len(findings)
# After a crew run, ingest the structured output in bulk
research_findings = [
{"text": "Feather DB achieves 0.19ms P50 search at 500K vectors (SIFT1M benchmark).",
"type": "finding", "importance": 2.0},
{"text": "In-process vector stores like Chroma reset on process restart — no cross-run persistence.",
"type": "finding", "importance": 1.5},
{"text": "Namespace isolation allows one .feather file to serve multiple agent roles without bleed.",
"type": "finding", "importance": 1.8},
{"text": "add_batch() is 3.4x faster than sequential add() for bulk ingestion on 4-core machines.",
"type": "finding", "importance": 1.6},
{"text": "half_life=7 is appropriate for daily research tasks where findings age out in a week.",
"type": "decision", "importance": 1.2},
]
n = ingest_report_to_memory(research_findings, namespace="researcher", entity="ai-memory-2026")
print(f"Ingested {n} findings via add_batch()")
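If the crew's output is free text rather than a list of dicts, it first has to be shaped into the {"text", "type", "importance"} records that ingest_report_to_memory() expects. Here is a minimal sketch, assuming the report marks each finding as a "- " bullet. `report_to_findings` is a hypothetical helper for illustration, not part of Feather DB or CrewAI.

```python
def report_to_findings(report: str, default_importance: float = 1.0) -> list[dict]:
    """Turn '- ' bullet lines of a text report into finding records (hypothetical helper)."""
    findings = []
    for line in report.splitlines():
        stripped = line.strip()
        if not stripped.startswith("- "):
            continue  # skip headings, prose, and empty bullets
        text = stripped[2:].strip()
        if text:
            findings.append(
                {"text": text, "type": "finding", "importance": default_importance}
            )
    return findings


sample_report = """Key Findings
- Vector stores that live only in process memory lose state on restart.
- Per-role namespaces keep agent memories separate.
-
Recommended Actions
"""
findings = report_to_findings(sample_report)
print(f"Parsed {len(findings)} findings")
```

The resulting list can be passed straight to ingest_report_to_memory(). Importance is set uniformly here; a real pipeline would likely score findings individually.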
v0.16.0: fast cold load so memory doesn't slow down crew startup
CrewAI agents are typically instantiated inside a script that runs as a subprocess or a scheduled job. Every kickoff is a cold Python process. That means Feather DB gets a cold start on every crew run — and if the load takes 2 seconds, it adds 2 seconds of latency before the first agent even reads its first task.
v0.16.0's parallel HNSW load, combined with the file-format optimizations from v0.15, brings cold load time to ~48ms for a typical crew memory file (5k–15k vectors per namespace):
import os
import time
import feather_db as fdb
# Set before DB.open() — activates parallel graph reconstruction
os.environ["FEATHER_LOAD_THREADS"] = "8"
t0 = time.perf_counter()
db = fdb.DB.open("crew_memory.feather", dim=1536)
elapsed_ms = (time.perf_counter() - t0) * 1000
print(f"Cold load: {elapsed_ms:.1f}ms")
print(f"Researcher memories: {db.count(namespace='researcher')}")
print(f"Analyst memories: {db.count(namespace='analyst')}")
print(f"Writer memories: {db.count(namespace='writer')}")
# Example output (illustrative):
# Cold load: 48.0ms
# Researcher memories: 142
# Analyst memories: 67
# Writer memories: 31
For crews running as Lambda functions or Cloud Run jobs — where cold starts happen on every invocation — 48ms is invisible. 2 seconds is not.
Production tips: compact(), multiple crews, and shared namespaces
When to compact()
Every db.add() and db.add_batch() call appends to the HNSW graph. Deleted memories leave tombstones. After many runs, the file accumulates fragmentation — logical deletions, superseded facts marked with low importance, and orphaned edges. compact() rewrites the file and rebuilds the graph from live entries only:
import feather_db as fdb
db = fdb.DB.open("crew_memory.feather", dim=1536)
# Check fragmentation before deciding to compact
total = db.count()
live = db.count(min_importance=0.1) # entries with non-tombstone importance
fragmentation_ratio = 1 - (live / total) if total > 0 else 0
print(f"Total entries (incl. tombstones): {total}")
print(f"Live entries: {live}")
print(f"Fragmentation: {fragmentation_ratio:.1%}")
# Compact when fragmentation > 20% or after every ~50 crew runs
if fragmentation_ratio > 0.20:
    print("Compacting...")
    db.compact()  # in-place rewrite: no downtime, file temporarily larger during rewrite
    db.save()
    print(f"After compact: {db.count()} entries")
A good schedule: run compact() as a weekly maintenance job (not per-kickoff), or after a bulk deletion pass where you've removed memories older than a threshold.
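The two triggers mentioned above (fragmentation over 20%, or roughly every 50 crew runs) can be folded into one small decision function that a maintenance job calls. This is a sketch under stated assumptions: `should_compact` and the `runs_since_compact` counter are hypothetical names, and you would need to track that counter yourself, since the article does not describe Feather DB recording it.

```python
def should_compact(total: int, live: int, runs_since_compact: int,
                   max_fragmentation: float = 0.20, max_runs: int = 50) -> bool:
    """Decide whether a maintenance job should call compact() (illustrative policy)."""
    if total <= 0:
        return False  # an empty file has nothing to reclaim
    fragmentation = 1 - (live / total)
    return fragmentation > max_fragmentation or runs_since_compact >= max_runs


# 25% of entries are dead: compact even though few runs have passed.
print(should_compact(total=1000, live=750, runs_since_compact=3))
# Only 10% dead and few runs: leave the file alone.
print(should_compact(total=1000, live=900, runs_since_compact=3))
```

Keeping the policy in a pure function makes the thresholds easy to unit-test and tune without opening a .feather file.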
Multiple crews sharing a namespace
If you run multiple crews that work on related topics — a daily news crew, a weekly synthesis crew, a monthly strategy crew — they can share namespaces intentionally. The news crew writes to "researcher", the synthesis crew reads from "researcher" to build on daily findings. This is the same one-file-multiple-readers pattern used in multi-agent OpenAI SDK setups.
# crew_a: daily news researcher — writes to "researcher" namespace
# crew_b: weekly synthesizer — reads from "researcher", writes to "synthesis"
# crew_b's analyst reads from the researcher namespace for accumulated daily findings
weekly_analyst_search = FeatherSearchTool(
    namespace="researcher",  # read from daily crew's findings
    k=10,
    half_life=7  # surface the freshest findings
)
weekly_analyst_add = FeatherAddTool(
    namespace="synthesis"  # write its own conclusions separately
)
weekly_analyst = Agent(
    role="Weekly Synthesis Analyst",
    goal="Synthesize the week's research findings into strategic insights.",
    # backstory is a required Agent field in CrewAI
    backstory="You read the daily crew's accumulated findings and distill them into weekly conclusions.",
    tools=[weekly_analyst_search, weekly_analyst_add],
    verbose=True,
    memory=False
)
For concurrent write safety — two crew processes writing to the same file simultaneously — use feather-serve as a single server process and point both crews at its REST API instead of opening the file directly from each process. That eliminates write contention entirely.
# Start feather-serve as the memory server
feather-serve --file crew_memory.feather --port 7532 --embed-provider openai
# Both crew processes talk to the same server — no file locking required
Evicting stale memories between crew runs
After many runs, the researcher namespace may fill with outdated findings — a benchmark number from three months ago, a product feature that has since been deprecated. The adaptive scoring formula deprioritizes these via half_life decay, but if you want to remove them explicitly:
from datetime import datetime, timedelta
def evict_stale_memories(namespace: str, max_age_days: int = 30,
                         min_importance_to_keep: float = 1.5):
    """
    Remove memories older than max_age_days that are below the importance threshold.
    High-importance memories (>= min_importance_to_keep) are kept regardless of age.
    Run this before a crew kickoff, not during.
    """
    cutoff = datetime.utcnow() - timedelta(days=max_age_days)
    # A generic probe query pulls up to 500 candidates; memories beyond that
    # limit, or far from the probe text, are not examined in this pass.
    probe_vec = embed("research finding fact")
    candidates = db.search(probe_vec, k=500, namespace=namespace, half_life=1)
    evicted = 0
    for mem in candidates:
        created_str = mem.meta.get_attribute("created_at")
        importance = float(mem.meta.get_attribute("importance") or 1.0)
        if not created_str:
            continue  # no timestamp: cannot judge age, so keep it
        try:
            created = datetime.fromisoformat(created_str)
        except ValueError:
            continue  # unparseable timestamp: keep it
        if created < cutoff and importance < min_importance_to_keep:
            db.delete(mem.id)
            evicted += 1
    if evicted:
        db.save()
    return evicted
# Run before weekly crews to keep the namespace clean
n = evict_stale_memories("researcher", max_age_days=30, min_importance_to_keep=1.5)
print(f"Evicted {n} stale memories from researcher namespace")
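The eviction rule itself (older than the cutoff and below the importance threshold) is easy to get subtly wrong at the boundaries, so it can help to isolate it as a pure function and test it without a database. A minimal sketch follows; `is_stale` is a hypothetical helper that mirrors the condition in evict_stale_memories(), not a Feather DB API.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_stale(created_at: Optional[str], importance: float, now: datetime,
             max_age_days: int = 30, min_importance_to_keep: float = 1.5) -> bool:
    """Return True if a memory is both old enough and unimportant enough to evict."""
    try:
        created = datetime.fromisoformat(created_at)
    except (TypeError, ValueError):
        return False  # missing or unparseable timestamps are never evicted
    cutoff = now - timedelta(days=max_age_days)
    return created < cutoff and importance < min_importance_to_keep


now = datetime(2026, 1, 31)
print(is_stale("2025-12-01T00:00:00", 1.0, now))  # old and low importance
print(is_stale("2025-12-01T00:00:00", 2.0, now))  # old but important
print(is_stale("2026-01-20T00:00:00", 1.0, now))  # recent
```

Timestamps here are naive datetimes, matching the datetime.utcnow().isoformat() values the tools store. Mixing in timezone-aware values would make the comparison raise, so pick one convention and keep it.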
Full working example
"""
feather_crewai_example.py — complete working example.
Run: python feather_crewai_example.py
"""
import os
import numpy as np
import feather_db as fdb
from openai import OpenAI
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from pydantic import Field
from datetime import datetime
os.environ["FEATHER_LOAD_THREADS"] = "8"
openai_client = OpenAI()
db = fdb.DB.open("crew_memory.feather", dim=1536)
def embed(text: str) -> list:
    r = openai_client.embeddings.create(input=[text], model="text-embedding-3-small")
    return r.data[0].embedding

class FeatherSearchTool(BaseTool):
    name: str = "search_memory"
    description: str = "Search this agent's persistent memory for prior findings."
    namespace: str = Field(default="default")
    k: int = Field(default=6)
    half_life: int = Field(default=7)

    def _run(self, query: str) -> str:
        results = db.context_chain(embed(query), k=self.k,
                                   namespace=self.namespace,
                                   max_depth=2, half_life=self.half_life)
        if not results:
            return "No relevant memories found."
        lines = [f"Found {len(results)} memories:"]
        for i, m in enumerate(results, 1):
            lines.append(f"{i}. [{m.meta.get_attribute('type') or 'fact'}] {m.text}")
        return "\n".join(lines)

class FeatherAddTool(BaseTool):
    name: str = "save_to_memory"
    description: str = "Save a key finding to persistent memory."
    namespace: str = Field(default="default")

    def _run(self, text: str, memory_type: str = "finding",
             importance: float = 1.0, entity: str = "general") -> str:
        vec = embed(text)
        mem = db.add(vec, text=text, namespace=self.namespace, entity=entity)
        mem.meta.set_attribute("type", memory_type)
        mem.meta.set_attribute("importance", importance)
        mem.meta.set_attribute("created_at", datetime.utcnow().isoformat())
        return f"Saved (id={mem.id}): {text[:80]}..."
researcher = Agent(
role="Senior Research Analyst",
goal="Research topics thoroughly, recall prior work from memory first.",
backstory="You build a persistent knowledge base. Always check memory before researching.",
tools=[
FeatherSearchTool(namespace="researcher", k=6, half_life=7),
FeatherAddTool(namespace="researcher")
],
verbose=True,
memory=False
)
analyst = Agent(
role="Data Analyst",
goal="Analyze findings, build on prior analytical conclusions from memory.",
backstory="You track interpretations over time. Search memory before re-analyzing.",
tools=[
FeatherSearchTool(namespace="analyst", k=5, half_life=14),
FeatherAddTool(namespace="analyst")
],
verbose=True,
memory=False
)
writer = Agent(
role="Content Strategist",
goal="Write clear briefings. Recall tone and format decisions from memory.",
backstory="You maintain consistent voice across runs using persistent memory.",
tools=[
FeatherSearchTool(namespace="writer", k=4, half_life=30),
FeatherAddTool(namespace="writer")
],
verbose=True,
memory=False
)
def run_crew(topic: str) -> str:
    tasks = [
        Task(
            description=(
                f"Research: {topic}. "
                "Step 1: search_memory with the topic. "
                "Step 2: only research gaps not in memory. "
                "Step 3: save each finding with save_to_memory (importance=2.0 for headlines)."
            ),
            expected_output="5-8 key findings, noting what came from memory vs new research.",
            agent=researcher
        ),
        Task(
            description=(
                f"Analyze findings about: {topic}. "
                "Step 1: search_memory for prior analytical conclusions. "
                "Step 2: identify trends, risks, opportunities. "
                "Step 3: save conclusions with save_to_memory (memory_type='decision')."
            ),
            expected_output="Analytical report with 3 strategic recommendations.",
            agent=analyst
        ),
        Task(
            description=(
                f"Write a stakeholder briefing on: {topic}. "
                "Step 1: search_memory for 'writing style audience format'. "
                "Step 2: write Executive Summary, Key Findings, Implications, Actions. "
                "Step 3: save significant style decisions with save_to_memory."
            ),
            expected_output="Polished briefing document ready to share.",
            agent=writer
        )
    ]
    crew = Crew(agents=[researcher, analyst, writer],
                tasks=tasks, process=Process.sequential, verbose=True)
    result = crew.kickoff()
    db.save()  # flush all agent writes to disk
    return result.raw

if __name__ == "__main__":
    # Run 1: cold start, agents build memory
    print(run_crew("persistent memory architectures for AI agents"))
    # Run 2: agents recall prior findings, only fill gaps
    print(run_crew("latest benchmarks in AI agent memory systems"))
After two runs, the .feather file holds the accumulated knowledge of all three agent roles. Run 3 is faster — not because the model is smarter, but because the researcher namespace already has the foundational findings and the agents skip redundant work. That compounding is the point.
Install: pip install feather-db crewai crewai-tools openai · GitHub: github.com/feather-store/feather