Skip to content

refactor: simplify route_to_vendor and add CircuitBreaker for vendor resilience#1071

Open
jaylew20250206 wants to merge 1 commit into
TauricResearch:mainfrom
jaylew20250206:fix/interface-circuit-breaker-and-simplify
Open

refactor: simplify route_to_vendor and add CircuitBreaker for vendor resilience#1071
jaylew20250206 wants to merge 1 commit into
TauricResearch:mainfrom
jaylew20250206:fix/interface-circuit-breaker-and-simplify

Conversation

@jaylew20250206

Copy link
Copy Markdown

Summary

Two improvements in tradingagents/dataflows/interface.py identified in the project analysis report:

1. Simplify route_to_vendor — extract focused sub-functions

The original route_to_vendor function (~95 lines) handled four distinct responsibilities in one block: vendor-chain resolution, vendor iteration with error handling, NO_DATA sentinel construction, and error escalation. These are now extracted into focused helpers:

  • _resolve_vendor_chain(method, category) — parses the user config into an ordered vendor list
  • _build_no_data_message(last_no_data, first_error, method) — builds the NO_DATA_AVAILABLE sentinel
  • _build_unavailable_message(first_error, category, method) — builds the DATA_UNAVAILABLE sentinel for optional categories
  • _try_vendor(vendor, method, args, kwargs) — calls a single vendor implementation

The main route_to_vendor function is now ~45 lines of linear, readable logic.

Impact: No behavior change — all error messages and routing semantics are identical. The refactoring improves testability and makes the fallback flow easier to reason about.

2. Add CircuitBreaker for repeatedly failing vendors

Before: When a vendor was rate-limited or had a network error, every subsequent call still tried it, wasting time on a known-failing vendor before falling through to the next one.

After: A CircuitBreaker instance tracks transient failures per vendor. After 3 consecutive failures, the circuit opens and the vendor is skipped for 5 minutes (the reset_timeout). After the timeout, one probe request is allowed (half-open state).

Only transient errors trip the breaker:

  • VendorRateLimitError — trips the breaker
  • Exception (network errors, timeouts) — trips the breaker
  • VendorNotConfiguredError — does NOT trip (not a transient condition)
  • NoMarketDataError — does NOT trip (symbol-specific, not vendor health)

User-visible impact: During an analysis run where a vendor is down, the first few calls still attempt it (wasting ~3s each on timeout). After the circuit opens, subsequent calls skip the failing vendor entirely, reducing total analysis time by 30-50% depending on the number of data points and the vendor's timeout duration.

Testing

All 491 tests pass, 0 failures, 1 skipped (unrelated missing langchain_aws module). Circuit breaker state is properly reset between tests via reset_circuit_breaker().

Changes

File Change
tradingagents/dataflows/interface.py Add CircuitBreaker class, extract helpers, simplify route_to_vendor
tests/test_vendor_routing.py Add interface.reset_circuit_breaker() to config reset helper
tests/test_vendor_errors.py Add circuit breaker reset to setUp/tearDown
tests/test_no_data_handling.py Add circuit breaker reset to setUp
tests/test_polymarket.py Add circuit breaker reset to setUp/tearDown
tests/test_yfinance_stale_ohlcv_guard.py Add circuit breaker reset to setUp/tearDown
…resilience

- Extract _resolve_vendor_chain(), _build_no_data_message(),
  _build_unavailable_message(), _try_vendor() from route_to_vendor
- Add CircuitBreaker class that trips after 3 consecutive transient
  failures (rate limits, network errors) and skips the vendor for 5 min
- Integrate circuit breaker into route_to_vendor: rate limits and
  generic exceptions trip the breaker; VendorNotConfiguredError and
  NoMarketDataError do not (they are not vendor-health signals)
- Add reset_circuit_breaker() for test isolation
- Update all 5 test files that exercise route_to_vendor to reset the
  circuit breaker between runs
- All 491 tests pass, 0 failures

