Skip to content

Commit 650dfaf

Browse files
core[patch]: Move async generator consumption code into local storage context (#5439)
* Move async generator consumption code into local storage context * Typing fix, adds test, remove unnecessary void * lint n stuff --------- Co-authored-by: bracesproul <braceasproul@gmail.com>
1 parent 1ea411e commit 650dfaf

File tree

5 files changed

+246
-27
lines changed

5 files changed

+246
-27
lines changed

‎langchain-core/src/runnables/base.ts‎

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,11 @@ export abstract class Runnable<
298298
): Promise<IterableReadableStream<RunOutput>> {
299299
// Buffer the first streamed chunk to allow for initial errors
300300
// to surface immediately.
301-
const wrappedGenerator = new AsyncGeneratorWithSetup(
302-
this._streamIterator(input, ensureConfig(options))
303-
);
301+
const config = ensureConfig(options);
302+
const wrappedGenerator = new AsyncGeneratorWithSetup({
303+
generator: this._streamIterator(input, config),
304+
config,
305+
});
304306
await wrappedGenerator.setup;
305307
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
306308
}
@@ -1945,9 +1947,11 @@ export class RunnableMap<
19451947
async function* generator() {
19461948
yield input;
19471949
}
1948-
const wrappedGenerator = new AsyncGeneratorWithSetup(
1949-
this.transform(generator(), options)
1950-
);
1950+
const config = ensureConfig(options);
1951+
const wrappedGenerator = new AsyncGeneratorWithSetup({
1952+
generator: this.transform(generator(), config),
1953+
config,
1954+
});
19511955
await wrappedGenerator.setup;
19521956
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
19531957
}
@@ -2151,9 +2155,11 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
21512155
async function* generator() {
21522156
yield input;
21532157
}
2154-
const wrappedGenerator = new AsyncGeneratorWithSetup(
2155-
this.transform(generator(), options)
2156-
);
2158+
const config = ensureConfig(options);
2159+
const wrappedGenerator = new AsyncGeneratorWithSetup({
2160+
generator: this.transform(generator(), config),
2161+
config,
2162+
});
21572163
await wrappedGenerator.setup;
21582164
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
21592165
}
@@ -2458,9 +2464,11 @@ export class RunnableAssign<
24582464
async function* generator() {
24592465
yield input;
24602466
}
2461-
const wrappedGenerator = new AsyncGeneratorWithSetup(
2462-
this.transform(generator(), options)
2463-
);
2467+
const config = ensureConfig(options);
2468+
const wrappedGenerator = new AsyncGeneratorWithSetup({
2469+
generator: this.transform(generator(), config),
2470+
config,
2471+
});
24642472
await wrappedGenerator.setup;
24652473
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
24662474
}
@@ -2549,9 +2557,11 @@ export class RunnablePick<
25492557
async function* generator() {
25502558
yield input;
25512559
}
2552-
const wrappedGenerator = new AsyncGeneratorWithSetup(
2553-
this.transform(generator(), options)
2554-
);
2560+
const config = ensureConfig(options);
2561+
const wrappedGenerator = new AsyncGeneratorWithSetup({
2562+
generator: this.transform(generator(), config),
2563+
config,
2564+
});
25552565
await wrappedGenerator.setup;
25562566
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
25572567
}

