Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Move async generator consumption code into local storage context
  • Loading branch information
jacoblee93 committed May 17, 2024
commit da54217d44833f1c2bcba7daeb686b5026c40bcd
40 changes: 25 additions & 15 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,11 @@ export abstract class Runnable<
): Promise<IterableReadableStream<RunOutput>> {
// Buffer the first streamed chunk to allow for initial errors
// to surface immediately.
const wrappedGenerator = new AsyncGeneratorWithSetup(
this._streamIterator(input, ensureConfig(options))
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this._streamIterator(input, config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -1945,9 +1947,11 @@ export class RunnableMap<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2151,9 +2155,11 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2458,9 +2464,11 @@ export class RunnableAssign<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2549,9 +2557,11 @@ export class RunnablePick<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down
27 changes: 27 additions & 0 deletions langchain-core/src/singletons/tests/async_local_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,30 @@ test("Config should be automatically populated after setting global async local
events.filter((event) => event.event === "on_llm_start").length
).toEqual(1);
});

test("Runnable streamEvents method with streaming nested in a RunnableLambda", async () => {
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const chat = new FakeListChatModel({
responses: ["Hello"],
});
const myFunc = async (input: string) => {
for await (const _ of await chat.stream(input)) {
}
};

const myNestedLambda = RunnableLambda.from(myFunc);

const events = [];
for await (const event of myNestedLambda.streamEvents("hello", {
version: "v1",
})) {
console.log(event);
events.push(event);
}
const chatModelStreamEvent = events.find((event) => {
return event.event === "on_llm_stream";
});
expect(chatModelStreamEvent).toBeDefined();
});
37 changes: 27 additions & 10 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";

// in this case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
Expand Down Expand Up @@ -182,23 +184,33 @@ export class AsyncGeneratorWithSetup<

public setup: Promise<S>;

public config?: unknown;

private firstResult: Promise<IteratorResult<T>>;

private firstResultUsed = false;

constructor(generator: AsyncGenerator<T>, startSetup?: () => Promise<S>) {
this.generator = generator;
constructor(params: {
generator: AsyncGenerator<T>;
startSetup?: () => Promise<S>;
config?: unknown;
}) {
this.generator = params.generator;
this.config = params.config;
// setup is a promise that resolves only after the first iterator value
// is available. this is useful when setup of several piped generators
// needs to happen in logical order, ie. in the order in which input to
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
this.firstResult = generator.next();
if (startSetup) {
this.firstResult.then(startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
const storage = AsyncLocalStorageProviderSingleton.getInstance();
storage.run(params.config, async () => {
this.firstResult = params.generator.next();
if (params.startSetup) {
this.firstResult.then(params.startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
});
});
}

Expand All @@ -208,7 +220,12 @@ export class AsyncGeneratorWithSetup<
return this.firstResult;
}

return this.generator.next(...args);
const storage = AsyncLocalStorageProviderSingleton.getInstance();
return new Promise((resolve) => {
storage.run(this.config, async () => {
resolve(this.generator.next(...args));
});
});
}

async return(
Expand Down Expand Up @@ -245,7 +262,7 @@ export async function pipeGeneratorWithSetup<
startSetup: () => Promise<S>,
...args: A
) {
const gen = new AsyncGeneratorWithSetup(generator, startSetup);
const gen = new AsyncGeneratorWithSetup({ generator, startSetup });
const setup = await gen.setup;
return { output: to(gen, setup, ...args), setup };
}