Streaming LLM Responses from FastAPI to Next.js with the OpenAI and Anthropic APIs

End-to-end streaming setup for OpenAI and Anthropic APIs across FastAPI and Next.js. Covers backend generators, SDK differences, error handling, and production patterns.

If you've built anything with LLMs recently, you know the problem: waiting three to eight seconds for a complete response before rendering anything feels broken. Users assume it's crashed. Streaming fixes that — the first tokens appear almost immediately, and the interface feels alive. Getting it working end-to-end across FastAPI and Next.js is straightforward once you understand the plumbing, but a few spots quietly break. This tutorial covers the full path: backend generator setup, SDK-level differences between OpenAI and Anthropic, Next.js consumption patterns, error handling, and production gotchas.

The Architecture in One Sentence

FastAPI produces a StreamingResponse using an async generator; Next.js consumes it via the fetch API reading from response.body as a ReadableStream. Both OpenAI and Anthropic support streaming natively — the differences are minor and will be called out explicitly.

Prerequisites

  • Python 3.9+ with fastapi, uvicorn, openai, and anthropic installed
  • Node.js 18+ (for native fetch and ReadableStream)
  • Next.js 13+ (App Router)
  • OPENAI_API_KEY and ANTHROPIC_API_KEY set as environment variables

FastAPI: Setting Up the Streaming Endpoint

Install your dependencies first:

pip install fastapi uvicorn openai anthropic

Here's the core pattern. OpenAI first, then the Anthropic variant; the full example at the end exposes both from one app.

OpenAI Streaming

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()  # picks up OPENAI_API_KEY from env automatically

async def openai_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:  # delta is None on the final chunk
            yield delta

@app.post("/stream/openai")
async def stream_openai(body: dict):
    return StreamingResponse(
        openai_stream(body["prompt"]),
        media_type="text/plain",
    )

The key detail: yield delta emits raw text chunks. You could format these as Server-Sent Events (text/event-stream) — and should in production (see the SSE section below) — but for a simple chat interface, plain text/plain with chunked transfer encoding is less ceremony to start with.

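Why if delta matters. The OpenAI SDK emits chunks where chunk.choices[0].delta.content is None, including the final chunk that signals the end of generation. StreamingResponse expects str or bytes from the generator, so without the guard a yield None fails inside the response machinery partway through the stream (typically an AttributeError when the chunk is encoded), and the client sees a truncated response rather than a clean error.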
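The guard can be pulled into a small helper so it is easy to test on its own. The sketch below is an illustration, not SDK code: extract_text and fake_chunk are hypothetical names, and the SimpleNamespace objects only imitate the shape of a real chunk. It also covers a chunk with an empty choices list, which you can receive when usage reporting is enabled.

```python
from types import SimpleNamespace
from typing import Optional


def extract_text(chunk) -> Optional[str]:
    """Return the text carried by a chat-completion chunk, or None if there is none."""
    # Some chunks carry no choices at all (for example a usage-only chunk).
    if not chunk.choices:
        return None
    # content is None on role-only and final chunks; an empty string is skipped too.
    return chunk.choices[0].delta.content or None


def fake_chunk(content):
    """Hypothetical stand-in for an SDK chunk, used only for this demo."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


chunks = [
    fake_chunk("Hel"),
    fake_chunk("lo"),
    fake_chunk(None),               # final chunk: no content
    SimpleNamespace(choices=[]),    # usage-only chunk: no choices
]
texts = [text for text in map(extract_text, chunks) if text]
print("".join(texts))
```

Inside the generator, the loop body then becomes text = extract_text(chunk) followed by if text: yield text.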

Anthropic Streaming

from anthropic import AsyncAnthropic

anthropic_client = AsyncAnthropic()  # picks up ANTHROPIC_API_KEY from env

async def anthropic_stream(prompt: str):
    async with anthropic_client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

@app.post("/stream/anthropic")
async def stream_anthropic(body: dict):
    return StreamingResponse(
        anthropic_stream(body["prompt"]),
        media_type="text/plain",
    )

Anthropic's SDK exposes .text_stream on the stream context manager, which iterates directly over decoded text strings, so you never need to inspect event types or unwrap delta objects yourself. The Anthropic Python SDK documents this as the idiomatic path: for text in stream.text_stream (sync) or async for text in stream.text_stream (async). With OpenAI, you pull out chunk.choices[0].delta.content manually and guard for None; the Anthropic helper removes that ceremony.

max_tokens is required for Anthropic. Unlike OpenAI (where it's optional), the Anthropic API will return a validation error if max_tokens is omitted. Set it to a reasonable upper bound for your use case.

CORS

If your Next.js dev server runs on a different origin, add this before your routes:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["POST"],
    allow_headers=["Content-Type"],
)

In production, replace "http://localhost:3000" with your actual frontend domain. Avoid allow_origins=["*"] unless the endpoint is truly public and unauthenticated.
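One way to avoid hard-coding the origin is to read it from configuration. The following is a minimal sketch under stated assumptions: ALLOWED_ORIGINS is a hypothetical environment variable holding a comma-separated list, and parse_allowed_origins is an illustrative helper, not part of FastAPI.

```python
import os
from typing import List


def parse_allowed_origins(raw: str) -> List[str]:
    """Split a comma-separated origin list, dropping blanks and trailing slashes."""
    origins = [item.strip().rstrip("/") for item in raw.split(",")]
    origins = [origin for origin in origins if origin]
    # Refuse the wildcard so a config typo cannot open the endpoint to every site.
    if "*" in origins:
        raise ValueError("wildcard origin is not allowed for this endpoint")
    return origins


# ALLOWED_ORIGINS is an assumed variable name; the default is the local dev server.
allowed_origins = parse_allowed_origins(
    os.getenv("ALLOWED_ORIGINS", "http://localhost:3000")
)
print(allowed_origins)
```

The resulting list can be passed as allow_origins to CORSMiddleware. Trailing slashes are stripped because browsers send the Origin header without one.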

SSE vs Plain Text

The examples above use media_type="text/plain". That works for a simple demo, but in production you should use Server-Sent Events (text/event-stream) for two reasons:

  1. Error signalling mid-stream. With plain text you cannot distinguish an error chunk from a content chunk once the stream has started and HTTP 200 has been sent. SSE lets you send a named event: error with a structured payload.
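  2. Reconnection and event IDs. The browser's EventSource API reconnects automatically after a network drop and resends the last event ID it saw; a plain chunked response has no equivalent. EventSource only issues GET requests, though, so with the POST endpoints used here you read the SSE stream through fetch and handle any retry logic yourself.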
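Both points rest on the SSE framing itself: optional id and event fields, one or more data lines, and a blank line to end the message. A minimal sketch of that framing, where format_sse is a hypothetical helper name rather than a FastAPI or SDK function:

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Frame one SSE message: optional id/event fields, data lines, blank-line terminator."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines has to be sent as one data: line per line of text.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"


print(format_sse(json.dumps({"text": "hi"})), end="")
print(format_sse(json.dumps({"message": "rate limited"}), event="error"), end="")
```

Encoding the payload as JSON keeps raw newlines out of the data field, which is why the generators in this tutorial call json.dumps before framing.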

Here's the SSE-formatted version of the OpenAI generator:

import json

async def openai_stream_sse(prompt: str):
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                data = json.dumps({"text": delta})
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield f"event: error\ndata: {json.dumps({'message': str(e)})}\n\n"

@app.post("/stream/openai/sse")
async def stream_openai_sse(body: dict):
    return StreamingResponse(
        openai_stream_sse(body["prompt"]),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"},  # critical for Nginx; see "Buffering by a reverse proxy" below
    )

The [DONE] sentinel mirrors OpenAI's own wire format and gives the client an explicit termination signal. On the frontend, once you have buffered the stream and split it into complete lines, parse each data line with:

if (line === "data: [DONE]") break;
const { text } = JSON.parse(line.slice(6)); // strip "data: "
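Network reads do not line up with SSE frames, so a parser has to buffer until the blank-line terminator arrives. The same logic is sketched here in Python, which is also handy for testing the endpoint from a script. iter_sse_events and collect_text are hypothetical helper names, and the parser handles only the event and data fields used in this tutorial.

```python
import json
from typing import Iterable, Iterator, Tuple


def iter_sse_events(chunks: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from text chunks that may split frames anywhere."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A frame is complete only once its blank-line terminator has arrived.
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            event, data_lines = "message", []
            for line in frame.split("\n"):
                if line.startswith("event:"):
                    event = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    value = line[len("data:"):]
                    # The format allows one optional space after the colon.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                yield event, "\n".join(data_lines)


def collect_text(chunks: Iterable[str]) -> str:
    """Concatenate text payloads until [DONE]; raise if an error event arrives."""
    parts = []
    for event, data in iter_sse_events(chunks):
        if event == "error":
            raise RuntimeError(json.loads(data)["message"])
        if data == "[DONE]":
            break
        parts.append(json.loads(data)["text"])
    return "".join(parts)


# Frames deliberately split mid-line, as a network read might deliver them.
raw = ['data: {"text": "Hel', 'lo"}\n\ndata: {"te', 'xt": " world"}\n\ndata: [DONE]\n\n']
print(collect_text(raw))
```

The TypeScript version follows the same shape: append each decoded chunk to a buffer, split on the blank line, and only then apply the [DONE] check and JSON.parse.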

Next.js: Consuming the Stream

Pattern 1 — React Server Action (App Router)

Create app/actions/stream.ts:

"use server";

export async function* streamPrompt(
  prompt: string,
  provider: "openai" | "anthropic"
) {
  const res = await fetch(`http://localhost:8000/stream/${provider}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt }),
  });

  if (!res.ok) {
    throw new Error(`Backend returned ${res.status}: ${await res.text()}`);
  }

  if (!res.body) throw new Error("No response body");

  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    yield decoder.decode(value, { stream: true });
  }
}

Then in your component:

"use client";

import { useState } from "react";
import { streamPrompt } from "../actions/stream";

export default function ChatBox() {
  const [output, setOutput] = useState("");
  const [loading, setLoading] = useState(false);

  async function handleSubmit(prompt: string) {
    setOutput("");
    setLoading(true);
    try {
      for await (const chunk of streamPrompt(prompt, "anthropic")) {
        setOutput((prev) => prev + chunk);
      }
    } catch (err) {
      setOutput((prev) => prev + "\n\n[Stream error — please retry]");
      console.error(err);
    } finally {
      setLoading(false);
    }
  }

  return (
    <div>
      <button
        onClick={() => handleSubmit("Explain async generators in Python")}
        disabled={loading}
      >
        {loading ? "Generating…" : "Ask"}
      </button>
      <pre style={{ whiteSpace: "pre-wrap" }}>{output}</pre>
    </div>
  );
}

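The for await...of loop over an async generator is the cleanest pattern here. Each chunk appends to state, React re-renders incrementally, and users see tokens arriving in real time. Whether an async generator can be returned from a "use server" function and iterated on the client depends on your React and Next.js versions, so test this against the release you deploy; Pattern 2 relies only on fetch and Response and is the more portable option.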

Pattern 2 — Next.js Route Handler

If you prefer a Route Handler (closer to an API proxy), create app/api/stream/route.ts:

import { NextRequest } from "next/server";

// Edge Runtime suits streaming on Vercel, but plan limits still apply (see "Vercel's function timeout")
export const runtime = "edge";

const PROVIDERS = ["openai", "anthropic"];

export async function POST(req: NextRequest) {
  const { prompt, provider } = await req.json();

  // Reject unknown providers so user input never shapes the upstream URL
  if (!PROVIDERS.includes(provider)) {
    return new Response("Unknown provider", { status: 400 });
  }

  const upstream = await fetch(
    `http://your-fastapi-host/stream/${provider}`,
    {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt }),
    }
  );

  if (!upstream.ok) {
    return new Response("Upstream error", { status: upstream.status });
  }

  // Proxy the body directly: no buffering, no parsing.
  // The runtime handles chunked framing, so Transfer-Encoding is not set by hand.
  return new Response(upstream.body, {
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
      "X-Accel-Buffering": "no",
    },
  });
}

This pattern is useful when you want the Next.js layer to handle authentication (e.g. check a session cookie) before proxying to FastAPI, without duplicating the streaming logic.

Error Handling Mid-Stream

Once FastAPI sends HTTP 200 and begins streaming, you can no longer change the status code. This means errors that occur mid-generation need to be communicated in-band. Three patterns:

1. Sentinel token in plain text — emit a special string like __ERROR__: rate limit exceeded and detect it on the client. Simple, but fragile if the LLM output could ever contain that string.

2. SSE with a named error event — the cleanest approach (shown above). The client checks event.type before treating data as content:

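const eventSource = new EventSource("/api/stream"); // GET-only; use fetch for the POST endpoints above
eventSource.addEventListener("error", (e) => {
  // The built-in connection-failure event is also named "error" and carries no data
  const data = (e as MessageEvent).data;
  const message = data ? JSON.parse(data).message : "connection lost";
  setOutput((prev) => prev + `\n\n[Error: ${message}]`);
  eventSource.close();
});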

3. Wrap the generator in try/except on the FastAPI side — always do this regardless of format. An unhandled exception in a generator will silently terminate the stream from the server side; the client sees an abrupt disconnect, not a meaningful error message.

async def openai_stream(prompt: str):
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield delta
    except Exception as e:
        # In plain-text mode, yield an error indicator
        yield f"\n\n[Generation error: {e}]"
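The same guard can be factored into a wrapper so every generator gets it without repeating the try/except. This is a sketch for SSE mode under stated assumptions: with_error_event and flaky_source are hypothetical names, and the wrapper sends a generic message on the assumption that you log the real exception server-side rather than exposing it to the client.

```python
import asyncio
import json
from typing import AsyncIterator


async def with_error_event(source: AsyncIterator[str]) -> AsyncIterator[str]:
    """Pass SSE frames through; on failure emit one named error event, then stop."""
    try:
        async for frame in source:
            yield frame
    except Exception:
        # Cancellation is not an Exception subclass, so client disconnects pass through.
        payload = json.dumps({"message": "generation failed"})
        yield f"event: error\ndata: {payload}\n\n"


async def flaky_source() -> AsyncIterator[str]:
    """Hypothetical upstream that fails after one frame."""
    yield 'data: {"text": "Hi"}\n\n'
    raise RuntimeError("rate limit exceeded")


async def collect(agen: AsyncIterator[str]) -> list:
    return [item async for item in agen]


frames = asyncio.run(collect(with_error_event(flaky_source())))
print(frames)
```

In an endpoint you would pass with_error_event(openai_stream_sse(prompt)) to StreamingResponse, assuming the inner generator no longer catches exceptions itself.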

Where Things Actually Break

Buffering by a reverse proxy

Nginx buffers proxied responses by default, which silently kills streaming in production whilst everything works locally. Add X-Accel-Buffering: no as a response header in FastAPI, or set proxy_buffering off in your Nginx config block. The header approach is preferable because it lives in your application code rather than infrastructure config:

return StreamingResponse(
    openai_stream(body["prompt"]),
    media_type="text/plain",
    headers={"X-Accel-Buffering": "no"},
)

For Nginx config, the equivalent directive is:

location /stream/ {
    proxy_pass http://fastapi_upstream;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 120s; # extend for long generations
}

Vercel's function timeout

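Streams deployed to Vercel Serverless Functions are cut off at the function's execution timeout (10 seconds by default, or 60s on Pro, at the time of writing). These limits vary by plan and change over time, so check Vercel's current documentation. Either:

  • Keep prompts bounded so generation completes in time, or
  • Switch to Edge Runtime (export const runtime = "edge"), where the function has to begin sending its response within 25 seconds and can then keep streaming past that point, subject to your plan's limits

Edge Runtime supports only a subset of Node.js APIs, so make sure your route handler sticks to Web APIs.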

Missing stream=True on the SDK call

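stream=True is what turns OpenAI's create() call into a stream; without it the call blocks until the model finishes and returns a single completed response object. The generator shown earlier would then fail at async for, because that object is not an async iterator, and if you adapt the code to yield its text you emit the full response in one chunk. FastAPI still wraps it in a StreamingResponse, but the latency is identical to non-streaming. The same applies to Anthropic's messages.create(); the messages.stream() helper used in this tutorial always streams and takes no stream flag.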
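A quick way to catch this regression is to measure time to first chunk instead of total time. The sketch below uses two hypothetical stand-in generators with artificial delays rather than real SDK calls; time_to_first_chunk is an illustrative helper you could point at your own generator.

```python
import asyncio
import time


async def streamed_tokens():
    """Hypothetical streaming source: one token every 10 ms."""
    for token in ["a", "b", "c", "d", "e"]:
        await asyncio.sleep(0.01)
        yield token


async def buffered_tokens():
    """Hypothetical non-streaming source: waits for everything, then yields once."""
    await asyncio.sleep(0.2)
    yield "abcde"


async def time_to_first_chunk(agen) -> float:
    """Return the seconds elapsed until the generator produces its first item."""
    start = time.perf_counter()
    try:
        await agen.__anext__()
    except StopAsyncIteration:
        pass  # empty stream: report the time until it ended
    finally:
        await agen.aclose()  # release the generator without draining it
    return time.perf_counter() - start


async def main():
    fast = await time_to_first_chunk(streamed_tokens())
    slow = await time_to_first_chunk(buffered_tokens())
    return fast, slow


fast, slow = asyncio.run(main())
print(f"streamed: {fast:.3f}s, buffered: {slow:.3f}s")
```

If the first-chunk time of your real endpoint tracks the total generation time, something between the SDK call and the client is buffering.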

Decoder state across chunks

Always pass { stream: true } to TextDecoder.decode(). Without it, multibyte UTF-8 characters that straddle chunk boundaries produce garbled output — common with CJK characters, emoji, or any non-ASCII content in the response.

// Correct — maintains decoder state between chunks
const decoder = new TextDecoder();
yield decoder.decode(value, { stream: true });

// Wrong — resets state on every chunk, corrupts multibyte sequences
yield new TextDecoder().decode(value); // creates a fresh decoder each time
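The same failure is easy to reproduce outside the browser. As an analogy rather than the TextDecoder API itself, Python's incremental decoder in the codecs module keeps state across chunks in the way { stream: true } does, while a per-chunk decode corrupts a character split across the boundary:

```python
import codecs

payload = "héllo 👋".encode("utf-8")
split_at = len(payload) - 2  # lands inside the 4-byte emoji
parts = [payload[:split_at], payload[split_at:]]

# Per-chunk decoding: each call starts from scratch, so the split emoji is lost.
naive = "".join(part.decode("utf-8", errors="replace") for part in parts)

# Incremental decoding: incomplete bytes are held back until the next chunk arrives.
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
incremental = "".join(decoder.decode(part) for part in parts)
incremental += decoder.decode(b"", final=True)  # flush anything still buffered

print(repr(naive))
print(repr(incremental))
```

The final flush has a browser counterpart: calling decoder.decode() with no arguments after the read loop ends emits anything still held in the decoder.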

Connection drops during long generations

For responses that may take 30+ seconds (e.g. max_tokens=4096), add an AbortController so you can cancel the stream when the user navigates away:

"use client";
import { useEffect, useRef, useState } from "react";

export default function ChatBox() {
  const [output, setOutput] = useState("");
  const abortRef = useRef<AbortController | null>(null);

  useEffect(() => {
    return () => abortRef.current?.abort(); // cancel on unmount
  }, []);

  async function handleSubmit(prompt: string) {
    abortRef.current?.abort(); // cancel any stream still in flight
    abortRef.current = new AbortController();
    setOutput("");

    try {
      // fetch() itself rejects with AbortError when aborted, so it belongs inside the try
      const res = await fetch("/api/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        // the Route Handler reads both fields; "openai" is an example choice
        body: JSON.stringify({ prompt, provider: "openai" }),
        signal: abortRef.current.signal,
      });
      if (!res.ok || !res.body) throw new Error(`Request failed: ${res.status}`);

      const reader = res.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        setOutput((prev) => prev + decoder.decode(value, { stream: true }));
      }
    } catch (err: unknown) {
      if (err instanceof Error && err.name !== "AbortError") {
        console.error("Stream error:", err);
      }
    }
  }
  // ... render
}
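Cancelling on the client only helps if the backend also lets go of the upstream call. The sketch below assumes the server closes or cancels the generator when the client disconnects, which is worth verifying for your Starlette and uvicorn versions. It shows that a try/finally inside an async generator runs when the consumer stops early. upstream_tokens is a hypothetical stand-in for a provider stream.

```python
import asyncio

released = []


async def upstream_tokens():
    """Hypothetical provider stream with cleanup that has to run on early exit."""
    try:
        for token in ["a", "b", "c"]:
            await asyncio.sleep(0)
            yield token
    finally:
        # Runs on normal completion, on aclose(), and on task cancellation.
        released.append("upstream released")


async def main():
    agen = upstream_tokens()
    first = await agen.__anext__()  # the consumer reads one token...
    await agen.aclose()             # ...then goes away, as on a disconnect
    return first


first_token = asyncio.run(main())
print(first_token, released)
```

The async with block in the Anthropic generator gives you the same guarantee, since leaving the block closes the stream. For the OpenAI generator, a finally clause is the place to close the stream explicitly if you need to.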

SDK-Level Differences at a Glance

| | OpenAI (openai ≥ 1.0) | Anthropic (anthropic ≥ 0.25) |
| --- | --- | --- |
| Async client | AsyncOpenAI() | AsyncAnthropic() |
| Stream call | await client.chat.completions.create(..., stream=True) | async with client.messages.stream(...) as stream: |
| Iterate tokens | async for chunk in stream: then chunk.choices[0].delta.content | async for text in stream.text_stream: |
| Null guard needed | Yes; delta.content is None on the final chunk | No; .text_stream yields strings only |
| max_tokens required | No | Yes |
| Stop reason | chunk.choices[0].finish_reason | (await stream.get_final_message()).stop_reason |
| Usage data | chunk.usage (only on final chunk with stream_options) | (await stream.get_final_message()).usage |

To access token usage from OpenAI during streaming, pass stream_options={"include_usage": True} to create() — usage appears on the final chunk.

Choosing Between OpenAI and Anthropic

From an integration standpoint, they're nearly identical. Anthropic's .text_stream is marginally more ergonomic because you skip the manual delta unwrap and null guard. The underlying wire protocol is SSE in both cases; the Python SDKs abstract that away entirely.

If the model choice matters for your product, make the endpoint provider-agnostic (as shown above) and swap at the call site — you're not locked in. A simple provider query param or request body field is all you need.
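One way to do the swap is a lookup table from provider name to generator function. This is a minimal sketch, assuming stand-in generators in place of the real SDK calls; PROVIDERS, get_stream, fake_openai, and fake_anthropic are hypothetical names introduced for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable, Dict


async def fake_openai(prompt: str) -> AsyncIterator[str]:
    """Stand-in for the OpenAI generator; yields canned tokens."""
    for token in ["openai:", " ", prompt]:
        yield token


async def fake_anthropic(prompt: str) -> AsyncIterator[str]:
    """Stand-in for the Anthropic generator; yields canned tokens."""
    for token in ["anthropic:", " ", prompt]:
        yield token


# Registry of supported providers; adding one is a single new entry.
PROVIDERS: Dict[str, Callable[[str], AsyncIterator[str]]] = {
    "openai": fake_openai,
    "anthropic": fake_anthropic,
}


def get_stream(provider: str, prompt: str) -> AsyncIterator[str]:
    """Return the token stream for a provider, rejecting unknown names up front."""
    try:
        factory = PROVIDERS[provider]
    except KeyError:
        raise ValueError(f"unknown provider: {provider!r}") from None
    return factory(prompt)


async def collect(stream: AsyncIterator[str]) -> str:
    return "".join([token async for token in stream])


result = asyncio.run(collect(get_stream("anthropic", "hi")))
print(result)
```

In a FastAPI endpoint you would catch the ValueError before creating the StreamingResponse and return a 400, since the status code can no longer change once streaming has started.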

Full Working Example: main.py

import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from pydantic import BaseModel

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=[os.getenv("FRONTEND_URL", "http://localhost:3000")],
    allow_methods=["POST"],
    allow_headers=["Content-Type"],
)

openai_client = AsyncOpenAI()
anthropic_client = AsyncAnthropic()


class StreamRequest(BaseModel):
    prompt: str
    max_tokens: int = 1024


async def openai_generator(prompt: str):
    try:
        stream = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield delta
    except Exception as e:
        yield f"\n\n[Error: {e}]"


async def anthropic_generator(prompt: str, max_tokens: int):
    try:
        async with anthropic_client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield text
    except Exception as e:
        yield f"\n\n[Error: {e}]"


@app.post("/stream/openai")
async def stream_openai(body: StreamRequest):
    return StreamingResponse(
        openai_generator(body.prompt),
        media_type="text/plain",
        headers={"X-Accel-Buffering": "no"},
    )


@app.post("/stream/anthropic")
async def stream_anthropic(body: StreamRequest):
    return StreamingResponse(
        anthropic_generator(body.prompt, body.max_tokens),
        media_type="text/plain",
        headers={"X-Accel-Buffering": "no"},
    )

Run with:

uvicorn main:app --reload

Wrapping Up

The full pattern is roughly 50 lines of Python and 40 lines of TypeScript. Once in place, the user experience improvement is significant and the complexity overhead is low. The key is recognising that streaming is fundamentally a generator-to-readable-stream pipeline — everything else is SDK syntax differences and deployment config.

The spots that silently break in production are:

  1. Nginx buffering: add X-Accel-Buffering: no
  2. Vercel function timeouts: use Edge Runtime and check your plan's limits
  3. Missing stream=True: no tokens arrive until the whole response is ready
  4. TextDecoder state: always pass { stream: true }
  5. Missing error handling in the generator: unhandled exceptions disconnect silently

Get the plumbing right once, and it's reusable across every LLM feature you build.

Damian Hodgkiss
