Streaming

Streaming messages is a great way to give a user feedback and keep an application feeling responsive while using LLMs.

Traditionally, streaming happens via HTTP streaming: the client sends a request and reads the response incrementally as it's generated. This works out of the box when using the Agent, the same way it would with the AI SDK. See below if that's all you're looking for.

However, with the Agent component you can also stream messages asynchronously, meaning the generation doesn't have to happen in an HTTP handler (httpAction), and the response can be streamed back to one or more clients even if their network connection is interrupted.

It works by saving the streamed parts to the database in groups (deltas); clients subscribe to new deltas for a given thread as they're generated. As a bonus, you don't even need to use the Agent's version of streamText to use the delta streaming approach (see below).

Streaming message deltas

The easiest way to stream is to pass { saveStreamDeltas: true } to agent.streamText. This will save chunks of the response as deltas as they're generated, so all clients can subscribe to the stream and get live-updating text via normal Convex queries.

await agent.streamText(ctx, { threadId }, { prompt }, { saveStreamDeltas: true });

This can be done in an async function, where HTTP streaming to a client is not possible. Under the hood it will chunk up the response and debounce saving the deltas to prevent excessive bandwidth usage. You can pass more options to saveStreamDeltas to configure the chunking and debouncing.

  { saveStreamDeltas: { chunking: "line", throttleMs: 1000 } },

  • chunking can be "word", "line", a regex, or a custom function (see the sketch after this list).
  • throttleMs controls how frequently the deltas are saved. Multiple chunks may be batched into a single delta; writes happen sequentially and never more often than throttleMs (they are single-flighted).
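
For instance, to split deltas at sentence boundaries instead of lines, you could pass a regex as the chunking option. A minimal sketch; only "word" and "line" are named above, so the exact semantics of regex chunking (the regex marking chunk boundaries) is an assumption:

await agent.streamText(
  ctx,
  { threadId },
  { prompt },
  // Assumed semantics: chunk boundaries at sentence-ending punctuation
  // followed by whitespace.
  { saveStreamDeltas: { chunking: /[.!?]\s+/, throttleMs: 500 } },
);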

Retrieving streamed deltas

For clients to stream messages, you need to expose a query that returns the stream deltas. This is very similar to retrieving messages, with a few changes:

import { paginationOptsValidator } from "convex/server";
import { v } from "convex/values";
import { vStreamArgs, listUIMessages, syncStreams } from "@convex-dev/agent";
import { components } from "./_generated/api";
import { query } from "./_generated/server";

export const listThreadMessages = query({
  args: {
    threadId: v.string(),
    // Pagination options for the non-streaming messages.
    paginationOpts: paginationOptsValidator,
    streamArgs: vStreamArgs,
  },
  handler: async (ctx, args) => {
    // authorizeThreadAccess is your own auth helper, defined elsewhere.
    await authorizeThreadAccess(ctx, args.threadId);

    // Fetches the regular non-streaming messages.
    const paginated = await listUIMessages(ctx, components.agent, args);

    // Fetches deltas for any streams the client hasn't caught up on.
    const streams = await syncStreams(ctx, components.agent, args);

    return { ...paginated, streams };
  },
});

As with non-streaming messages, you can use the useUIMessages hook to fetch the messages, passing stream: true to enable streaming.

const { results, status, loadMore } = useUIMessages(
  api.chat.streaming.listThreadMessages,
  { threadId },
  { initialNumItems: 10, stream: true },
);

Text smoothing with SmoothText and useSmoothText

The useSmoothText hook smooths out text as it changes. It works with any text, but is especially handy for streaming text.

import { useSmoothText } from "@convex-dev/agent/react";

// in the component
const [visibleText] = useSmoothText(message.text);

You can configure the initial characters per second. It will adapt over time to match the average speed of the text coming in.
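
For example, a minimal sketch; charsPerSec as the name of the initial-speed option is an assumption, not confirmed from the text above:

const [visibleText] = useSmoothText(message.text, {
  // Assumed option name: initial playback speed in characters per second,
  // before the hook adapts to the incoming stream's actual rate.
  charsPerSec: 512,
});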

By default it won't stream the first text it receives unless you pass in startStreaming: true. To start streaming immediately when you have a mix of streaming and non-streaming messages, do:

import { useSmoothText, type UIMessage } from "@convex-dev/agent/react";

function Message({ message }: { message: UIMessage }) {
  const [visibleText] = useSmoothText(message.text, {
    startStreaming: message.status === "streaming",
  });
  return <div>{visibleText}</div>;
}

If you don't want to use the hook, you can use the SmoothText component.

import { SmoothText } from "@convex-dev/agent/react";

//...
<SmoothText text={message.text} />;

Consuming the stream yourself with the Agent

You can consume the stream in all the ways the underlying AI SDK supports, for instance iterating over the content, or using result.toUIMessageStreamResponse().

If you are not also saving the deltas, it might look like this:

const result = await agent.streamText(ctx, { threadId }, { prompt });

for await (const textPart of result.textStream) {
  console.log(textPart);
}

If you want to both iterate over the stream as it happens and save the deltas, you can pass { saveStreamDeltas: { returnImmediately: true } } to streamText. The call returns immediately, and you can then iterate over the live stream, or return it in an HTTP Response.

const result = await agent.streamText(
  ctx,
  { threadId },
  { prompt },
  { saveStreamDeltas: { returnImmediately: true } },
);

return result.toUIMessageStreamResponse();
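
For example, this pairs naturally with an httpAction. A minimal sketch; the route name, request shape, and where your Agent instance lives are assumptions:

import { httpAction } from "./_generated/server";
import { agent } from "./agents"; // wherever your Agent instance is defined

export const streamChat = httpAction(async (ctx, request) => {
  // Assumed request shape: { threadId, prompt } in the JSON body.
  const { threadId, prompt } = await request.json();
  const result = await agent.streamText(
    ctx,
    { threadId },
    { prompt },
    { saveStreamDeltas: { returnImmediately: true } },
  );
  // Deltas keep saving in the background while this streams over HTTP.
  return result.toUIMessageStreamResponse();
});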

If you don't want to have the Agent involved at all, the next section will show you how to save the deltas yourself.

Advanced: Streaming deltas asynchronously without using an Agent

To stream messages without using the Agent's wrapper of streamText, you can use the streamText function from the AI SDK directly.

The approach uses the DeltaStreamer class to save the deltas to the database, then retrieves the messages as shown above. Alternatively, the more direct useStreamingUIMessages hook skips reading non-streaming messages from the database entirely.

The only requirements for reading and writing the streams are that they use a threadId from the Agent component, and that each stream is saved with a distinct order so the client can sort them.
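
One way to pick a distinct order is to go one past the latest saved message in the thread. A sketch, assuming listMessages from @convex-dev/agent returns messages newest-first and that each message doc carries an order field:

import { listMessages } from "@convex-dev/agent";
import { components } from "./_generated/api";
import { type ActionCtx } from "./_generated/server";

async function nextOrder(ctx: ActionCtx, threadId: string): Promise<number> {
  const { page } = await listMessages(ctx, components.agent, {
    threadId,
    paginationOpts: { cursor: null, numItems: 1 },
  });
  // Empty thread: start at 0; otherwise go one past the latest order.
  return (page[0]?.order ?? -1) + 1;
}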

import { components } from "./_generated/api";
import { type ActionCtx } from "./_generated/server";
import { DeltaStreamer, compressUIMessageChunks } from "@convex-dev/agent";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

async function stream(ctx: ActionCtx, threadId: string, order: number) {
  const streamer = new DeltaStreamer(
    components.agent,
    ctx,
    {
      throttleMs: 100,
      onAsyncAbort: async () => console.error("Aborted asynchronously"),
      // This will collapse multiple tiny deltas into one if they're being
      // sent in quick succession.
      compress: compressUIMessageChunks,
      abortSignal: undefined,
    },
    {
      threadId,
      format: "UIMessageChunk",
      order,
      stepOrder: 0,
      userId: undefined,
    },
  );
  // Do the normal streaming with the AI SDK.
  const response = streamText({
    model: openai.chat("gpt-4o-mini"),
    prompt: "Tell me a joke",
    abortSignal: streamer.abortController.signal,
    onError: ({ error }) => {
      console.error(error);
      streamer.fail(error instanceof Error ? error.message : String(error));
    },
  });

  // We could await here if we wanted to wait for the stream to finish,
  // but instead we let it process asynchronously so we can return a
  // streaming HTTP Response.
  void streamer.consumeStream(response.toUIMessageStream());

  return {
    // e.g. to do `response.toTextStreamResponse()` for HTTP streaming.
    response,
    // We don't need this on the client, but with it some clients can
    // selectively skip streaming down deltas when they're already using
    // HTTP streaming.
    streamId: await streamer.getStreamId(),
  };
}

To fetch the deltas for the client, you can use the syncStreams function, as you would with normal Agent streaming. If you don't want to fetch the non-streaming messages, it can be simplified to:

import { v } from "convex/values";
import { vStreamArgs, syncStreams } from "@convex-dev/agent";
import { query } from "./_generated/server";
import { components } from "./_generated/api";

export const listStreams = query({
  args: {
    threadId: v.string(),
    streamArgs: vStreamArgs,
  },
  handler: async (ctx, args) => {
    // await authorizeThreadAccess(ctx, args.threadId);
    const streams = await syncStreams(ctx, components.agent, {
      ...args,
      // By default syncStreams only returns streaming messages. However, if
      // your messages aren't saved in the same transaction as the streaming
      // ends, you might want to include them here to avoid UI flashes.
      includeStatuses: ["streaming", "aborted", "finished"],
    });
    return { streams };
  },
});

On the client side, you can use the useStreamingUIMessages hook to fetch the messages. If your query takes more arguments than threadId, pass them along with threadId here.

const messages = useStreamingUIMessages(api.example.listStreams, { threadId });

You can pass an additional options parameter to skip certain streamIds, or to start at a given order so earlier streams are ignored, as sketched below.
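
A sketch of what that might look like. The option names skipStreamIds and startOrder are assumptions inferred from the description above, not confirmed API; streamId here would be the id returned from the streaming action earlier:

const messages = useStreamingUIMessages(
  api.example.listStreams,
  { threadId },
  {
    // Hypothetical option name: skip a stream this client is already
    // consuming over HTTP.
    skipStreamIds: [streamId],
    // Hypothetical option name: ignore streams from earlier message orders.
    startOrder: 10,
  },
);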