
Commit bd398e4

iteratetograceness authored and lgrammel committed
fix (core): consume stream on abort (#5492)
1 parent 69f88e5 commit bd398e4

File tree

8 files changed: +165 −22 lines changed


.changeset/thin-numbers-shave.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'ai': patch
+---
+
+fix (core): improve error handling in streamText's consumeStream method

content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx

Lines changed: 16 additions & 3 deletions
@@ -2274,13 +2274,26 @@ To see `streamText` in action, check out [these examples](#examples).
       },
       {
         name: 'consumeStream',
-        type: '() => Promise<void>',
+        type: '(options?: ConsumeStreamOptions) => Promise<void>',
         description:
-          'Consumes the stream without processing the parts. This is useful to force the stream to finish.',
+          'Consumes the stream without processing the parts. This is useful to force the stream to finish. If an error occurs, it is passed to the optional `onError` callback.',
+        properties: [
+          {
+            type: 'ConsumeStreamOptions',
+            parameters: [
+              {
+                name: 'onError',
+                type: '(error: unknown) => void',
+                isOptional: true,
+                description: 'The error callback.',
+              },
+            ],
+          },
+        ],
       },
       {
         name: 'pipeDataStreamToResponse',
-        type: '(response: ServerResponse, options: PipeDataStreamToResponseOptions } => void',
+        type: '(response: ServerResponse, options: PipeDataStreamToResponseOptions) => void',
         description:
           'Writes stream data output to a Node.js response-like object. It sets a `Content-Type` header to `text/plain; charset=utf-8` and writes each stream data part as a separate chunk.',
         properties: [
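
Read together, the updated reference entry gives `consumeStream` its new call shape. Below is a minimal caller-side sketch of the `onError` option; the model and prompt are illustrative placeholders, not part of this commit:

```ts
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

const result = streamText({
  model: openai('gpt-4o'), // placeholder model, for illustration only
  prompt: 'Write a haiku about streams.', // placeholder prompt
});

// Drain the stream in the background (no await) so it runs to completion
// even if the client disconnects. Errors are reported via onError instead
// of being thrown.
result.consumeStream({
  onError: error => {
    console.error('Error while consuming stream:', error);
  },
});
```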

examples/next-openai/app/api/use-chat-resilient-persistence/route.ts

Lines changed: 7 additions & 2 deletions
@@ -1,6 +1,6 @@
 import { openai } from '@ai-sdk/openai';
-import { appendResponseMessages, createIdGenerator, streamText } from 'ai';
 import { saveChat } from '@util/chat-store';
+import { appendResponseMessages, createIdGenerator, streamText } from 'ai';
 
 export async function POST(req: Request) {
   const { messages, id } = await req.json();
@@ -26,7 +26,12 @@ export async function POST(req: Request) {
 
   // consume the stream to ensure it runs to completion and triggers onFinish
   // even when the client response is aborted (e.g. when the browser tab is closed).
-  result.consumeStream(); // no await
+  // no await
+  result.consumeStream({
+    onError: error => {
+      console.log('Error during background stream consumption: ', error); // optional error callback
+    },
+  });
 
   return result.toDataStreamResponse();
 }

examples/next-openai/app/use-chat-resilient-persistence/[id]/chat.tsx

Lines changed: 18 additions & 8 deletions
@@ -1,19 +1,20 @@
 'use client';
 
-import { createIdGenerator } from 'ai';
 import { Message, useChat } from '@ai-sdk/react';
+import { createIdGenerator } from 'ai';
 
 export default function Chat({
   id,
   initialMessages,
 }: { id?: string | undefined; initialMessages?: Message[] } = {}) {
-  const { input, status, handleInputChange, handleSubmit, messages } = useChat({
-    api: '/api/use-chat-resilient-persistence',
-    id, // use the provided chatId
-    initialMessages, // initial messages if provided
-    sendExtraMessageFields: true, // send id and createdAt for each message
-    generateId: createIdGenerator({ prefix: 'msgc', size: 16 }), // id format for client-side messages
-  });
+  const { input, status, handleInputChange, handleSubmit, messages, stop } =
+    useChat({
+      api: '/api/use-chat-resilient-persistence',
+      id, // use the provided chatId
+      initialMessages, // initial messages if provided
+      sendExtraMessageFields: true, // send id and createdAt for each message
+      generateId: createIdGenerator({ prefix: 'msgc', size: 16 }), // id format for client-side messages
+    });
 
   return (
     <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
@@ -32,6 +33,15 @@ export default function Chat({
           onChange={handleInputChange}
           disabled={status !== 'ready'}
         />
+        {status === 'streaming' && (
+          <button
+            className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
+            type="submit"
+            onClick={stop}
+          >
+            Stop
+          </button>
+        )}
       </form>
     </div>
   );

packages/ai/core/generate-text/stream-text-result.ts

Lines changed: 7 additions & 1 deletion
@@ -61,6 +61,10 @@ export type DataStreamOptions = {
   experimental_sendStart?: boolean;
 };
 
+export type ConsumeStreamOptions = {
+  onError?: (error: unknown) => void;
+};
+
 /**
 A result object for accessing different stream types and additional information.
  */
@@ -203,8 +207,10 @@ Consumes the stream without processing the parts.
 This is useful to force the stream to finish.
 It effectively removes the backpressure and allows the stream to finish,
 triggering the `onFinish` callback and the promise resolution.
+
+If an error occurs, it is passed to the optional `onError` callback.
  */
-  consumeStream(): Promise<void>;
+  consumeStream(options?: ConsumeStreamOptions): Promise<void>;
 
   /**
 Converts the result to a data stream.
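
A consequence of the updated interface contract is that `consumeStream` resolves rather than rejects when the stream errors. A hedged caller-side sketch, assuming `result` comes from an earlier `streamText` call (the narrowing logic is the caller's own, not part of the SDK):

```ts
await result.consumeStream({
  onError: (error: unknown) => {
    // onError receives `unknown`, so narrow before reading Error fields.
    const message = error instanceof Error ? error.message : String(error);
    console.error('Stream failed while draining:', message);
  },
});
// Control reaches this line even when the underlying stream errored or aborted.
```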

packages/ai/core/generate-text/stream-text.test.ts

Lines changed: 86 additions & 0 deletions
@@ -1542,6 +1542,92 @@ describe('streamText', () => {
     });
   });
 
+  describe('result.consumeStream', () => {
+    it('should ignore AbortError during stream consumption', async () => {
+      const result = streamText({
+        model: createTestModel({
+          stream: new ReadableStream({
+            start(controller) {
+              controller.enqueue({ type: 'text-delta', textDelta: 'Hello' });
+              queueMicrotask(() => {
+                controller.error(
+                  Object.assign(new Error('Stream aborted'), {
+                    name: 'AbortError',
+                  }),
+                );
+              });
+            },
+          }),
+        }),
+        prompt: 'test-input',
+      });
+
+      await expect(result.consumeStream()).resolves.not.toThrow();
+    });
+
+    it('should ignore ResponseAborted error during stream consumption', async () => {
+      const result = streamText({
+        model: createTestModel({
+          stream: new ReadableStream({
+            start(controller) {
+              controller.enqueue({ type: 'text-delta', textDelta: 'Hello' });
+              queueMicrotask(() => {
+                controller.error(
+                  Object.assign(new Error('Response aborted'), {
+                    name: 'ResponseAborted',
+                  }),
+                );
+              });
+            },
+          }),
+        }),
+        prompt: 'test-input',
+      });
+
+      await expect(result.consumeStream()).resolves.not.toThrow();
+    });
+
+    it('should ignore any errors during stream consumption', async () => {
+      const result = streamText({
+        model: createTestModel({
+          stream: new ReadableStream({
+            start(controller) {
+              controller.enqueue({ type: 'text-delta', textDelta: 'Hello' });
+              queueMicrotask(() => {
+                controller.error(Object.assign(new Error('Some error')));
+              });
+            },
+          }),
+        }),
+        prompt: 'test-input',
+      });
+
+      await expect(result.consumeStream()).resolves.not.toThrow();
+    });
+
+    it('should call the onError callback with the error', async () => {
+      const onErrorCallback = vi.fn();
+      const result = streamText({
+        model: createTestModel({
+          stream: new ReadableStream({
+            start(controller) {
+              controller.enqueue({ type: 'text-delta', textDelta: 'Hello' });
+              queueMicrotask(() => {
+                controller.error(Object.assign(new Error('Some error')));
+              });
+            },
+          }),
+        }),
+        prompt: 'test-input',
+      });
+
+      await expect(
+        result.consumeStream({ onError: onErrorCallback }),
+      ).resolves.not.toThrow();
+      expect(onErrorCallback).toHaveBeenCalledWith(new Error('Some error'));
+    });
+  });
+
   describe('multiple stream consumption', () => {
     it('should support text stream, ai stream, full stream on single result object', async () => {
       const result = streamText({

packages/ai/core/generate-text/stream-text.ts

Lines changed: 10 additions & 4 deletions
@@ -8,6 +8,7 @@ import { InvalidStreamPartError } from '../../errors/invalid-stream-part-error';
 import { NoOutputSpecifiedError } from '../../errors/no-output-specified-error';
 import { StreamData } from '../../streams/stream-data';
 import { asArray } from '../../util/as-array';
+import { consumeStream } from '../../util/consume-stream';
 import { DelayedPromise } from '../../util/delayed-promise';
 import { DataStreamWriter } from '../data-stream/data-stream-writer';
 import { CallSettings } from '../prompt/call-settings';
@@ -53,6 +54,7 @@ import {
 } from './run-tools-transformation';
 import { ResponseMessage, StepResult } from './step-result';
 import {
+  ConsumeStreamOptions,
   DataStreamOptions,
   StreamTextResult,
   TextStreamPart,
@@ -1591,10 +1593,14 @@ However, the LLM results are expected to be small enough to not cause issues.
     );
   }
 
-  async consumeStream(): Promise<void> {
-    const stream = this.fullStream;
-    for await (const part of stream) {
-      // no op
+  async consumeStream(options?: ConsumeStreamOptions): Promise<void> {
+    try {
+      await consumeStream({
+        stream: this.fullStream,
+        onError: options?.onError,
+      });
+    } catch (error) {
+      options?.onError?.(error);
     }
   }
 
packages/ai/util/consume-stream.ts

Lines changed: 16 additions & 4 deletions
@@ -8,10 +8,22 @@
  * @param {ReadableStream} stream - The ReadableStream to be consumed.
  * @returns {Promise<void>} A promise that resolves when the stream is fully consumed.
  */
-export async function consumeStream(stream: ReadableStream): Promise<void> {
+export async function consumeStream({
+  stream,
+  onError,
+}: {
+  stream: ReadableStream;
+  onError?: (error: unknown) => void;
+}): Promise<void> {
   const reader = stream.getReader();
-  while (true) {
-    const { done } = await reader.read();
-    if (done) break;
+  try {
+    while (true) {
+      const { done } = await reader.read();
+      if (done) break;
+    }
+  } catch (error) {
+    onError?.(error);
+  } finally {
+    reader.releaseLock();
   }
 }
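
For reference, a standalone usage sketch of the reshaped utility; the import path is internal to the `ai` package and assumed here, and the erroring stream merely simulates a mid-stream failure:

```ts
import { consumeStream } from '../../util/consume-stream'; // assumed internal path

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.error(new Error('boom')); // fail after the first chunk
  },
});

// Resolves normally: the error is routed to onError, and the reader lock
// is released in the finally block.
await consumeStream({
  stream,
  onError: error => {
    console.error('Swallowed stream error:', error);
  },
});
```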