‎langchain-core/src/runnables/passthrough.ts‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ type RunnablePassthroughFunc<RunInput = any> =
4040
* );
4141
* ```
4242
*/
43-
export class RunnablePassthrough<RunInput> extends Runnable<
43+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
44+
export class RunnablePassthrough<RunInput = any> extends Runnable<
4445
RunInput,
4546
RunInput
4647
> {

‎langchain-core/src/runnables/tests/runnable_stream_events.test.ts‎

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44

55
import { test } from "@jest/globals";
66
import { z } from "zod";
7-
import { RunnableLambda } from "../index.js";
7+
import {
8+
RunnableLambda,
9+
RunnableMap,
10+
RunnablePassthrough,
11+
RunnablePick,
12+
} from "../index.js";
813
import { ChatPromptTemplate } from "../../prompts/chat.js";
914
import {
1015
FakeListChatModel,
@@ -381,6 +386,166 @@ test("Runnable streamEvents method with three runnables with filtering", async (
381386
]);
382387
});
383388

389+
test("Runnable streamEvents method with a runnable map", async () => {
390+
const r = RunnableLambda.from(reverse);
391+
392+
const chain = RunnableMap.from({
393+
reversed: r,
394+
original: new RunnablePassthrough(),
395+
}).pipe(new RunnablePick("reversed"));
396+
397+
const events = [];
398+
const eventStream = await chain.streamEvents("hello", { version: "v1" });
399+
for await (const event of eventStream) {
400+
events.push(event);
401+
}
402+
console.log(events);
403+
expect(events).toEqual([
404+
{
405+
run_id: expect.any(String),
406+
event: "on_chain_start",
407+
name: "RunnableSequence",
408+
tags: [],
409+
metadata: {},
410+
data: { input: "hello" },
411+
},
412+
{
413+
event: "on_chain_start",
414+
name: "RunnableMap",
415+
run_id: expect.any(String),
416+
tags: ["seq:step:1"],
417+
metadata: {},
418+
data: {},
419+
},
420+
{
421+
event: "on_chain_start",
422+
name: "RunnableLambda",
423+
run_id: expect.any(String),
424+
tags: ["map:key:reversed"],
425+
metadata: {},
426+
data: {},
427+
},
428+
{
429+
event: "on_chain_start",
430+
name: "RunnablePassthrough",
431+
run_id: expect.any(String),
432+
tags: ["map:key:original"],
433+
metadata: {},
434+
data: {},
435+
},
436+
{
437+
event: "on_chain_stream",
438+
name: "RunnablePassthrough",
439+
run_id: expect.any(String),
440+
tags: ["map:key:original"],
441+
metadata: {},
442+
data: { chunk: "hello" },
443+
},
444+
{
445+
event: "on_chain_stream",
446+
name: "RunnableLambda",
447+
run_id: expect.any(String),
448+
tags: ["map:key:reversed"],
449+
metadata: {},
450+
data: { chunk: "olleh" },
451+
},
452+
{
453+
event: "on_chain_stream",
454+
name: "RunnableMap",
455+
run_id: expect.any(String),
456+
tags: ["seq:step:1"],
457+
metadata: {},
458+
data: {
459+
chunk: {
460+
original: "hello",
461+
},
462+
},
463+
},
464+
{
465+
event: "on_chain_start",
466+
name: "RunnablePick",
467+
run_id: expect.any(String),
468+
tags: ["seq:step:2"],
469+
metadata: {},
470+
data: {},
471+
},
472+
{
473+
event: "on_chain_stream",
474+
name: "RunnableMap",
475+
run_id: expect.any(String),
476+
tags: ["seq:step:1"],
477+
metadata: {},
478+
data: {
479+
chunk: {
480+
reversed: "olleh",
481+
},
482+
},
483+
},
484+
{
485+
event: "on_chain_end",
486+
name: "RunnablePassthrough",
487+
run_id: expect.any(String),
488+
tags: ["map:key:original"],
489+
metadata: {},
490+
data: { input: "hello", output: "hello" },
491+
},
492+
{
493+
event: "on_chain_stream",
494+
name: "RunnablePick",
495+
run_id: expect.any(String),
496+
tags: ["seq:step:2"],
497+
metadata: {},
498+
data: { chunk: "olleh" },
499+
},
500+
{
501+
event: "on_chain_stream",
502+
run_id: expect.any(String),
503+
tags: [],
504+
metadata: {},
505+
name: "RunnableSequence",
506+
data: { chunk: "olleh" },
507+
},
508+
{
509+
event: "on_chain_end",
510+
name: "RunnableLambda",
511+
run_id: expect.any(String),
512+
tags: ["map:key:reversed"],
513+
metadata: {},
514+
data: { input: "hello", output: "olleh" },
515+
},
516+
{
517+
event: "on_chain_end",
518+
name: "RunnableMap",
519+
run_id: expect.any(String),
520+
tags: ["seq:step:1"],
521+
metadata: {},
522+
data: {
523+
input: "hello",
524+
output: {
525+
original: "hello",
526+
reversed: "olleh",
527+
},
528+
},
529+
},
530+
{
531+
event: "on_chain_end",
532+
name: "RunnablePick",
533+
run_id: expect.any(String),
534+
tags: ["seq:step:2"],
535+
metadata: {},
536+
data: { output: "olleh" },
537+
},
538+
{
539+
event: "on_chain_end",
540+
name: "RunnableSequence",
541+
run_id: expect.any(String),
542+
tags: [],
543+
metadata: {},
544+
data: { output: "olleh" },
545+
},
546+
]);
547+
});
548+
384549
test("Runnable streamEvents method with llm", async () => {
385550
const model = new FakeStreamingLLM({
386551
responses: ["hey!"],

‎langchain-core/src/singletons/tests/async_local_storage.test.ts‎

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,31 @@ test("Config should be automatically populated after setting global async local
131131
events.filter((event) => event.event === "on_llm_start").length
132132
).toEqual(1);
133133
});
134+
135+
test("Runnable streamEvents method with streaming nested in a RunnableLambda", async () => {
136+
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
137+
new AsyncLocalStorage()
138+
);
139+
const chat = new FakeListChatModel({
140+
responses: ["Hello"],
141+
});
142+
const myFunc = async (input: string) => {
143+
for await (const _ of await chat.stream(input)) {
144+
// no-op
145+
}
146+
};
147+
148+
const myNestedLambda = RunnableLambda.from(myFunc);
149+
150+
const events = [];
151+
for await (const event of myNestedLambda.streamEvents("hello", {
152+
version: "v1",
153+
})) {
154+
console.log(event);
155+
events.push(event);
156+
}
157+
const chatModelStreamEvent = events.find((event) => {
158+
return event.event === "on_llm_stream";
159+
});
160+
expect(chatModelStreamEvent).toBeDefined();
161+
});

‎langchain-core/src/utils/stream.ts‎

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// Make this a type to override ReadableStream's async iterator type in case
22
// the popular web-streams-polyfill is imported - the supplied types
3+
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
4+
35
// in this case don't quite match.
46
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
57
AsyncIterable<T>;
@@ -182,23 +184,33 @@ export class AsyncGeneratorWithSetup<
182184

183185
public setup: Promise<S>;
184186

187+
public config?: unknown;
188+
185189
private firstResult: Promise<IteratorResult<T>>;
186190

187191
private firstResultUsed = false;
188192

189-
constructor(generator: AsyncGenerator<T>, startSetup?: () => Promise<S>) {
190-
this.generator = generator;
193+
constructor(params: {
194+
generator: AsyncGenerator<T>;
195+
startSetup?: () => Promise<S>;
196+
config?: unknown;
197+
}) {
198+
this.generator = params.generator;
199+
this.config = params.config;
191200
// setup is a promise that resolves only after the first iterator value
192201
// is available. this is useful when setup of several piped generators
193202
// needs to happen in logical order, ie. in the order in which input to
194203
// to each generator is available.
195204
this.setup = new Promise((resolve, reject) => {
196-
this.firstResult = generator.next();
197-
if (startSetup) {
198-
this.firstResult.then(startSetup).then(resolve, reject);
199-
} else {
200-
this.firstResult.then((_result) => resolve(undefined as S), reject);
201-
}
205+
const storage = AsyncLocalStorageProviderSingleton.getInstance();
206+
void storage.run(params.config, async () => {
207+
this.firstResult = params.generator.next();
208+
if (params.startSetup) {
209+
this.firstResult.then(params.startSetup).then(resolve, reject);
210+
} else {
211+
this.firstResult.then((_result) => resolve(undefined as S), reject);
212+
}
213+
});
202214
});
203215
}
204216

@@ -208,7 +220,10 @@ export class AsyncGeneratorWithSetup<
208220
return this.firstResult;
209221
}
210222

211-
return this.generator.next(...args);
223+
const storage = AsyncLocalStorageProviderSingleton.getInstance();
224+
return storage.run(this.config, async () => {
225+
return this.generator.next(...args);
226+
});
212227
}
213228

214229
async return(
@@ -245,7 +260,7 @@ export async function pipeGeneratorWithSetup<
245260
startSetup: () => Promise<S>,
246261
...args: A
247262
) {
248-
const gen = new AsyncGeneratorWithSetup(generator, startSetup);
263+
const gen = new AsyncGeneratorWithSetup({ generator, startSetup });
249264
const setup = await gen.setup;
250265
return { output: to(gen, setup, ...args), setup };
251266
}

0 commit comments

Comments
 (0)