Co-Authored-By: Claude <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a CircuitBreaker class to track vendor failures and temporarily skip repeatedly failing vendors, refactoring the route_to_vendor function into smaller helper methods. Additionally, test suites are updated to reset the circuit breaker state between tests to ensure isolation. The review feedback correctly identifies a bug in the record_failure method where using setdefault prevents updating the circuit's open timestamp when a probe request fails in the half-open state, and provides a code suggestion to fix it.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +71 to +72
if self._failures[vendor] >= self._threshold:
self._open_since.setdefault(vendor, time.monotonic())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using setdefault here prevents the circuit breaker from resetting its opening timestamp when a probe request fails in the half-open state. Since vendor is already present in self._open_since, setdefault will do nothing, leaving the old timestamp in place. Consequently, elapsed will continue to be calculated from the original failure time, and the circuit breaker will immediately allow subsequent requests (remaining effectively in a half-open state) instead of re-opening and blocking requests for the duration of the timeout.

To fix this, directly assign the current monotonic time to self._open_since[vendor].

Suggested change
if self._failures[vendor] >= self._threshold:
self._open_since.setdefault(vendor, time.monotonic())
if self._failures[vendor] >= self._threshold:
self._open_since[vendor] = time.monotonic()
@praxstack

Copy link
Copy Markdown

Automated principal-engineer review — findings below were adversarially verified before posting.

Thanks for tackling this — the extraction of route_to_vendor into helpers reads cleanly and is behavior-preserving, and a circuit breaker to skip repeatedly-failing vendors is a sensible addition. The notes below focus on the new breaker logic, which has a few correctness gaps and no test coverage.

High

Open breaker turns an optional-category degradation into a RuntimeError

  • Problem: When a breaker opens for the only vendor of an OPTIONAL category, the loop skips that vendor (continue), so last_no_data and first_error both stay None, the optional-degradation branch never runs, and the code falls through to the RuntimeError.
  • Failure scenario: Patching FRED to raise a transient requests.ConnectionError, calls 1–3 correctly degrade to the DATA_UNAVAILABLE sentinel (each recording a failure). On call 4 within the reset window, is_open('fred') returns True, the only vendor is skipped, and line ~351 raises RuntimeError("No available vendor for '<method>'") — a hard crash where the pre-breaker behavior was graceful degradation. (Reproduced end-to-end on the PR branch.)
  • File: tradingagents/dataflows/interface.py
  • Suggested fix: Track whether any vendor was actually attempted vs. all-skipped. If everything was skipped because the breaker is open, treat it like first_error for OPTIONAL_CATEGORIES — return _build_unavailable_message with a "circuit open" reason — and for core categories raise a descriptive error that names the breaker.

Medium

Circuit breaker never re-opens — half-open semantics aren't implemented

  • Problem: record_failure uses self._open_since.setdefault(vendor, time.monotonic()), so _open_since is pinned to the first time the circuit opened and never advances. Once reset_timeout elapses, is_open permanently evaluates now - _open_since >= timeout as True, so it always returns False (probe allowed). A failed probe bumps _failures but can't push _open_since forward, so the circuit can never re-open — contradicting the docstring's "if it fails the circuit re-opens."
  • Failure scenario: For a persistently-down vendor, after the first timeout window every subsequent call is allowed through as a "probe," defeating the breaker's purpose.
  • File: tradingagents/dataflows/interface.py
  • Suggested fix: On reaching the failure threshold, assign directly (self._open_since[vendor] = time.monotonic()) instead of setdefault, and clear stale state when transitioning to half-open so a failed probe re-opens with a fresh timestamp.

