Description
When a task execution times out in Agent._execute_with_timeout(), the code calls future.cancel(), which only works if the task hasn't started. If the task is already running, cancel() returns False and the task continues. The ThreadPoolExecutor context manager then exits with shutdown(wait=False), which shuts down the executor while the task thread keeps running, creating an orphaned thread.
This causes:
- Thread leaks: orphaned threads accumulate under load
- Resource exhaustion: connections, memory, and file handles leak
- Production crashes: OOM or connection pool exhaustion
- Potential state corruption: tasks continue executing after timeout
Location: lib/crewai/src/crewai/agent/core.py, method _execute_with_timeout() (lines 439-470)
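The core mechanism can be reproduced outside crewAI with a minimal sketch (the `slow_task` helper below is hypothetical and only stands in for `Agent._execute_without_timeout`): once the submitted callable has started, `future.cancel()` returns False and the worker thread keeps running until the task finishes on its own.

```python
import concurrent.futures
import threading
import time

def slow_task():
    time.sleep(10)  # stand-in for a long-running agent execution
    return "done"

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_task)

try:
    future.result(timeout=1)  # times out; the task needs ~10 seconds
except concurrent.futures.TimeoutError:
    # cancel() cannot stop a call that has already started executing
    print("cancel() returned:", future.cancel())        # False
    print("future still running:", future.running())    # True
    print("active threads:", threading.active_count())  # worker thread is still alive

executor.shutdown(wait=True)  # blocks until the worker finishes, so no thread is left behind
```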
Steps to Reproduce
- Create an agent with a short max_execution_time:
from crewai import Agent, Task, Crew
agent = Agent(
    role="Researcher",
    goal="Perform research",
    backstory="You are a researcher",
    max_execution_time=5,  # 5 seconds timeout
)

task = Task(
    description="Perform a long-running research task that will exceed 5 seconds",
    agent=agent,
)

crew = Crew(agents=[agent], tasks=[task])
- Execute the crew with a task that will exceed the timeout:
result = crew.kickoff()
- Monitor thread count and memory usage:
import threading
import psutil
import os
# Before execution
initial_threads = threading.active_count()
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
# Execute multiple times
for i in range(10):
    try:
        crew.kickoff()
    except Exception:
        pass
# After execution
final_threads = threading.active_count()
final_memory = process.memory_info().rss
print(f"Threads: {initial_threads} -> {final_threads}")
print(f"Memory: {initial_memory / 1024 / 1024:.2f} MB -> {final_memory / 1024 / 1024:.2f} MB")
- Observe:
  - Thread count increases with each timeout
  - Memory usage grows
  - Under high load, the process may crash due to resource exhaustion
Expected behavior
When a task times out:
- The timeout exception should be raised
- The executor should wait for the running task to complete (or terminate it gracefully)
- All threads should be cleaned up
- No resource leaks should occur
- Subsequent executions should not accumulate orphaned threads (a regression check for this is sketched below)
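One way to pin down this expected behavior is a regression check along these lines; this is only a sketch under the reproduction setup above, with an illustrative thread-count tolerance rather than an existing crewAI test (`check_no_thread_leak` is a hypothetical helper):

```python
import threading

def check_no_thread_leak(crew, runs: int = 10, tolerance: int = 2) -> None:
    """Sketch: run the timing-out crew several times and verify threads don't pile up."""
    baseline = threading.active_count()
    for _ in range(runs):
        try:
            crew.kickoff()
        except Exception:
            pass  # the timeout error itself is expected
    leaked = threading.active_count() - baseline
    assert leaked <= tolerance, f"{leaked} threads leaked across {runs} timed-out runs"
```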
Screenshots/Code snippets
Before Fix:
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(
            self._execute_without_timeout, task_prompt=task_prompt, task=task
        )
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError as e:
            future.cancel()  # ❌ This fails if task is already running
            raise TimeoutError(...) from e
        except Exception as e:
            future.cancel()  # ❌ This fails if task is already running
            raise RuntimeError(...) from e
    # ❌ Context manager exits with shutdown(wait=False), leaving orphaned threads
After Fix:
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
    import concurrent.futures

    executor = concurrent.futures.ThreadPoolExecutor()
    try:
        future = executor.submit(
            self._execute_without_timeout, task_prompt=task_prompt, task=task
        )
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError as e:
            cancelled = future.cancel()
            if not cancelled:
                # Task is already running - wait briefly for cleanup
                try:
                    future.result(timeout=1.0)
                except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError):
                    pass
            raise TimeoutError(...) from e
        except Exception as e:
            cancelled = future.cancel()
            if not cancelled:
                try:
                    future.result(timeout=0.1)
                except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError):
                    pass
            raise RuntimeError(...) from e
    finally:
        # ✅ Always wait for threads to complete, preventing orphaned threads
        executor.shutdown(wait=True)
Operating System
macOS Big Sur
Python Version
3.10
crewAI Version
Affects all versions using Agent._execute_with_timeout(). Fixed in: (version after this fix is merged)
crewAI Tools Version
N/A (bug is in core crewAI library)
Virtual Environment
Venv
Evidence
Root Cause Analysis
The bug occurs because:
- future.cancel() only works if the task hasn't started executing:
  - From Python's concurrent.futures documentation: "Attempts to cancel the call. If the call is currently being executed and cannot be cancelled, the method will return False."
  - When cancel() returns False, the task continues running in the background
- The context manager with ThreadPoolExecutor() calls shutdown(wait=False) on exit:
  - Default behavior: shutdown(wait=False) doesn't wait for running tasks (see the sketch below)
  - Result: The executor shuts down, but the task thread continues running, becoming orphaned
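The effect of the two shutdown modes on a still-running task can be checked with a standalone sketch (the `sleep_for` helper is a hypothetical stand-in for the timed-out task, not crewAI code):

```python
import concurrent.futures
import threading
import time

def sleep_for(seconds):
    time.sleep(seconds)  # stand-in for work that outlives the timeout

baseline = threading.active_count()

# shutdown(wait=False) returns immediately; the worker thread keeps running.
ex1 = concurrent.futures.ThreadPoolExecutor(max_workers=1)
ex1.submit(sleep_for, 30)
time.sleep(0.1)  # give the worker a moment to pick up the task
ex1.shutdown(wait=False)
print("extra threads after shutdown(wait=False):", threading.active_count() - baseline)  # 1

# shutdown(wait=True) blocks until the worker finishes and joins its thread.
ex2 = concurrent.futures.ThreadPoolExecutor(max_workers=1)
ex2.submit(sleep_for, 1)
ex2.shutdown(wait=True)
print("extra threads after shutdown(wait=True):", threading.active_count() - baseline)  # still 1 (only ex1's orphan)
```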
Production Impact
Symptoms observed:
- Increasing thread count over time (not garbage collected)
- Memory leaks (each thread holds references to LLM clients, DB connections, etc.)
- Connection pool exhaustion (database connections not released)
- Process crashes under load (OOM or resource limits)
- Potential state corruption (tasks modifying shared state after "timeout")
Possible Solution
Fix the timeout handling in Agent._execute_with_timeout() to ensure threads are cleaned up even when future.cancel() fails.
Additional context
- Similar Pattern in Other Files
The same pattern may exist elsewhere. Check these locations:
lib/crewai/src/crewai/tools/mcp_native_tool.py (line 94):
Status: Likely safe — waits for future.result() before context exit
Recommendation: Review to ensure no timeout scenarios
lib/crewai/src/crewai/agent/core.py (line 963):
Status: Likely safe — waits for result
Recommendation: Review for consistency
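For contrast, a minimal sketch of the safe shape those call sites are described as using, where result() is awaited without a timeout inside the with block, so the executor only shuts down after the worker has finished (`do_work` is hypothetical, not the actual crewAI code):

```python
import concurrent.futures

def do_work():
    return "result"  # hypothetical stand-in for the submitted call

def run_safely():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(do_work)
        # No timeout here: result() blocks until the task completes, so the
        # context manager exits only after the worker thread is done.
        return future.result()

print(run_safely())
```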