
poc: connector builder using concurrent cdk #460


Open: wants to merge 3 commits into base: main

Conversation


@maxi297 maxi297 commented Apr 7, 2025

What

https://airbytehq-team.slack.com/archives/C063B9A434H/p1743783912697509

@Anatolii Yatsuk @brian Lai (Airbyte), I was checking what was missing to remove declarative cursors, and one remaining piece is the Connector Builder because we check the state after consuming each slice. I wanted to explore simply setting the concurrency to 1 when we do test_reads. However, it does not seem possible, and I want to knowledge-dump this on you too.

So let’s imagine the following scenario:

  • The partition generation creates 3 slices. Assume we change the slice log to be part of the partition reader so that it is emitted not when the partition is created but when the read on it starts. Note that at any point in time, Python's scheduler could have switched back to the main thread to consume the queue where some partitions might already be waiting, but for simplicity's sake, let's assume it doesn't and all three partitions are generated first
  • We go back to the main thread, which consumes the queue and puts all three partition readers from the queue into the thread pool
  • We start consuming from the first partition. This puts some logs in the message_repository and some records in the queue. At the end, we put a partition completion sentinel in the queue
  • This is where the fun starts: we don't know if Python's scheduler will go back to the main thread or pick another thread in the thread pool. If it went back to the main thread, we would probably be fine: the state would be emitted to the message repository queue, we would emit the records, we would consume the message repository queue (effectively emitting the state) and that's it! However, let's assume the scheduler starts another partition reader instead
  • We start consuming from the second partition. This puts some logs such as the slice descriptor and the HTTP requests/responses (let's call them logs_for_partition_2) in the message_repository and some records in the queue. At the end, we put a partition completion sentinel in the queue
  • Now, let's go back to the main thread. It consumes the first thing in the queue. It is a record, nice! We emit this record and then emit all the messages in the message repository queue (`yield from self._message_repository.consume_queue()`). Oops! We have emitted logs_for_partition_2 while we're not done consuming all the records from the first partition

In other words, there is one problem with the current situation: we don't know the order of emission of messages because they go through different queues (concurrent queue vs message repository). Once we solve this, I think we will soon face another problem: the state message for a partition and its records are emitted on different threads (partition reader thread vs main thread), and we can't prioritize one over the other to control the ordering (while Java does have a priority system for threads, I don't think Python has one).
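The interleaving above can be reproduced deterministically with a toy model of the two channels (the names `record_queue` and `message_repository` are illustrative stand-ins, not the CDK's actual classes):

```python
from queue import Queue

# Toy stand-ins for the CDK's concurrent queue and message repository.
record_queue: Queue = Queue()  # records and sentinels, shared across threads
message_repository: list = []  # logs, drained from the main thread

# Partition reader 1 finished: its records sit in the shared queue.
record_queue.put(("record", "partition_1", 1))
record_queue.put(("record", "partition_1", 2))

# The scheduler then ran partition reader 2, which logged its slice descriptor.
message_repository.append(("log", "slice_descriptor_partition_2"))

# Main thread: emit one record, then drain the message repository,
# mirroring what the read loop does after each record today.
emitted = [record_queue.get()]
emitted.extend(message_repository)  # partition_2's logs leak in here
message_repository.clear()
emitted.append(record_queue.get())  # partition_1's second record comes later

# partition_2's slice log now sits between partition_1's two records.
print(emitted)
```

The slice log for partition 2 ends up between the two records of partition 1, which is exactly the mis-grouping the message grouper would see.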

Potential solutions:

  • Unify the queues and move state message emission to the partition readers: Everything would go through either the queue or the message repository. The problem with the message repository is that we need to go back to the main thread from time to time to emit its messages, so in practice everything would need to go through the queue. To do that easily, we could have MessageRepository.emit_message push to the concurrent queue instead of the internal one. My concern with this one is that in the first phase of the Concurrent CDK development, we were always adamant that the state needed to be emitted on the main thread, but I don't remember the reasons. It feels like, when state and records went through different queues, we needed the PartitionCompleteSentinel to act as a synchronizer to ensure all the records were emitted. If state messages go to the same queue, though, I don't see any problem… Maybe @alexandre Girard (Airbyte) could comment on this as he was heavily involved in the initial design. That being said, what I like about this solution is that it seems pretty simple (I intend to do a PoC later today) and it would allow us to remove the declarative cursors' usage from the Connector Builder
  • Add additional logging and modify the message grouper: Let's assume that all the messages the message grouper needs carry additional information about the slice. This would allow the MessageGrouper to simply "group by slice" and that's it! I like this because it improves our debugging capabilities (we know which slice a record or an HTTP request/response belongs to when more than one thread runs concurrently). This would involve:
    • Adding a debugging field to the AirbyteMessage
    • Adding threading information to our HTTP requests/responses. This seems straightforward: we can add a thread local, set the slice information in it at the beginning of the threads (PartitionEnqueuer or PartitionReader), and have MessageRepository.emit_message read it
    • For things that are not emitted through the message repository, we need to:
      • Add a debugging_information field to our Record so that it can be added to the AirbyteMessages
      • For the state, I'm not sure how to do that because it is emitted on the main thread, but I'm sure we can pass a flag to the ConcurrentCursor and ConcurrentPerPartitionCursor to add the information when emitting the state. However, this means this solution needs to be re-implemented for every cursor, which is kind of annoying…
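A minimal sketch of the first solution, assuming a hypothetical `ConcurrentMessageRepository` whose `emit_message` pushes straight to the shared concurrent queue (names and signatures are illustrative, not the actual CDK API):

```python
from queue import Queue
from typing import Any, Iterator


class ConcurrentMessageRepository:
    """Hypothetical message repository that forwards every message to the
    shared concurrent queue so records, logs, and state messages keep the
    order in which they were produced."""

    def __init__(self, queue: Queue) -> None:
        self._queue = queue

    def emit_message(self, message: Any) -> None:
        # Push to the shared queue instead of buffering internally.
        self._queue.put(message)

    def consume_queue(self) -> Iterator[Any]:
        # Nothing is buffered locally anymore; the main thread drains the
        # shared queue directly, so this always yields nothing.
        return iter(())


queue: Queue = Queue()
repo = ConcurrentMessageRepository(queue)
repo.emit_message({"type": "STATE", "partition": "1"})
```

Because the state message now travels through the same queue as the records, the main thread sees it right after the records of its partition, before the next partition's messages.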

The two solutions are not mutually exclusive, and doing the first might simplify the second because we would not need two ways of adding the information to the AirbyteMessages.
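The thread-local tagging from the second solution could be sketched as follows (the helper names and the `debugging_information` field are assumptions for illustration):

```python
import threading

# Hypothetical thread-local holding the slice the current thread is reading.
_slice_context = threading.local()


def set_current_slice(slice_descriptor):
    # Called at the start of a PartitionReader-style task.
    _slice_context.value = slice_descriptor


def current_slice():
    return getattr(_slice_context, "value", None)


def emit_with_debug_info(message):
    # A MessageRepository.emit_message-style hook could attach the tag so a
    # message grouper can later "group by slice".
    tagged = dict(message)
    tagged["debugging_information"] = current_slice()
    return tagged


results = []


def worker():
    set_current_slice({"partition": "2"})
    results.append(emit_with_debug_info({"type": "LOG", "log": "request sent"}))


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

Each thread sees only its own slice, so messages emitted concurrently stay attributable; the main thread, which never called `set_current_slice`, still sees `None`.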

Discarded solutions:

  • Rely on thread priority: there is no way in Python to set thread priorities, which means we can't force the scheduler to go back to the main thread before reading from another partition.
  • Tag all the elements in the queue and the message repository with emitted_at in order to reconcile the order: it sounds like a great idea, but it doesn't solve the problem where the records emission for a partition is far from the state emission

This PR is the PoC to unify the queues as proposed in solution 1.

⚠️ Note that test_incremental_substream_error and test_incremental_error from test_concurrent_perpartitioncursor.py are still failing

How

I added some comments to hopefully explain what has been done.

Summary by CodeRabbit

  • New Features

    • Introduced enhanced data synchronization by supporting optional catalog and state parameters.
    • Enabled incremental synchronization with a datetime-based cursor for improved data capture.
  • Refactor

    • Improved concurrency management with automatic default level enforcement.
    • Streamlined slice handling, partition processing, and logging for a more robust and flexible experience.
    • Refined error handling to enhance overall operational reliability.
@Copilot Copilot AI review requested due to automatic review settings April 7, 2025 17:55
Copy link
Contributor

@Copilot Copilot AI left a comment


Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

airbyte_cdk/sources/declarative/schema/default_schema_loader.py:40

  • Catching ValueError in addition to OSError could mask schema parsing issues. Consider logging the exception to provide additional context for troubleshooting.
        except (OSError, ValueError):
Comment on lines +64 to 67

# this assumes the cursor will put a state message on the queue. It also needs to be before the completion sentinel else the concurrent_read_processor might end the sync before consuming the state
cursor.close_partition(partition)
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
Copilot AI Apr 7, 2025
Consider handling potential exceptions from cursor.observe(record) separately or moving the call to a finally block to guarantee that cursor.close_partition(partition) is always executed, ensuring proper partition finalization.



coderabbitai bot commented Apr 7, 2025

📝 Walkthrough

Walkthrough

The changes restructure the source creation and partition processing flows across the repository. In the connector builder, the source creation now includes additional parameters (catalog and state) with an added concurrency check. In the sources, constants and limits are imported from a shared module, and new classes (e.g., TestLimits, ConcurrentMessageRepository, TestReadSlicerDecorator, and PartitionLogger) are introduced. Several methods have updated signatures and streamlined error/log handling. Test files are updated to align with these changes, including the removal or modification of parameters and test functions.

Changes

File(s) Change Summary
airbyte_cdk/connector_builder/connector_builder_handler.py, airbyte_cdk/connector_builder/main.py Updated create_source to accept additional parameters (catalog, state), added _ensure_concurrency_level, and replaced local constant definitions with imports from the concurrent declarative module.
airbyte_cdk/sources/concurrent_source/.../concurrent_read_processor.py, airbyte_cdk/sources/concurrent_source/.../concurrent_source.py Removed the slice_logger parameter and associated logging from the read processor; added an optional queue parameter and enhanced handling of AirbyteMessage in the concurrent source.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py, airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py, airbyte_cdk/sources/declarative/retrievers/simple_retriever.py Introduced a new TestLimits dataclass and ConcurrentMessageRepository; modified the initialization of ConcurrentDeclarativeSource and wrapped stream slicers with TestReadSlicerDecorator to control slice limits.
airbyte_cdk/sources/declarative/…/stream_slicer.py, airbyte_cdk/sources/declarative/interpolation/macros.py Added the TestReadSlicerDecorator class to limit stream slices and modified format_datetime to explicitly set the UTC timezone.
airbyte_cdk/sources/declarative/schema/default_schema_loader.py Expanded exception handling in get_json_schema to also catch ValueError.
airbyte_cdk/sources/streams/concurrent/…/partition_reader.py, airbyte_cdk/sources/streams/concurrent/partitions/types.py Added a new PartitionLogger and updated PartitionReader to accept a partition_logger and a cursor parameter; extended the QueueItem type alias to include AirbyteMessage.
airbyte_cdk/test/entrypoint_wrapper.py Modified _run_command to conditionally append the --debug flag and removed its explicit inclusion in the discover function.
unit_tests/connector_builder/test_connector_builder_handler.py Updated tests for incremental sync to match the new manifest structure and revised source creation parameters (including a new _NO_STATE variable).
unit_tests/sources/declarative/…, unit_tests/sources/streams/concurrent/…, unit_tests/sources/streams/test_stream_read.py Adjusted tests to reflect updated method signatures and constructor parameters (e.g., removal of slice_logger), and removed tests for deprecated functionality such as slice limiting via the removed test function.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Builder as ConnectorBuilderHandler
  participant CDS as ConcurrentDeclarativeSource
  Client->>Builder: Call create_source(config, limits, catalog, state)
  Builder->>Builder: _ensure_concurrency_level(manifest)
  Builder->>CDS: Instantiate ConcurrentDeclarativeSource(...)
  CDS-->>Builder: Return Source Instance
  Builder-->>Client: Return Source
Loading
sequenceDiagram
  participant CS as ConcurrentSource
  participant Enqueuer as PartitionEnqueuer
  participant Reader as PartitionReader
  participant Logger as PartitionLogger
  participant Q as Queue
  CS->>Enqueuer: Enqueue partitions
  Enqueuer->>Reader: Process partition(partition, cursor)
  Reader->>Logger: Log partition details
  Reader->>Q: Submit processed records/messages
Loading

Suggested labels

enhancement

Suggested reviewers

  • aaronsteers — Would you like to take a look at these changes, aaronsteers?

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)

63-66: Should cursor observation be moved to a finally block?

The cursor.close_partition(partition) call might not execute if an exception occurs during record processing. Consider wrapping cursor.observe and cursor.close_partition in a try-finally pattern to ensure proper closing even in error cases, wdyt?

  try:
      for record in partition.read():
          self._queue.put(record)
          cursor.observe(record)
  except Exception as e:
      self._queue.put(StreamThreadException(e, partition.stream_name()))
      self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
+  finally:
      # this assumes the cursor will put a state message on the queue. It also needs to be before the completion sentinel else the concurrent_read_processor might end the sync before consuming the state
      cursor.close_partition(partition)
      self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: airbyte_cdk/sources/streams/concurrent/partition_reader.py

🧹 Nitpick comments (13)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

622-623: Consider creating a tracking issue for this FIXME.

This FIXME comment indicates a future refactoring to make log_formatter dependent on the logger's debug state. Having a tracking issue would help ensure this doesn't get forgotten.

Also, is there more context on how this would improve the current implementation? I'd be curious to learn more about the rationale, wdyt?

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

2905-2905: Parameter removal looks good but might need documentation.

You've removed the emit_connector_builder_messages=True parameter from the ModelToComponentFactory constructor. This change seems intentional and consistent with the other similar changes in this file.

Have you considered adding a comment explaining why this parameter is no longer needed? It would help future developers understand the rationale behind this change, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2844-2845: Implementing slice limiting with TestReadSlicerDecorator.

The introduction of the slice_limit variable and wrapping the stream_slicer with TestReadSlicerDecorator effectively limits the number of slices fetched during test reads. The code uses a reasonable default of 5 slices when no explicit limit is provided.

The comment on line 2845 indicates this is a temporary solution until a log formatter is removed. Would it make sense to add a TODO with more details about when/how this will be refactored once the log formatter is removed? wdyt?


airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)

38-38: Extended QueueItem type to support AirbyteMessage.

Adding AirbyteMessage to the QueueItem union type allows direct passing of protocol messages through the concurrent processing queue. This is a useful extension for the concurrent CDK work, enabling more flexible message handling.

I see that this line is getting quite long now. Would you consider formatting this across multiple lines to improve readability? wdyt?

-    Record, Partition, PartitionCompleteSentinel, PartitionGenerationCompletedSentinel, Exception, AirbyteMessage,
+    Record,
+    Partition,
+    PartitionCompleteSentinel,
+    PartitionGenerationCompletedSentinel,
+    Exception,
+    AirbyteMessage,

airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)

47-70: Allow configurable queue size?
A default queue size of 10,000 can be large for certain memory scenarios. Would you consider making maxsize configurable so that users can tune the queue size based on their system constraints? wdyt?



100-114: Confirm usage of the same queue across enqueuers/readers
Using a single shared queue for PartitionEnqueuer and PartitionReader is straightforward. However, concurrency might increase complexity. Do you have tests ensuring that multiple producers/consumers manage the queue seamlessly under load? wdyt?


airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)

76-79: Revisit default limit constants
The defaults (DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, DEFAULT_MAXIMUM_NUMBER_OF_SLICES, DEFAULT_MAXIMUM_RECORDS, and DEFAULT_MAXIMUM_STREAMS) look practical. Have you considered making them environment-driven if users need to tweak concurrency on the fly? wdyt?



82-88: TestLimits dataclass
Centralizing concurrency constraints inside TestLimits is convenient. Might it be helpful to document these limits' interplay with the _LOWEST_SAFE_CONCURRENCY_LEVEL below, so that users know how each limit influences the concurrency pipeline? wdyt?



90-105: ConcurrentMessageRepository concurrency
The ConcurrentMessageRepository enqueues every emitted message. In very chatty logs or high data volume, have you considered that this could bloat the queue quickly? Maybe add a safeguard or a skip for extremely verbose scenarios? wdyt?



426-433: Refined concurrency checks
This condition checks for PerPartitionWithGlobalCursor or a decorated slicer. It's good for ML reusability. Would you consider logging which path was taken, to help debug concurrency edge cases? wdyt?


unit_tests/connector_builder/test_connector_builder_handler.py (3)

78-78: _NO_STATE definition
Setting _NO_STATE = None is straightforward. Just to confirm: do we rely on _NO_STATE specifically in any test to skip certain checks, or is it solely an alias for readability? wdyt?



910-925: Creating source with TEST_READ_CONFIG
You inject TEST_READ_CONFIG and call create_source(config, limits, catalog). This tie-in ensures incremental sync features are tested. Have you considered what happens if the user modifies _NO_STATE or passes partial state? Maybe a fallback test scenario could help? wdyt?



1093-1094: Multiple _create_page_response calls
You’re returning multiple pages in a side effect simulation—great for simulating pages. In the future, might you detail record data differences between pages to confirm correct pagination logic in tests? wdyt?


📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c32297 and b663cf5.

📒 Files selected for processing (19)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/main.py (1 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (8 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (8 hunks)
  • airbyte_cdk/sources/declarative/interpolation/macros.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1 hunks)
  • airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py (2 hunks)
  • airbyte_cdk/test/entrypoint_wrapper.py (2 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (10 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (0 hunks)
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_partition_reader.py (4 hunks)
  • unit_tests/sources/streams/test_stream_read.py (0 hunks)
💤 Files with no reviewable changes (2)
  • unit_tests/sources/streams/test_stream_read.py
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧬 Code Definitions (9)
airbyte_cdk/connector_builder/main.py (1)
airbyte_cdk/connector_builder/connector_builder_handler.py (1)
  • create_source (60-70)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
airbyte_cdk/sources/types.py (1)
  • StreamSlice (66-160)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (528-3445)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
  • PartitionLogger (19-29)
  • PartitionReader (31-70)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (81-90)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
  • TestReadSlicerDecorator (31-73)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
  • streams (144-169)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
  • PartitionCompleteSentinel (15-31)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
  • TestReadSlicerDecorator (31-73)
airbyte_cdk/sources/types.py (1)
  • StreamSlice (66-160)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (81-90)
airbyte_cdk/sources/types.py (1)
  • Record (20-63)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/connector_builder/connector_builder_handler.py (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
  • ConcurrentDeclarativeSource (115-587)
  • TestLimits (83-87)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
  • ManifestDeclarativeSource (60-442)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py

[warning] Would reformat: airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py

unit_tests/sources/streams/concurrent/test_partition_reader.py

[warning] Would reformat: unit_tests/sources/streams/concurrent/test_partition_reader.py

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

[warning] Would reformat: unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

airbyte_cdk/sources/concurrent_source/concurrent_source.py

[warning] Would reformat: airbyte_cdk/sources/concurrent_source/concurrent_source.py

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

[warning] Would reformat: airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

[warning] Would reformat: airbyte_cdk/sources/declarative/concurrent_declarative_source.py

airbyte_cdk/sources/streams/concurrent/partition_reader.py

[warning] Would reformat: airbyte_cdk/sources/streams/concurrent/partition_reader.py

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] Would reformat: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

airbyte_cdk/test/entrypoint_wrapper.py

[warning] Would reformat: airbyte_cdk/test/entrypoint_wrapper.py

airbyte_cdk/sources/streams/concurrent/partitions/types.py

[warning] Would reformat: airbyte_cdk/sources/streams/concurrent/partitions/types.py

airbyte_cdk/connector_builder/connector_builder_handler.py

[warning] Would reformat: airbyte_cdk/connector_builder/connector_builder_handler.py

unit_tests/connector_builder/test_connector_builder_handler.py

[warning] Would reformat: unit_tests/connector_builder/test_connector_builder_handler.py

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (35)
airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1)

40-40: Expanded exception handling to catch ValueError - nice improvement!

Adding ValueError to the exception handling will make the schema loading more robust by catching JSON parsing errors in addition to file I/O errors. This ensures we'll still return an empty schema with a helpful log message in more error scenarios.

airbyte_cdk/sources/declarative/interpolation/macros.py (1)

180-180: Good addition to ensure explicit UTC timezone on all datetime objects.

This change ensures that regardless of how dt_datetime is derived (timestamp, string, or datetime object), it'll explicitly have its timezone set to UTC before being formatted. This prevents potential timezone-related inconsistencies in date formatting and comparisons. It's also consistent with other timezone handling in this file (like now_utc() and str_to_datetime()).

airbyte_cdk/test/entrypoint_wrapper.py (2)

200-200: Removing hardcoded debug flag to use centralized logic.

This change removes the explicit --debug flag since it will now be added automatically by the _run_command function if not present. This aligns with the centralization of debug flag handling you implemented.



160-161:

❓ Verification inconclusive

Centralizing debug flag logic - looks good!

Moving the debug flag logic into _run_command centralizes this behavior instead of duplicating it across different command functions. This is a cleaner approach that maintains backward compatibility while adding flexibility.

There seems to be a linting issue reported in the pipeline failures. Would you like to run the formatter to fix this?


🏁 Script executed:

#!/bin/bash
# Verify the code formatting issue
black --check airbyte_cdk/test/entrypoint_wrapper.py

Length of output: 150


Centralizing Debug Flag Implementation & Linting Formatter Check

The refactoring to centralize the debug flag logic looks solid and does indeed enhance flexibility while keeping backward compatibility. However, the formatter check ran into an issue because the black command wasn’t found in the environment. Could you verify whether we should install black in the pipeline (or use an equivalent tool) so that the formatter can run successfully? wdyt?


airbyte_cdk/connector_builder/main.py (1)

94-94: Updated source creation to include catalog and state - good enhancement!

This change passes catalog and state information to the create_source function, making it consistent with its updated signature in connector_builder_handler.py. This gives the connector builder more flexibility by allowing sources to be created with more complete configuration context.

The function now matches the implementation in the handler:

def create_source(config: Mapping[str, Any], limits: TestLimits, catalog: Optional[ConfiguredAirbyteCatalog] = None, state: Any = None)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)

2993-2993: Parameter removal looks consistent.

This removal of emit_connector_builder_messages=True matches your other changes to standardize how the ModelToComponentFactory is instantiated.


3035-3035: Parameter removal looks consistent.

This is the third instance of removing the same parameter, which maintains consistency across the file.

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2)

6-7: Import changes look good.

The added imports support the new TestReadSlicerDecorator class.



15-15: Updated import is appropriate.

Expanding the import from airbyte_cdk.sources.types to include StreamSlice and StreamState makes sense given their use in the new code.


airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

462-462: New import for test slice limiting functionality.

The addition of TestReadSlicerDecorator from the stream_slicer module appears to be part of implementing test slice limiting in the declarative framework. This aligns with the POC for connector builder using concurrent CDK mentioned in the PR objectives.



517-517: Updated import to include additional type definitions.

Expanding the imports from airbyte_cdk.sources.types to include StreamSlice and StreamState makes sense given the concurrent CDK work. These types are essential for managing state in partitioned streams.


airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)

7-7: Added AirbyteMessage to imports.

This import supports the extension of the QueueItem type to include AirbyteMessage, enabling direct message passing through the queue system in the concurrent framework.


unit_tests/sources/streams/concurrent/test_partition_reader.py (3)

12-13: Added Cursor import and updated PartitionReader import.

These import changes support the enhanced partition reader functionality that now includes cursor handling and logging. This aligns with the concurrent CDK enhancements for the connector builder POC.



30-31: Added partition logger and cursor mocks.

These setup changes create the necessary mock objects to test the updated PartitionReader that now accepts a partition logger and uses a cursor for processing partitions. The FIXME comment suggests there's more work needed to properly test the partition logger calls.

Should this FIXME be addressed as part of this PR or tracked separately in an issue? It might be important to ensure the partition logger functionality is properly tested.



34-34: Updated process_partition method calls to include cursor parameter.

These test modifications correctly update the calls to process_partition to include the cursor parameter, aligning with the changes in the implementation. This ensures the tests continue to validate the correct behavior of the partition reader with the concurrent processing enhancements.

Also applies to: 45-45, 57-57


unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)

178-179: Updated thread pool submission to include cursor parameter.

This change updates the thread pool submission to include the cursor from the respective stream, correctly reflecting the updated signature of the process_partition method. This ensures that partition processing in the concurrent framework has access to the cursor for proper state management.


airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)

7-19: Imports look fine
Everything here seems aligned with the new concurrency-related classes and type annotations. Nice addition—no concerns to raise.



80-96: Consider backpressure handling
Right now, if the queue is full (in scenarios with high throughput), producers may block. Would you consider adding logging or handling for a full queue to avoid potential deadlock or indefinite wait? wdyt?
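One possible shape for this suggestion, assuming a standard queue.Queue: put with a timeout and log while the queue stays full, so a stalled consumer shows up in the logs instead of silently blocking the producer forever.

```python
import logging
import queue

logger = logging.getLogger(__name__)


def put_with_visibility(q: queue.Queue, item, timeout_seconds: float = 5.0) -> None:
    # Retry the put on a bounded queue, emitting a warning each time the
    # timeout elapses, so back-pressure is observable rather than silent.
    while True:
        try:
            q.put(item, timeout=timeout_seconds)
            return
        except queue.Full:
            logger.warning("Queue still full after %ss; producer is blocked", timeout_seconds)
```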



166-167: Double-check ordering for yielded AirbyteMessage
When an AirbyteMessage arrives in the queue, we immediately yield it. Do we need to maintain strict ordering of messages vs. records, or is out-of-order emergence acceptable here? wdyt?


airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

128-149: Flexible test read constraints
By passing limits to the ModelToComponentFactory, you introduce refined control (disabling cache, retries, etc.). Are these changes thoroughly tested for partial updates or more complex scenarios, like streams that might still require some retry logic? wdyt?


unit_tests/connector_builder/test_connector_builder_handler.py (2)

64-64: New import for StateBuilder
No issues spotted. This looks like a helpful utility for building/managing state.



145-157: Incremental synchronization fields
The manifest now includes incremental_sync with a DatetimeBasedCursor. Would you consider adding test coverage for timezone and DST edge cases (e.g., offsets, leaps)? wdyt?


airbyte_cdk/sources/streams/concurrent/partition_reader.py (6)

4-6: Additional imports added for new functionality.

The new imports support the added PartitionLogger class and cursor integration. Good job on properly organizing the imports.



9-16: Updated imports for message repository and cursor integration.

The changes align with the PR objectives to implement concurrent CDK functionality. The imports are correctly ordered and properly separated.



19-29: New PartitionLogger class for consistent slice logging.

The PartitionLogger encapsulates the slice logging logic, making it reusable across the codebase. Good separation of concerns!



38-43: Updated constructor to support partition logging.

The constructor now accepts an optional partition_logger parameter, allowing for more flexible logging configuration. Nice job making this optional to maintain backward compatibility.



45-56: Updated method signature to support cursor integration.

The process_partition method now accepts a cursor parameter to track state during partition processing. This aligns with the concurrent processing objectives.



57-58: Added partition logging support.

Good addition of partition logging with a null check to prevent null pointer exceptions.


airbyte_cdk/connector_builder/connector_builder_handler.py (4)

7-7: Updated import to include Any for more precise type hints.

Good improvement to type annotation by making the imports more specific.



18-25: Now importing constants and classes from concurrent_declarative_source.

This change centralizes the constants and classes in one location, improving code maintainability. Great step toward modularity!



52-58: New function to ensure consistent concurrency level.

The _ensure_concurrency_level function sets a safe concurrency level with a good comment explaining why the value is below the recommended safe level but still acceptable in this context. Good defensive programming!
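A hypothetical sketch of what such a helper could do (field names are assumptions based on the declarative manifest schema, not the PR's actual code): override the manifest's concurrency level so test reads run effectively sequentially.

```python
from typing import Any, Dict


def ensure_concurrency_level(manifest: Dict[str, Any], level: int = 1) -> Dict[str, Any]:
    # Return a copy of the manifest with concurrency pinned to `level`,
    # so a test read processes one partition at a time regardless of the
    # concurrency the connector declares.
    patched = dict(manifest)
    patched["concurrency_level"] = {
        "type": "ConcurrencyLevel",
        "default_concurrency": level,
    }
    return patched
```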



60-70: Updated create_source to support concurrent processing.

The function now accepts additional parameters (catalog and state) and uses ConcurrentDeclarativeSource instead of ManifestDeclarativeSource. This enables the concurrent processing feature while ensuring safe concurrency levels through _ensure_concurrency_level.


airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (3)

36-40: Removed slice_logger parameter from constructor.

The constructor no longer needs the slice_logger parameter, as this responsibility has been moved to the PartitionReader class. This aligns with the changes in the first file.



95-95: Updated thread pool submission to include cursor.

The thread pool submission now includes the cursor from the corresponding stream instance, allowing the PartitionReader to track state during partition processing. This is a good change that aligns with the updates to PartitionReader.process_partition.



108-120: Simplified error handling in on_partition_complete_sentinel.

The error handling has been streamlined while maintaining the core functionality. The code now checks sentinel.is_successful and performs partition cleanup as needed. The flow is more straightforward and easier to follow.


Comment on lines +31 to +73
class TestReadSlicerDecorator(StreamSlicer):
"""
A stream slicer wrapper for test reads which limits the number of slices produced.
"""

def __init__(self, stream_slicer: StreamSlicer, maximum_number_of_slices: int) -> None:
self._decorated = stream_slicer
self._maximum_number_of_slices = maximum_number_of_slices

def stream_slices(self) -> Iterable[StreamSlice]:
return islice(self._decorated.stream_slices(), self._maximum_number_of_slices)

def get_request_params(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
return self._decorated.get_request_params(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_headers(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
return self._decorated.get_request_headers(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_body_data(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
return self._decorated.get_request_body_data(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_body_json(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
return self._decorated.get_request_body_json(
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

💡 Verification agent

🧩 Analysis chain

New TestReadSlicerDecorator implementation looks good, but formatting needs attention.

The new TestReadSlicerDecorator correctly wraps a StreamSlicer and limits slices during test reads. This appears to replace functionality that was previously in SimpleRetrieverTestReadDecorator.

Note that there's a linting warning about formatting for this file. Perhaps run the formatter on this file before merging?

One minor suggestion: Would it be worth adding validation to ensure maximum_number_of_slices is positive, similar to how SimpleRetrieverTestReadDecorator does it? wdyt?


🏁 Script executed:

#!/bin/bash
# Check if SimpleRetrieverTestReadDecorator validates maximum_number_of_slices
rg -A 5 -B 5 "maximum_number_of_slices" airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Length of output: 794


Fix Required: Address formatting warnings and add positive-value validation

The new TestReadSlicerDecorator implementation works well in wrapping a StreamSlicer and limiting the slices. However, there are a couple of points to consider before merging:

  • There's a linting warning concerning formatting in this file. Could you run the formatter to clean up these issues?
  • Unlike SimpleRetrieverTestReadDecorator (in airbyte_cdk/sources/declarative/retrievers/simple_retriever.py) which validates that maximum_number_of_slices is positive, the new decorator doesn’t have this check. Would it be worth adding a similar validation in the constructor to ensure maximum_number_of_slices is strictly positive, to prevent potential issues? wdyt?
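As a sketch of the suggested guard (a simplified stand-in, not the PR's class): the constructor could reject non-positive limits the same way SimpleRetrieverTestReadDecorator does, while islice still does the actual limiting.

```python
from itertools import islice
from typing import Iterable, Iterator


class LimitedSlicer:
    # Simplified stand-in for TestReadSlicerDecorator illustrating the
    # positive-value validation plus islice-based slice limiting.
    def __init__(self, slices: Iterable, maximum_number_of_slices: int) -> None:
        if maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices for a test read must be strictly positive, got {maximum_number_of_slices}"
            )
        self._slices = slices
        self._maximum_number_of_slices = maximum_number_of_slices

    def stream_slices(self) -> Iterator:
        return islice(self._slices, self._maximum_number_of_slices)
```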

@maxi297 maxi297 left a comment

Commenting on the PoC...

MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


@dataclass
class TestLimits:

I had to move this to avoid circular dependencies. I assume the cycle was caused by concurrent_declarative_source.py having to know about TestLimits while connector_builder_handler.py has to know about concurrent_declarative_source

try:
if sentinel.is_successful:
stream = self._stream_name_to_instance[partition.stream_name()]
stream.cursor.close_partition(partition)

I moved stream.cursor.close_partition(partition) to PartitionReader, which meant there was no need to catch exceptions here anymore

@@ -44,6 +44,7 @@ def create(
slice_logger: SliceLogger,
message_repository: MessageRepository,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
queue: Optional[Queue[QueueItem]] = None

Since the MessageRepository also needs access to the queue, the queue has to be passed in here instead of being created internally

@@ -360,16 +423,19 @@ def _group_streams(
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
and (

All the changes in this clause are caused by the TestReadSlicerDecorator that wraps the "real" partition_router. It affects how we call create_concurrent_cursor_from_perpartition_cursor (we need the inner partition router when creating it) and how we create the StreamSlicerPartitionGenerator, as we need to re-wrap the stream_slicer in the TestReadSlicerDecorator.

In a world where we don't have declarative cursors, the cursor would already be wrapped and we wouldn't have to do these shenanigans here

@@ -177,6 +177,7 @@ def format_datetime(
dt_datetime = (
datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt)
)
dt_datetime = dt_datetime.replace(tzinfo=pytz.utc)

Outside of the scope of this PR. See https://airbytehq-team.slack.com/archives/C02U9R3AF37/p1743795253674379 for more details

@@ -140,6 +142,19 @@
"type": "DeclarativeStream",
"$parameters": _stream_options,
"retriever": "#/definitions/retriever",
"incremental_sync": {

Added to test that the state is properly emitted. I also had to add updated_at fields to the records to keep things from breaking

@@ -2902,7 +2902,7 @@ def test_use_request_options_provider_for_datetime_based_cursor():
parameters={},
)

connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)

I could have either updated the assertion to check assert isinstance(retriever.stream_slicer, TestReadSlicerDecorator) or disabled emit_connector_builder_messages. I arbitrarily chose the latter

)
assert (
self._a_closed_partition in handler._streams_to_running_partitions[_ANOTHER_STREAM_NAME]
)

def test_handle_partition_emits_log_message_if_it_should_be_logged(self):

slice logging has been moved to PartitionReader

assert self._an_open_partition in handler._streams_to_running_partitions[_STREAM_NAME]

@freezegun.freeze_time("2020-01-01T00:00:00")
def test_handle_on_partition_complete_sentinel_with_messages_from_repository(self):

I think the part of this test that checks that the message repository is still consumed might still be relevant, but I assume the implementation is always ConcurrentMessageRepository now and MessageRepository.consume_queue might not be useful in a concurrent world...

What is sure is that self._stream.cursor.close_partition.assert_called_once() is not useful anymore because that call has been moved to the concurrent read processor

self._another_stream.cursor.close_partition.assert_called_once()

@freezegun.freeze_time("2020-01-01T00:00:00")
def test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete(

This was used in case close_partition failed. We don't call close_partition here anymore, hence this test is now obsolete
