
[Feature Request]: Support Streaming Tool Results (Preliminary Tool Results) #20409

@zxzinn

Description

Feature Description

Summary

LlamaIndex's FunctionTool and workflow system currently do not support streaming intermediate results from tool execution. When a tool function is written as an async generator that yields preliminary/progress updates, the workflow receives the generator object itself rather than consuming it. This prevents real-time progress updates during long-running tool operations.

Current Behavior

When a tool's async_fn is an async generator function:

async def weather_tool(**kwargs):
    yield {'status': 'loading', 'progress': 0}
    yield {'status': 'requesting', 'progress': 30}
    data = await fetch_weather(**kwargs)  # actual API call (illustrative)
    yield {'status': 'success', 'result': data, 'progress': 100}

The FunctionTool.acall() method awaits the function, which returns the generator object:

# In llama_index/core/tools/function_tool.py:350
raw_output = await self._async_fn(*args, **all_kwargs)
# raw_output becomes: <async_generator object weather_tool at 0x...>

This generator object is then stored in ToolOutput.raw_output, and the workflow never iterates through the yielded values.
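
The underlying mechanics are plain Python: calling an async-generator function returns the generator object without running any of its body, and values only appear once the generator is iterated. A minimal standalone illustration:

import asyncio
import inspect

async def gen():
    yield 1
    yield 2

async def main():
    obj = gen()                     # no body code has run yet
    print(inspect.isasyncgen(obj))  # True
    print([x async for x in obj])   # [1, 2] -- produced only on iteration

asyncio.run(main())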

Expected Behavior

Similar to Vercel AI SDK's preliminary tool results feature (a minimal sketch follows the list):

  1. Detect when a tool function returns an async generator
  2. Iterate through yielded values and emit intermediate ToolCallResult events
  3. Mark intermediate results with a preliminary flag
  4. Emit the final result when the generator completes
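
A hedged sketch of those four steps, independent of where the logic would live; the emit callback and the preliminary flag here are assumptions of this proposal, not existing LlamaIndex API:

import inspect
from typing import Any, Callable

async def run_tool(tool_fn: Callable[..., Any], emit: Callable[..., None], **kwargs: Any) -> Any:
    result = tool_fn(**kwargs)
    if not inspect.isasyncgen(result):       # 1. detect the async generator
        return await result                  # plain coroutine: unchanged path
    last, seen = None, False
    async for item in result:                # 2. iterate yielded values
        if seen:
            emit(last, preliminary=True)     # 3. mark intermediates as preliminary
        last, seen = item, True
    if seen:
        emit(last, preliminary=False)        # 4. emit the final result on completion
    return last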

Use Cases

  • API Calls: Show "connecting...", "requesting...", "processing..." during external API operations
  • File Processing: Display progress updates when processing large files or datasets
  • Long Computations: Provide incremental feedback during expensive calculations
  • Multi-step Operations: Show which step is currently executing in complex workflows

Comparison with Other Frameworks

Vercel AI SDK (TypeScript)

tool({
  async *execute({ location }) {
    yield { status: 'loading', text: 'Getting weather...' };
    const data = await fetchWeather(location); // actual work
    yield { status: 'success', weather: data };
  }
})

Pydantic AI (Python)

# Supports agent.run_stream_events() for streaming all tool events
async for event in agent.run_stream_events():
    if event.type == 'tool_result':
        print(event.data)

LangGraph (Python)

# Supports get_stream_writer so tools can stream custom updates
from langgraph.config import get_stream_writer

writer = get_stream_writer()
writer({'progress': 50})  # the writer is called directly, not awaited

Proposed Solution

Option 1: Extend FunctionTool.acall()

Detect async generators and iterate through them:

import inspect

async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput:
    all_kwargs = {**self.partial_params, **kwargs}
    # NEW: call without awaiting first -- an async-generator function
    # returns its generator synchronously, and awaiting it would raise
    result = self._async_fn(*args, **all_kwargs)

    if inspect.isasyncgen(result):
        preliminary_results = []
        final_result = None

        async for item in result:
            # Emit preliminary result somehow?
            preliminary_results.append(item)
            final_result = item

        raw_output = final_result
    else:
        raw_output = await result

    # ... rest of existing logic

Challenge: acall() returns a single ToolOutput, but we need to emit multiple events.

Option 2: Add acall_stream() Method

Add a new method that yields multiple ToolOutput objects:

import inspect
from typing import Any, AsyncIterator

async def acall_stream(self, *args: Any, **kwargs: Any) -> AsyncIterator[ToolOutput]:
    all_kwargs = {**self.partial_params, **kwargs}
    # Call without awaiting: an async-generator function returns its
    # generator synchronously
    result = self._async_fn(*args, **all_kwargs)

    if inspect.isasyncgen(result):
        # Buffer one item ahead: every yield before the last is preliminary
        last, seen = None, False
        async for item in result:
            if seen:
                yield ToolOutput(
                    content=str(last),
                    tool_name=self.metadata.get_name(),
                    raw_output=last,
                    is_preliminary=True,
                )
            last, seen = item, True
        if seen:
            yield ToolOutput(
                content=str(last),
                tool_name=self.metadata.get_name(),
                raw_output=last,
                is_preliminary=False,
            )
    else:
        raw_output = await result
        yield ToolOutput(
            content=str(raw_output),
            tool_name=self.metadata.get_name(),
            raw_output=raw_output,
        )
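
A caller could then consume the stream directly (again assuming the proposed acall_stream() method and is_preliminary field, neither of which exists today):

async for output in tool.acall_stream(location="Taipei"):
    if output.is_preliminary:
        print(f"Progress: {output.raw_output}")
    else:
        print(f"Final: {output.raw_output}")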

Option 3: Workflow-Level Support

Modify BaseWorkflowAgent._call_tool() to detect and handle async generators:

async def _call_tool(self, ctx: Context, tool: AsyncBaseTool, tool_input: dict) -> ToolOutput:
    tool_output = await tool.acall(**tool_input)

    # Check if raw_output is an async generator
    if inspect.isasyncgen(tool_output.raw_output):
        final_result = None
        async for preliminary_result in tool_output.raw_output:
            final_result = preliminary_result
            # Emit ToolCallResult event with preliminary=True
            ctx.write_event_to_stream(
                ToolCallResult(..., preliminary=True)
            )
        # Emit the final ToolCallResult, replacing raw_output with final_result

Proposed API

Tool Definition

from llama_index.core.tools import FunctionTool

async def api_tool(location: str):
    """Get weather with progress updates."""
    yield {'status': 'loading', 'progress': 0, 'is_preliminary': True}
    yield {'status': 'requesting', 'progress': 30, 'is_preliminary': True}

    # Actual work
    result = await fetch_weather(location)

    yield {'status': 'success', 'result': result, 'is_preliminary': False}

tool = FunctionTool.from_defaults(
    async_fn=api_tool,
    # ... metadata
)

Workflow Consumption

handler = workflow.run(user_msg="What's the weather in Taipei?")

async for event in handler.stream_events():
    if isinstance(event, ToolCallResult):
        if event.preliminary:
            print(f"Progress: {event.tool_output.raw_output}")
        else:
            print(f"Final: {event.tool_output.raw_output}")


Reason

No response

Value of Feature

No response
