Skip to content

Commit 904fa5e

Browse files
lgrammelremorses
andauthored
feat (ai/core): add terminateOnError option to readUIMessageStream (#7503)
## Background Generator use of read ui message stream requires early termination in case of errors. ## Summary Add `onError` and `terminateOnError` options to `readUIMesageStream` ## Related Issues Fixes #7420 Continues #7500 --------- Co-authored-by: remorses <beats.by.morse@gmail.com>
1 parent 75f03b1 commit 904fa5e

File tree

5 files changed

+63
-11
lines changed

5 files changed

+63
-11
lines changed

‎.changeset/nasty-cows-care.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
feat (ai/core): add terminateOnError option to readUIMessageStream

‎content/docs/07-reference/02-ai-sdk-ui/43-read-ui-message-stream.mdx

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,31 @@ import { readUIMessageStream } from 'ai';
2121

2222
<PropertiesTable
2323
content={[
24+
{
25+
name: 'message',
26+
type: 'UIMessage',
27+
isOptional: true,
28+
description:
29+
'The last assistant message to use as a starting point when the conversation is resumed. Otherwise undefined.',
30+
},
2431
{
2532
name: 'stream',
2633
type: 'ReadableStream<UIMessageChunk>',
2734
description: 'The stream of UIMessageChunk objects to read.',
2835
},
2936
{
30-
name: 'message',
31-
type: 'UIMessage',
37+
name: 'onError',
38+
type: '(error: unknown) => void',
3239
isOptional: true,
3340
description:
34-
'The last assistant message to use as a starting point when the conversation is resumed. Otherwise undefined.',
41+
'A function that is called when an error occurs during stream processing.',
42+
},
43+
{
44+
name: 'terminateOnError',
45+
type: 'boolean',
46+
isOptional: true,
47+
description:
48+
'Whether to terminate the stream if an error occurs. Defaults to false.',
3549
},
3650
]}
3751
/>

‎packages/ai/src/ui-message-stream/read-ui-message-stream.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,22 @@ describe('readUIMessageStream', () => {
100100
]
101101
`);
102102
});
103+
104+
it('should throw an error when encountering an error UI stream part', async () => {
105+
const stream = createUIMessageStream([
106+
{ type: 'start', messageId: 'msg-123' },
107+
{ type: 'text-start', id: 'text-1' },
108+
{ type: 'text-delta', id: 'text-1', delta: 'Hello' },
109+
{ type: 'error', errorText: 'Test error message' },
110+
]);
111+
112+
const uiMessages = readUIMessageStream({
113+
stream,
114+
terminateOnError: true,
115+
});
116+
117+
await expect(convertAsyncIterableToArray(uiMessages)).rejects.toThrow(
118+
'Test error message',
119+
);
120+
});
103121
});

‎packages/ai/src/ui-message-stream/read-ui-message-stream.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,25 @@ import { consumeStream } from '../util/consume-stream';
1616
*
1717
* @param options.message - The last assistant message to use as a starting point when the conversation is resumed. Otherwise undefined.
1818
* @param options.stream - The stream of `UIMessageChunk`s to read.
19+
* @param options.terminateOnError - Whether to terminate the stream if an error occurs.
20+
* @param options.onError - A function that is called when an error occurs.
1921
*
2022
* @returns An `AsyncIterableStream` of `UIMessage`s. Each stream part is a different state of the same message
2123
* as it is being completed.
2224
*/
2325
export function readUIMessageStream<UI_MESSAGE extends UIMessage>({
2426
message,
2527
stream,
28+
onError,
29+
terminateOnError = false,
2630
}: {
2731
message?: UI_MESSAGE;
2832
stream: ReadableStream<UIMessageChunk>;
33+
onError?: (error: unknown) => void;
34+
terminateOnError?: boolean;
2935
}): AsyncIterableStream<UI_MESSAGE> {
3036
let controller: ReadableStreamDefaultController<UI_MESSAGE> | undefined;
37+
let hasErrored = false;
3138

3239
const outputStream = new ReadableStream<UI_MESSAGE>({
3340
start(controllerParam) {
@@ -40,6 +47,15 @@ export function readUIMessageStream<UI_MESSAGE extends UIMessage>({
4047
lastMessage: message,
4148
});
4249

50+
const handleError = (error: unknown) => {
51+
onError?.(error);
52+
53+
if (!hasErrored && terminateOnError) {
54+
hasErrored = true;
55+
controller?.error(error);
56+
}
57+
};
58+
4359
consumeStream({
4460
stream: processUIMessageStream({
4561
stream,
@@ -56,12 +72,15 @@ export function readUIMessageStream<UI_MESSAGE extends UIMessage>({
5672
},
5773
});
5874
},
59-
onError: error => {
60-
throw error;
61-
},
75+
onError: handleError,
6276
}),
77+
onError: handleError,
6378
}).finally(() => {
64-
controller?.close();
79+
// Only close if no error occurred. Calling close() on an errored controller
80+
// throws "Invalid state: Controller is already closed" TypeError.
81+
if (!hasErrored) {
82+
controller?.close();
83+
}
6584
});
6685

6786
return createAsyncIterableStream(outputStream);

‎packages/ai/src/ui/process-ui-message-stream.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,3 @@ export function processUIMessageStream<UI_MESSAGE extends UIMessage>({
537537
}),
538538
);
539539
}
540-
541-
function isObject(value: unknown): value is object {
542-
return typeof value === 'object' && value !== null;
543-
}

0 commit comments

Comments
 (0)