
Commit dcc549b

chore (ai): data stream updates (#6158)
## Background

`DataStream` and `streamText` are fairly tightly coupled because of their evolution. Going forward, data streams (or something similar) should be available more broadly to allow for flexible usage.

## Summary

* remove `StreamTextResult.mergeIntoDataStream` method
* rename `DataStreamOptions.getErrorMessage` to `onError`
* add `pipeTextStreamToResponse` function
* add `createTextStreamResponse` function
* change `createDataStreamResponse` function to accept a `DataStream` and not a `DataStreamWriter`
* change `pipeDataStreamToResponse` function to accept a `DataStream` and not a `DataStreamWriter`
* change `pipeDataStreamToResponse` function to have a single parameter
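For orientation before the diffs: a minimal sketch of the new shape for a Node.js response, assembled from the example updates in this commit (model and prompt are the placeholders used in the examples):

```ts
import { openai } from '@ai-sdk/openai';
import { createDataStream, pipeDataStreamToResponse, streamText } from 'ai';
import type { ServerResponse } from 'node:http';

function handleStreamData(res: ServerResponse) {
  // Step 1: build a DataStream; the writer replaces the DataStreamWriter
  // callback that pipeDataStreamToResponse used to own.
  const dataStream = createDataStream({
    execute: async writer => {
      writer.writeData('initialized call');

      const result = streamText({
        model: openai('gpt-4o'),
        prompt: 'Invent a new holiday and describe its traditions.',
      });

      // replaces the removed result.mergeIntoDataStream(writer)
      writer.merge(result.toDataStream());
    },
    // renamed from getErrorMessage; messages stay masked unless mapped here
    onError: error => (error instanceof Error ? error.message : String(error)),
  });

  // Step 2: hand the stream to the transport in a single options object.
  pipeDataStreamToResponse({ response: res, dataStream });
}
```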
1 parent 3a961cb commit dcc549b

File tree: 31 files changed (+331 −281 lines)


.changeset/tender-comics-rescue.md

Lines changed: 11 additions & 0 deletions

```diff
@@ -0,0 +1,11 @@
+---
+'ai': major
+---
+
+remove StreamTextResult.mergeIntoDataStream method
+rename DataStreamOptions.getErrorMessage to onError
+add pipeTextStreamToResponse function
+add createTextStreamResponse function
+change createDataStreamResponse function to accept a DataStream and not a DataStreamWriter
+change pipeDataStreamToResponse function to accept a DataStream and not a DataStreamWriter
+change pipeDataStreamToResponse function to have a single parameter
```
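The two text-stream helpers are new in this changeset, and their signatures do not appear in the diffs below. Assuming they mirror the data-stream transports shown in this commit and accept the plain text stream from `streamText`, usage might look like this (a hypothetical sketch, not confirmed by this diff):

```ts
import { openai } from '@ai-sdk/openai';
import { createTextStreamResponse, streamText } from 'ai';

export async function POST(req: Request) {
  const { prompt } = await req.json();

  const result = streamText({
    model: openai('gpt-4o'),
    prompt,
  });

  // assumed to mirror createDataStreamResponse({ dataStream }): a Response
  // that streams plain text instead of the data-stream protocol
  return createTextStreamResponse({ textStream: result.textStream });
}
```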

examples/express/src/server.ts

Lines changed: 8 additions & 5 deletions

```diff
@@ -1,5 +1,5 @@
 import { openai } from '@ai-sdk/openai';
-import { pipeDataStreamToResponse, streamText } from 'ai';
+import { createDataStream, pipeDataStreamToResponse, streamText } from 'ai';
 import 'dotenv/config';
 import express, { Request, Response } from 'express';

@@ -16,23 +16,26 @@ app.post('/', async (req: Request, res: Response) => {

 app.post('/stream-data', async (req: Request, res: Response) => {
   // immediately start streaming the response
-  pipeDataStreamToResponse(res, {
-    execute: async dataStreamWriter => {
-      dataStreamWriter.writeData('initialized call');
+
+  const dataStream = createDataStream({
+    execute: async writer => {
+      writer.writeData('initialized call');

       const result = streamText({
         model: openai('gpt-4o'),
         prompt: 'Invent a new holiday and describe its traditions.',
       });

-      result.mergeIntoDataStream(dataStreamWriter);
+      writer.merge(result.toDataStream());
     },
     onError: error => {
       // Error messages are masked by default for security reasons.
       // If you want to expose the error message to the client, you can do so here:
       return error instanceof Error ? error.message : String(error);
     },
   });
+
+  pipeDataStreamToResponse({ response: res, dataStream });
 });

 app.listen(8080, () => {
```
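A note on the `onError` option used above (renamed from `getErrorMessage`): the default masks error details from the client, so this callback is the single opt-in point for exposing them. One conservative pattern, sketched under the assumption that you gate on an environment variable:

```ts
import { createDataStream } from 'ai';

const dataStream = createDataStream({
  execute: async writer => {
    writer.writeData('initialized call');
    // ... merge model output here ...
  },
  // expose real messages only outside production (illustrative policy)
  onError: error =>
    process.env.NODE_ENV !== 'production'
      ? error instanceof Error
        ? error.message
        : String(error)
      : 'An error occurred.',
});
```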

examples/fastify/src/server.ts

Lines changed: 3 additions & 3 deletions

```diff
@@ -21,15 +21,15 @@ fastify.post('/', async function (request, reply) {
 fastify.post('/stream-data', async function (request, reply) {
   // immediately start streaming the response
   const dataStream = createDataStream({
-    execute: async dataStreamWriter => {
-      dataStreamWriter.writeData('initialized call');
+    execute: async writer => {
+      writer.writeData('initialized call');

       const result = streamText({
         model: openai('gpt-4o'),
         prompt: 'Invent a new holiday and describe its traditions.',
       });

-      result.mergeIntoDataStream(dataStreamWriter);
+      writer.merge(result.toDataStream());
     },
     onError: error => {
       // Error messages are masked by default for security reasons.
```

examples/hono/src/server.ts

Lines changed: 3 additions & 3 deletions

```diff
@@ -23,15 +23,15 @@ app.post('/', async c => {
 app.post('/stream-data', async c => {
   // immediately start streaming the response
   const dataStream = createDataStream({
-    execute: async dataStreamWriter => {
-      dataStreamWriter.writeData('initialized call');
+    execute: async writer => {
+      writer.writeData('initialized call');

       const result = streamText({
         model: openai('gpt-4o'),
         prompt: 'Invent a new holiday and describe its traditions.',
       });

-      result.mergeIntoDataStream(dataStreamWriter);
+      writer.merge(result.toDataStream());
     },
     onError: error => {
       // Error messages are masked by default for security reasons.
```

examples/nest/src/app.controller.ts

Lines changed: 7 additions & 5 deletions

```diff
@@ -1,6 +1,6 @@
 import { openai } from '@ai-sdk/openai';
 import { Controller, Post, Res } from '@nestjs/common';
-import { pipeDataStreamToResponse, streamText } from 'ai';
+import { createDataStream, pipeDataStreamToResponse, streamText } from 'ai';
 import { Response } from 'express';

 @Controller()
@@ -17,22 +17,24 @@ export class AppController {

   @Post('/stream-data')
   async streamData(@Res() res: Response) {
-    pipeDataStreamToResponse(res, {
-      execute: async (dataStreamWriter) => {
-        dataStreamWriter.writeData('initialized call');
+    const dataStream = createDataStream({
+      execute: async (writer) => {
+        writer.writeData('initialized call');

         const result = streamText({
           model: openai('gpt-4o'),
           prompt: 'Invent a new holiday and describe its traditions.',
         });

-        result.mergeIntoDataStream(dataStreamWriter);
+        writer.merge(result.toDataStream());
       },
       onError: (error) => {
         // Error messages are masked by default for security reasons.
         // If you want to expose the error message to the client, you can do so here:
         return error instanceof Error ? error.message : String(error);
       },
     });
+
+    pipeDataStreamToResponse({ response: res, dataStream });
   }
 }
```

Lines changed: 7 additions & 5 deletions

```diff
@@ -1,13 +1,13 @@
 import { openai } from '@ai-sdk/openai';
-import { createDataStreamResponse, streamText } from 'ai';
+import { createDataStream, createDataStreamResponse, streamText } from 'ai';

 export async function POST(req: Request) {
   const { messages } = await req.json();

-  return createDataStreamResponse({
-    execute: dataStream => {
+  const dataStream = createDataStream({
+    execute: writer => {
       // write a custom url source to the stream:
-      dataStream.writeSource({
+      writer.writeSource({
         type: 'source',
         sourceType: 'url',
         id: 'source-1',
@@ -20,7 +20,9 @@ export async function POST(req: Request) {
         messages,
       });

-      result.mergeIntoDataStream(dataStream);
+      writer.merge(result.toDataStream());
     },
   });
+
+  return createDataStreamResponse({ dataStream });
 }
```

examples/next-openai/app/api/use-chat-human-in-the-loop/route.ts

Lines changed: 12 additions & 5 deletions

```diff
@@ -1,5 +1,10 @@
 import { openai } from '@ai-sdk/openai';
-import { createDataStreamResponse, UIMessage, streamText } from 'ai';
+import {
+  createDataStreamResponse,
+  UIMessage,
+  streamText,
+  createDataStream,
+} from 'ai';
 import { processToolCalls } from './utils';
 import { tools } from './tools';

@@ -9,14 +14,14 @@ export const maxDuration = 30;
 export async function POST(req: Request) {
   const { messages }: { messages: UIMessage[] } = await req.json();

-  return createDataStreamResponse({
-    execute: async dataStream => {
+  const dataStream = createDataStream({
+    execute: async writer => {
       // Utility function to handle tools that require human confirmation
       // Checks for confirmation in last message and then runs associated tool
       const processedMessages = await processToolCalls(
         {
           messages,
-          dataStream,
+          dataStream: writer,
           tools,
         },
         {
@@ -36,7 +41,9 @@ export async function POST(req: Request) {
         tools,
       });

-      result.mergeIntoDataStream(dataStream);
+      writer.merge(result.toDataStream());
     },
   });
+
+  return createDataStreamResponse({ dataStream });
 }
```

examples/next-openai/app/api/use-chat-persistence-single-message-image-output/route.ts

Lines changed: 6 additions & 3 deletions

```diff
@@ -3,6 +3,7 @@ import { loadChat, saveChat } from '@util/chat-store';
 import {
   appendClientMessage,
   appendResponseMessages,
+  createDataStream,
   createDataStreamResponse,
   createIdGenerator,
   streamText,
@@ -25,8 +26,8 @@ export async function POST(req: Request) {
   });

   // immediately start streaming (solves RAG issues with status, etc.)
-  return createDataStreamResponse({
-    execute: dataStream => {
+  const dataStream = createDataStream({
+    execute: writer => {
       const result = streamText({
         model: google('gemini-2.0-flash-exp'),
         providerOptions: {
@@ -49,12 +50,14 @@ export async function POST(req: Request) {
         },
       });

-      result.mergeIntoDataStream(dataStream);
+      writer.merge(result.toDataStream());
     },
     onError: error => {
       // Error messages are masked by default for security reasons.
       // If you want to expose the error message to the client, you can do so here:
       return error instanceof Error ? error.message : String(error);
     },
   });
+
+  return createDataStreamResponse({ dataStream });
 }
```

examples/next-openai/app/api/use-chat-persistence-single-message-tools/route.ts

Lines changed: 8 additions & 5 deletions

```diff
@@ -3,6 +3,7 @@ import { loadChat, saveChat } from '@util/chat-store';
 import {
   appendClientMessage,
   appendResponseMessages,
+  createDataStream,
   createDataStreamResponse,
   createIdGenerator,
   streamText,
@@ -29,9 +30,9 @@ export async function POST(req: Request) {
   });

   // immediately start streaming (solves RAG issues with status, etc.)
-  return createDataStreamResponse({
-    execute: dataStream => {
-      dataStream.writeMessageAnnotation({
+  const dataStream = createDataStream({
+    execute: writer => {
+      writer.writeMessageAnnotation({
         start: 'start',
         count: count++,
       });
@@ -63,7 +64,7 @@ export async function POST(req: Request) {
           Math.floor(Math.random() * weatherOptions.length)
         ];

-      dataStream.writeMessageAnnotation({
+      writer.writeMessageAnnotation({
         city,
         weather,
       });
@@ -103,12 +104,14 @@ export async function POST(req: Request) {
         },
       });

-      result.mergeIntoDataStream(dataStream);
+      writer.merge(result.toDataStream());
     },
     onError: error => {
       // Error messages are masked by default for security reasons.
       // If you want to expose the error message to the client, you can do so here:
       return error instanceof Error ? error.message : String(error);
     },
   });
+
+  return createDataStreamResponse({ dataStream });
 }
```

examples/next-openai/app/api/use-chat-resume/route.ts

Lines changed: 2 additions & 2 deletions

```diff
@@ -41,7 +41,7 @@ export async function POST(req: Request) {
   await appendStreamId({ chatId: id, streamId });

   const stream = createDataStream({
-    execute: dataStream => {
+    execute: writer => {
       const result = streamText({
         model: openai('gpt-4o'),
         messages,
@@ -56,7 +56,7 @@ export async function POST(req: Request) {
       },
     });

-      result.mergeIntoDataStream(dataStream);
+      writer.merge(result.toDataStream());
     },
   });
```