Description
Feature Description
Summary
LlamaIndex's `FunctionTool` and workflow system currently do not support streaming intermediate results from tool execution. When a tool function is an async generator that yields preliminary progress updates, the workflow receives the generator object itself rather than consuming it. This prevents real-time progress updates during long-running tool operations.
Current Behavior
When a tool's `async_fn` returns an async generator:

```python
async def weather_tool(**kwargs):
    yield {'status': 'loading', 'progress': 0}
    yield {'status': 'requesting', 'progress': 30}
    # ... actual API call
    yield {'status': 'success', 'result': data, 'progress': 100}
```

The `FunctionTool.acall()` method awaits the function, which returns the generator object:
```python
# In llama_index/core/tools/function_tool.py:350
raw_output = await self._async_fn(*args, **all_kwargs)
# raw_output becomes: <async_generator object weather_tool at 0x...>
```

This generator object is then stored in `ToolOutput.raw_output`, and the workflow never iterates through the yielded values.
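For reference, the underlying Python behavior is easy to reproduce without LlamaIndex; a minimal standalone sketch showing that calling an async generator function produces a generator object whose values only appear once it is iterated:

```python
import asyncio
import inspect

async def weather_tool(**kwargs):
    yield {'status': 'loading', 'progress': 0}
    yield {'status': 'success', 'progress': 100}

async def main():
    gen = weather_tool()            # calling does not run the body
    print(inspect.isasyncgen(gen))  # True
    async for update in gen:        # values appear only on iteration
        print(update)

asyncio.run(main())
```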
Expected Behavior
Similar to Vercel AI SDK's preliminary tool results feature:
- Detect when a tool function returns an async generator
- Iterate through the yielded values and emit intermediate `ToolCallResult` events
- Mark intermediate results with a `preliminary` flag (see the sketch after this list)
- Emit the final result when the generator completes
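A minimal sketch of what the flagged event could look like; the `preliminary` field is this proposal's assumption, not an existing LlamaIndex API:

```python
from llama_index.core.tools import ToolOutput
from llama_index.core.workflow import Event

class ToolCallResult(Event):
    """Hypothetical extension: today's ToolCallResult has no preliminary flag."""
    tool_name: str
    tool_kwargs: dict
    tool_output: ToolOutput
    preliminary: bool = False  # True for intermediate yields
```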
Use Cases
- API Calls: Show "connecting...", "requesting...", "processing..." during external API operations
- File Processing: Display progress updates when processing large files or datasets (sketched below)
- Long Computations: Provide incremental feedback during expensive calculations
- Multi-step Operations: Show which step is currently executing in complex workflows
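To make the file-processing case concrete, a hypothetical tool written under the proposed generator support (`process_one` is an illustrative stand-in, not a real API):

```python
async def process_one(path: str) -> str:
    """Illustrative stand-in for real per-file work."""
    return f"processed {path}"

async def process_files(paths: list[str]):
    """Tool written as an async generator: yields progress, then the result."""
    total = len(paths)
    results = []
    for i, path in enumerate(paths):
        yield {'status': 'processing', 'file': path,
               'progress': int(100 * i / total), 'is_preliminary': True}
        results.append(await process_one(path))
    yield {'status': 'done', 'results': results,
           'progress': 100, 'is_preliminary': False}
```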
Comparison with Other Frameworks
Vercel AI SDK (TypeScript)
```typescript
tool({
  async *execute({ location }) {
    yield { status: 'loading', text: 'Getting weather...' };
    await delay(3000);
    yield { status: 'success', weather: data };
  }
})
```

Pydantic AI (Python)
```python
# Supports agent.run_stream_events() for streaming all tool events
async for event in agent.run_stream_events():
    if event.type == 'tool_result':
        print(event.data)
```

LangGraph (Python)
```python
# Supports get_stream_writer so tools can stream custom updates
from langgraph.config import get_stream_writer

writer = get_stream_writer()
writer({'progress': 50})  # the writer is called directly, not awaited
```

Proposed Solution
Option 1: Extend FunctionTool.acall()
Detect async generators and iterate through them:

```python
async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput:
    all_kwargs = {**self.partial_params, **kwargs}
    raw_output = await self._async_fn(*args, **all_kwargs)
    # NEW: Check if result is an async generator
    if inspect.isasyncgen(raw_output):
        preliminary_results = []
        final_result = None
        async for result in raw_output:
            # Emit preliminary result somehow?
            preliminary_results.append(result)
            final_result = result
        raw_output = final_result
    # ... rest of existing logic
```

Challenge: `acall()` returns a single `ToolOutput`, but we need to emit multiple events.
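The sketch below (plain Python, no LlamaIndex imports) illustrates why Option 1 alone is lossy: inside a single-return `acall()`, the intermediate yields are consumed internally and only the last value survives.

```python
import asyncio
import inspect

async def tool():
    yield {'progress': 0}
    yield {'progress': 100, 'result': 'done'}

async def acall_like():
    gen = tool()
    final = None
    if inspect.isasyncgen(gen):
        async for item in gen:  # intermediate yields are silently consumed
            final = item
    return final

# Prints only {'progress': 100, 'result': 'done'}
print(asyncio.run(acall_like()))
```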
Option 2: Add acall_stream() Method
Add a new method that yields multiple `ToolOutput` objects:

```python
async def acall_stream(self, *args: Any, **kwargs: Any) -> AsyncIterator[ToolOutput]:
    all_kwargs = {**self.partial_params, **kwargs}
    raw_output = await self._async_fn(*args, **all_kwargs)
    if inspect.isasyncgen(raw_output):
        async for result in raw_output:
            is_final = ...  # TODO: detect the final result somehow
            yield ToolOutput(
                content=str(result),
                tool_name=self.metadata.get_name(),
                raw_output=result,
                is_preliminary=not is_final,
            )
    else:
        yield ToolOutput(
            content=str(raw_output),
            tool_name=self.metadata.get_name(),
            raw_output=raw_output,
        )
```
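One way to fill in the "detect the final result somehow" placeholder, without requiring tools to tag their own payloads, is a one-item lookahead. A minimal sketch (the helper name is illustrative):

```python
from typing import Any, AsyncIterator, Tuple

async def with_finality(gen: AsyncIterator[Any]) -> AsyncIterator[Tuple[Any, bool]]:
    """Yield (item, is_final) pairs by buffering one item ahead."""
    previous: Any = None
    has_previous = False
    async for item in gen:
        if has_previous:
            yield previous, False  # at least one more item follows
        previous, has_previous = item, True
    if has_previous:
        yield previous, True  # generator is exhausted: last item
```

`acall_stream()` could then iterate `async for result, is_final in with_finality(raw_output)` instead of guessing.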
Option 3: Workflow-Level Support
Modify `BaseWorkflowAgent._call_tool()` to detect and handle async generators:

```python
async def _call_tool(self, ctx: Context, tool: AsyncBaseTool, tool_input: dict) -> ToolOutput:
    tool_output = await tool.acall(**tool_input)
    # Check if raw_output is an async generator
    if inspect.isasyncgen(tool_output.raw_output):
        async for preliminary_result in tool_output.raw_output:
            # Emit ToolCallResult event with preliminary=True
            ctx.write_event_to_stream(
                ToolCallResult(..., preliminary=True)
            )
        # Emit final ToolCallResult
```

Proposed API
Tool Definition
```python
from llama_index.core.tools import FunctionTool

async def api_tool(location: str):
    """Get weather with progress updates."""
    yield {'status': 'loading', 'progress': 0, 'is_preliminary': True}
    yield {'status': 'requesting', 'progress': 30, 'is_preliminary': True}
    # Actual work
    result = await fetch_weather(location)
    yield {'status': 'success', 'result': result, 'is_preliminary': False}

tool = FunctionTool.from_defaults(
    async_fn=api_tool,
    # ... metadata
)
```

Workflow Consumption
```python
async for event in workflow.stream_events():
    if isinstance(event, ToolCallResult):
        if event.preliminary:
            print(f"Progress: {event.tool_output.raw_output}")
        else:
            print(f"Final: {event.tool_output.raw_output}")
```

References
Reason
No response
Value of Feature
No response