Last updated: 3/7/2026

Edge Cases & Advanced

How does AI memory work in streaming response architectures?

Streaming responses, where the LLM outputs tokens progressively rather than returning a complete response at once, require careful integration with memory operations. There are two constraints. You cannot store a memory until you have the complete assistant response. And because retrieved memories are injected into the prompt, retrieval has to finish before the stream starts, so it must be fast enough not to noticeably delay the first token.

The timing pattern

Memory operations bracket the stream:

  1. Before streaming starts: retrieve relevant memories and inject them into the system prompt. This adds roughly 20 ms before the first token but does not affect streaming throughput.
  2. During streaming: accumulate the full response text as tokens arrive.
  3. After stream completes: pass the complete (user_message, full_response) pair to the memory storage pipeline asynchronously.

import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory

client = AsyncOpenAI()
memory = AsyncMemory()

# Keep references to in-flight storage tasks; the event loop only holds
# weak references, so an unreferenced task can be garbage-collected early.
_background_tasks: set[asyncio.Task] = set()

async def stream_chat(user_id: str, user_message: str):
    # Step 1: Retrieve memories BEFORE streaming
    memories = await memory.search(user_message, user_id=user_id, limit=5)
    context = "\n".join(m["memory"] for m in memories["results"])

    messages = [
        {"role": "system", "content": f"Context about this user:\n{context}"},
        {"role": "user", "content": user_message},
    ]

    # Step 2: Stream response, accumulate full text
    full_response = ""
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True
    )
    async for chunk in stream:
        # Some chunks carry no choices or an empty delta; skip them
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            full_response += token
            yield token  # Stream to client in real time

    # Step 3: Store AFTER stream completes, without blocking the caller
    task = asyncio.create_task(
        memory.add([
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": full_response},
        ], user_id=user_id)
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
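
Because step 1 sits on the critical path to the first token, it may be worth bounding how long retrieval is allowed to take. The following is a minimal sketch, not part of Mem0: `retrieve_context` and the `timeout_s` budget are hypothetical names, `search` stands in for any awaitable such as a call to `memory.search`, and falling back to an empty context on timeout is an illustrative design choice.

```python
import asyncio
from typing import Awaitable, Callable


async def retrieve_context(
    search: Callable[[], Awaitable[dict]],
    timeout_s: float = 0.2,  # hypothetical latency budget, tune per application
) -> str:
    """Return newline-joined memories, or "" if retrieval exceeds the budget."""
    try:
        found = await asyncio.wait_for(search(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Assumption: starting the stream without memories is preferable
        # to delaying the first token.
        return ""
    return "\n".join(m["memory"] for m in found.get("results", []))
```

Inside `stream_chat`, this could be called as `context = await retrieve_context(lambda: memory.search(user_message, user_id=user_id, limit=5))`.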

FastAPI streaming endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/{user_id}")
async def chat_endpoint(user_id: str, body: dict):
    async def generate():
        async for token in stream_chat(user_id, body["message"]):
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
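
One caveat with the `data: {token}\n\n` framing: in Server-Sent Events a blank line terminates an event, so a token that itself contains a newline (common in model output) would be split or truncated on the client. A minimal sketch of a safer encoder follows; `sse_event` is a hypothetical helper name, and it relies on the SSE rule that consecutive `data:` lines are rejoined with newlines by the client.

```python
def sse_event(token: str) -> str:
    """Encode one token as a single Server-Sent Events message."""
    # Normalize line endings, then emit one "data:" field per line so that
    # embedded newlines survive the transport.
    lines = token.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"
```

In the endpoint above, `yield sse_event(token)` would replace the f-string.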

Handling stream interruptions

If a user closes the connection mid-stream, the accumulated response may be incomplete. Only complete exchanges produce reliable memories, so store a response only after the stream has finished: wrap the storage call in a try/except and discard incomplete responses rather than storing partial content that could contaminate the memory store.
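
The guard described above can be sketched as follows, under stated assumptions: `stream_and_store` is a hypothetical wrapper, `tokens` is any async token source, and `store` is any async callable such as a thin wrapper around `memory.add`. In Python, closing or cancelling an async generator raises at the suspended `yield`, so code placed after the loop runs only when the stream ran to completion.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def stream_and_store(
    tokens: AsyncIterator[str],
    store: Callable[[str], Awaitable[None]],
) -> AsyncIterator[str]:
    """Relay tokens; store the full text only if the stream completed."""
    parts: list[str] = []
    async for token in tokens:
        parts.append(token)
        yield token  # a disconnect surfaces here and skips the code below

    # Reached only when the token source is exhausted, so partial
    # responses are discarded without ever being stored.
    try:
        await store("".join(parts))
    except Exception as exc:
        # A storage failure should not break an already delivered response.
        print(f"memory storage failed: {exc!r}")
```

This sketch awaits `store` directly to keep it easy to test; to avoid holding the end of the stream open, that line could instead schedule the call with `asyncio.create_task`, as in step 3.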