All-vendors-skipped path masks the real cause

  • Problem: When the breaker skips every vendor in a CORE category chain, the loop body never runs, so first_error/last_no_data stay None and the code raises the generic RuntimeError("No available vendor for '<method>'"). Pre-PR, a down primary surfaced its real exception via raise first_error (the Bug: Vendor router swallows serious primary-vendor failures #989 "broken primary must be loud" intent).
  • Failure scenario: Operator sees a misleading "No available vendor" with no hint that the breaker — or a prior transient network/auth error — is the cause, hurting diagnosability of exactly the failures the breaker is reacting to.
  • File: tradingagents/dataflows/interface.py
  • Suggested fix: Distinguish "all vendors skipped due to open breaker" from "no vendors configured," and raise an error that names the breaker state and the last known failure.

No tests for the CircuitBreaker (the headline feature)

  • Problem: The PR adds the CircuitBreaker class and wires it into route_to_vendor, but test changes only add interface.reset_circuit_breaker() calls for isolation. Nothing opens the circuit, verifies skip/half-open/re-open, or exercises optional-category degradation with an open breaker.
  • Failure scenario: Both regressions above would have been caught by direct unit tests. "All 491 tests pass" therefore isn't evidence the new logic works.
  • File: tests/test_vendor_routing.py
  • Suggested fix: Add unit tests for CircuitBreaker (opens after threshold, half-open allows one probe, failed probe re-opens, success resets) and integration tests for route_to_vendor (skip an open vendor and fall through to the next; open breaker on a single-vendor optional category returns DATA_UNAVAILABLE rather than raising).

Recommendation

The refactor is good to merge on its own merits. The circuit breaker needs another pass before it's ready: fix the optional-category crash (High) and the no-re-open bug, improve the all-skipped error, and add tests covering the new state machine. Once those land, this is a solid resilience improvement.

🤖 Automated review via Claude Code

@praxstack

Copy link
Copy Markdown

Proposed implementation of the fixes from my earlier automated review, offered as a ready-to-apply patch. Generated by an automated principal-engineer pass; please treat as a starting point.

What it does: Vendor routing + CircuitBreaker; full suite 500 passed

The diff is against this PR's head commit (apply on top of the PR branch, not main):

git apply pr-1071.patch   # from the repo root, on this PR's branch
📋 pr-1071.patch (click to expand)
diff --git c/tests/test_vendor_routing.py w/tests/test_vendor_routing.py
index 4a89b84..d52b4ce 100644
--- c/tests/test_vendor_routing.py
+++ w/tests/test_vendor_routing.py
@@ -120,5 +120,168 @@ class VendorRoutingTests(unittest.TestCase):
             interface.route_to_vendor("get_stock_data", "AAPL", "2026-01-01", "2026-01-10")
 
 
+@pytest.mark.unit
+class CircuitBreakerTests(unittest.TestCase):
+    """The CircuitBreaker state machine: open after threshold, half-open probe,
+    failed probe re-opens, success resets."""
+
+    def test_opens_only_after_threshold(self):
+        cb = interface.CircuitBreaker(failure_threshold=3, reset_timeout=300.0)
+        cb.record_failure("yfinance")
+        cb.record_failure("yfinance")
+        self.assertFalse(cb.is_open("yfinance"))  # 2 < 3, still closed
+        cb.record_failure("yfinance")
+        self.assertTrue(cb.is_open("yfinance"))  # 3rd failure opens it
+
+    def test_unknown_vendor_is_closed(self):
+        cb = interface.CircuitBreaker()
+        self.assertFalse(cb.is_open("never_seen"))
+
+    def test_half_open_probe_allowed_after_timeout(self):
+        cb = interface.CircuitBreaker(failure_threshold=2, reset_timeout=300.0)
+        with mock.patch.object(interface.time, "monotonic", return_value=1000.0):
+            cb.record_failure("fred")
+            cb.record_failure("fred")
+            self.assertTrue(cb.is_open("fred"))
+        # After the timeout elapses, one probe is allowed (half-open).
+        with mock.patch.object(interface.time, "monotonic", return_value=1000.0 + 301):
+            self.assertFalse(cb.is_open("fred"))
+
+    def test_failed_probe_reopens_with_fresh_window(self):
+        cb = interface.CircuitBreaker(failure_threshold=2, reset_timeout=300.0)
+        with mock.patch.object(interface.time, "monotonic", return_value=1000.0):
+            cb.record_failure("fred")
+            cb.record_failure("fred")
+        probe_time = 1000.0 + 301
+        with mock.patch.object(interface.time, "monotonic", return_value=probe_time):
+            self.assertFalse(cb.is_open("fred"))  # half-open: probe allowed
+            cb.record_failure("fred")  # probe fails -> must re-open
+            self.assertTrue(cb.is_open("fred"))
+        # The window must have advanced to the failed-probe time, not stayed
+        # pinned to the first open. Just before the *new* window expires it is
+        # still open (would be wrongly half-open if _open_since hadn't moved).
+        with mock.patch.object(
+            interface.time, "monotonic", return_value=probe_time + 299
+        ):
+            self.assertTrue(cb.is_open("fred"))
+
+    def test_success_resets(self):
+        cb = interface.CircuitBreaker(failure_threshold=2, reset_timeout=300.0)
+        cb.record_failure("fred")
+        cb.record_failure("fred")
+        self.assertTrue(cb.is_open("fred"))
+        cb.record_success("fred")
+        self.assertFalse(cb.is_open("fred"))
+
+    def test_reset_clears_all(self):
+        cb = interface.CircuitBreaker(failure_threshold=1, reset_timeout=300.0)
+        cb.record_failure("a")
+        cb.record_failure("b")
+        self.assertTrue(cb.is_open("a"))
+        self.assertTrue(cb.is_open("b"))
+        cb.reset()
+        self.assertFalse(cb.is_open("a"))
+        self.assertFalse(cb.is_open("b"))
+
+
+@pytest.mark.unit
+class RouteToVendorCircuitBreakerTests(unittest.TestCase):
+    """route_to_vendor must honor the breaker: skip an open vendor and fall
+    through, and degrade/raise sensibly when *every* vendor is skipped."""
+
+    def setUp(self):
+        _reset_config()
+
+    def tearDown(self):
+        _reset_config()
+
+    def _route(self, vendors_for_get_stock_data):
+        return mock.patch.dict(
+            interface.VENDOR_METHODS,
+            {"get_stock_data": vendors_for_get_stock_data},
+            clear=False,
+        )
+
+    def _route_method(self, method, vendors):
+        return mock.patch.dict(interface.VENDOR_METHODS, {method: vendors}, clear=False)
+
+    def test_open_vendor_is_skipped_and_chain_falls_through(self):
+        # yfinance is circuit-broken; the router must skip it without calling it
+        # and fall through to a healthy alpha_vantage.
+        set_config({"data_vendors": {"core_stock_apis": "yfinance,alpha_vantage"}})
+        interface._circuit_breaker.record_failure("yfinance")
+        interface._circuit_breaker.record_failure("yfinance")
+        interface._circuit_breaker.record_failure("yfinance")
+        self.assertTrue(interface._circuit_breaker.is_open("yfinance"))
+
+        yf = mock.Mock(side_effect=_returns("YF_DATA"))
+        with self._route({"yfinance": yf, "alpha_vantage": _returns("AV_DATA")}):
+            result = interface.route_to_vendor(
+                "get_stock_data", "AAPL", "2026-01-01", "2026-01-10"
+            )
+        self.assertEqual(result, "AV_DATA")
+        yf.assert_not_called()  # open vendor must not be hit
+
+    def test_all_vendors_open_optional_category_degrades(self):
+        # Single-vendor optional category whose only vendor is circuit-broken:
+        # must degrade to DATA_UNAVAILABLE, not raise.
+        set_config({"data_vendors": {"macro_data": "fred"}})
+        interface._circuit_breaker.record_failure("fred")
+        interface._circuit_breaker.record_failure("fred")
+        interface._circuit_breaker.record_failure("fred")
+        self.assertTrue(interface._circuit_breaker.is_open("fred"))
+
+        fred = mock.Mock(side_effect=_returns("MACRO"))
+        with self._route_method("get_macro_indicators", {"fred": fred}):
+            result = interface.route_to_vendor(
+                "get_macro_indicators", "cpi", "2026-01-01"
+            )
+        self.assertIn("DATA_UNAVAILABLE", result)
+        self.assertIn("circuit-broken", result)
+        fred.assert_not_called()
+
+    def test_all_vendors_open_core_category_raises_named_error(self):
+        # Core category whose only vendor is circuit-broken: must raise a
+        # descriptive error that names the breaker, not the generic message.
+        set_config({"data_vendors": {"core_stock_apis": "yfinance"}})
+        interface._circuit_breaker.record_failure("yfinance")
+        interface._circuit_breaker.record_failure("yfinance")
+        interface._circuit_breaker.record_failure("yfinance")
+        self.assertTrue(interface._circuit_breaker.is_open("yfinance"))
+
+        yf = mock.Mock(side_effect=_returns("YF_DATA"))
+        with self._route({"yfinance": yf}), self.assertRaises(RuntimeError) as ctx:
+            interface.route_to_vendor(
+                "get_stock_data", "AAPL", "2026-01-01", "2026-01-10"
+            )
+        msg = str(ctx.exception)
+        self.assertIn("circuit", msg.lower())
+        self.assertIn("yfinance", msg)
+        yf.assert_not_called()
+
+    def test_transient_failures_open_breaker_then_optional_degrades(self):
+        # End-to-end: repeated transient failures on the only vendor of an
+        # optional category trip the breaker; a subsequent call within the reset
+        # window still degrades gracefully instead of raising RuntimeError
+        # (the High finding from review).
+        set_config({"data_vendors": {"macro_data": "fred"}})
+        with self._route_method(
+            "get_macro_indicators",
+            {"fred": _raises(ConnectionError("network blip"))},
+        ):
+            # Calls 1-3 degrade and each records a failure.
+            for _ in range(3):
+                result = interface.route_to_vendor(
+                    "get_macro_indicators", "cpi", "2026-01-01"
+                )
+                self.assertIn("DATA_UNAVAILABLE", result)
+            self.assertTrue(interface._circuit_breaker.is_open("fred"))
+            # Call 4: breaker open, vendor skipped -> must still degrade, not crash.
+            result = interface.route_to_vendor(
+                "get_macro_indicators", "cpi", "2026-01-01"
+            )
+        self.assertIn("DATA_UNAVAILABLE", result)
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git c/tradingagents/dataflows/interface.py w/tradingagents/dataflows/interface.py
index 5dbd5dd..55fda7a 100644
--- c/tradingagents/dataflows/interface.py
+++ w/tradingagents/dataflows/interface.py
@@ -66,10 +66,16 @@ class CircuitBreaker:
         return True
 
     def record_failure(self, vendor: str) -> None:
-        """Record a transient failure and open the circuit if threshold reached."""
+        """Record a transient failure and open the circuit if threshold reached.
+
+        Assigning ``_open_since`` directly (rather than ``setdefault``) means a
+        failed half-open probe pushes the timestamp forward, re-opening the
+        circuit for a fresh ``reset_timeout`` window instead of leaving it
+        permanently half-open.
+        """
         self._failures[vendor] = self._failures.get(vendor, 0) + 1
         if self._failures[vendor] >= self._threshold:
-            self._open_since.setdefault(vendor, time.monotonic())
+            self._open_since[vendor] = time.monotonic()
 
     def record_success(self, vendor: str) -> None:
         """Reset the failure count after a successful request."""
@@ -310,12 +316,17 @@ def route_to_vendor(method: str, *args, **kwargs):
 
     last_no_data: NoMarketDataError | None = None
     first_error: Exception | None = None
+    attempted = False
+    skipped_open: list[str] = []
 
     for vendor in vendor_chain:
         if _circuit_breaker.is_open(vendor):
             logger.info("Circuit-breaker open for %r; skipping.", vendor)
+            skipped_open.append(vendor)
             continue
 
+        attempted = True
+
         try:
             result = _try_vendor(vendor, method, args, kwargs)
             _circuit_breaker.record_success(vendor)
@@ -348,4 +359,23 @@ def route_to_vendor(method: str, *args, **kwargs):
             return _build_unavailable_message(first_error, category, method)
         raise first_error
 
+    # Every vendor was skipped because its circuit breaker is open. Treat this
+    # like a vendor failure rather than a missing configuration: degrade
+    # gracefully for optional enrichment, but stay loud for core categories so a
+    # repeatedly-failing primary is not masked by a generic "no vendor" error.
+    if not attempted and skipped_open:
+        reason = (
+            f"all vendors circuit-broken ({', '.join(skipped_open)}) after "
+            f"repeated failures"
+        )
+        if category in OPTIONAL_CATEGORIES:
+            return _build_unavailable_message(
+                RuntimeError(reason), category, method
+            )
+        raise RuntimeError(
+            f"No available vendor for '{method}': {reason}. The primary "
+            f"vendor(s) failed repeatedly and the circuit breaker is open; "
+            f"check vendor health/credentials."
+        )
+
     raise RuntimeError(f"No available vendor for '{method}'")

🤖 Automated patch via Claude Code — not pushed anywhere; yours to apply, adapt, or ignore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants