poc: connector builder using concurrent cdk #460
base: main
Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/declarative/schema/default_schema_loader.py:40
- Catching ValueError in addition to OSError could mask schema parsing issues. Consider logging the exception to provide additional context for troubleshooting.
except (OSError, ValueError):
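The logging suggestion above can be sketched as follows. `load_schema` and the logger name are hypothetical stand-ins for the CDK's actual loader, shown only to illustrate recording the caught exception instead of swallowing it:

```python
import json
import logging

logger = logging.getLogger("airbyte")

def load_schema(path: str) -> dict:
    # Hypothetical loader illustrating the suggestion: log the caught
    # exception so schema parsing failures are visible for troubleshooting.
    try:
        with open(path) as f:
            return json.loads(f.read())
    except (OSError, ValueError) as exc:
        logger.warning(
            "Could not load schema from %s, defaulting to empty schema: %s", path, exc
        )
        return {}
```

With this shape, a missing file and a malformed file both fall back to an empty schema, but the reason is preserved in the logs.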
```python
# this assumes the cursor will put a state message on the queue. It also needs to be before the completion sentinel else the concurrent_read_processor might end the sync before consuming the state
cursor.close_partition(partition)
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
```
Consider handling potential exceptions from cursor.observe(record) separately or moving the call to a finally block to guarantee that cursor.close_partition(partition) is always executed, ensuring proper partition finalization.
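A minimal, self-contained sketch of that suggestion. The partition, cursor, and sentinel types here are simplified stand-ins for the CDK classes, not the real implementations:

```python
import queue
from typing import Any

class PartitionCompleteSentinel:
    # Simplified stand-in for the CDK's sentinel type.
    def __init__(self, partition: Any, is_successful: bool) -> None:
        self.partition = partition
        self.is_successful = is_successful

def process_partition(partition: Any, cursor: Any, q: "queue.Queue[Any]") -> None:
    try:
        try:
            for record in partition.read():
                q.put(record)
                cursor.observe(record)
        finally:
            # close_partition now runs even if read()/observe() raise,
            # guaranteeing partition finalization; the cursor is assumed to
            # put its state message on the queue before the sentinel.
            cursor.close_partition(partition)
        q.put(PartitionCompleteSentinel(partition, True))
    except Exception as e:
        q.put(e)
        q.put(PartitionCompleteSentinel(partition, False))
```

On the failure path the partition is still closed, and the consumer sees the exception followed by an unsuccessful sentinel.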
📝 Walkthrough

The changes restructure the source creation and partition processing flows across the repository. In the connector builder, source creation now includes additional parameters (catalog and state) with an added concurrency check. In the sources, constants and limits are imported from a shared module, and new classes (e.g., TestLimits, ConcurrentMessageRepository, TestReadSlicerDecorator, and PartitionLogger) are introduced. Several methods have updated signatures and streamlined error/log handling. Test files are updated to align with these changes, including the removal or modification of parameters and test functions.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Builder as ConnectorBuilderHandler
    participant CDS as ConcurrentDeclarativeSource
    Client->>Builder: Call create_source(config, limits, catalog, state)
    Builder->>Builder: _ensure_concurrency_level(manifest)
    Builder->>CDS: Instantiate ConcurrentDeclarativeSource(...)
    CDS-->>Builder: Return Source Instance
    Builder-->>Client: Return Source
```

```mermaid
sequenceDiagram
    participant CS as ConcurrentSource
    participant Enqueuer as PartitionEnqueuer
    participant Reader as PartitionReader
    participant Logger as PartitionLogger
    participant Q as Queue
    CS->>Enqueuer: Enqueue partitions
    Enqueuer->>Reader: Process partition(partition, cursor)
    Reader->>Logger: Log partition details
    Reader->>Q: Submit processed records/messages
```
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
Lines 63-66: Should cursor observation be moved to a finally block? The `cursor.close_partition(partition)` call might not execute if an exception occurs during record processing. Consider wrapping `cursor.observe` and `cursor.close_partition` in a try-finally pattern to ensure proper closing even in error cases, wdyt?
```diff
 try:
     for record in partition.read():
         self._queue.put(record)
         cursor.observe(record)
+finally:
+    # this assumes the cursor will put a state message on the queue. It also needs
+    # to be before the completion sentinel else the concurrent_read_processor might
+    # end the sync before consuming the state
     cursor.close_partition(partition)
     self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
 except Exception as e:
     self._queue.put(StreamThreadException(e, partition.stream_name()))
     self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
```
🧰 Tools
🪛 GitHub Actions: Linters
[warning] Would reformat: airbyte_cdk/sources/streams/concurrent/partition_reader.py
🧹 Nitpick comments (13)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
Lines 622-623: Consider creating a tracking issue for this FIXME. The FIXME comment indicates a future refactoring to make `log_formatter` dependent on the logger's debug state. A tracking issue would help ensure this doesn't get forgotten. Also, is there more context on how this would improve the current implementation? I'd be curious to learn more about the rationale, wdyt?
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
Lines 2905-2905: Parameter removal looks good but might need documentation. You've removed the `emit_connector_builder_messages=True` parameter from the `ModelToComponentFactory` constructor. This change seems intentional and consistent with the other similar changes in this file. Have you considered adding a comment explaining why this parameter is no longer needed? It would help future developers understand the rationale behind this change, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
Lines 2844-2845: Implementing slice limiting with TestReadSlicerDecorator. The introduction of the `slice_limit` variable and wrapping the `stream_slicer` with `TestReadSlicerDecorator` effectively limits the number of slices fetched during test reads. The code uses a reasonable default of 5 slices when no explicit limit is provided. The comment on line 2845 indicates this is a temporary solution until a log formatter is removed. Would it make sense to add a TODO with more details about when/how this will be refactored once the log formatter is removed? wdyt?
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
Lines 38-38: Extended QueueItem type to support AirbyteMessage. Adding `AirbyteMessage` to the `QueueItem` union type allows direct passing of protocol messages through the concurrent processing queue. This is a useful extension for the concurrent CDK work, enabling more flexible message handling. I see that this line is getting quite long now. Would you consider formatting this across multiple lines to improve readability? wdyt?
```diff
-    Record, Partition, PartitionCompleteSentinel, PartitionGenerationCompletedSentinel, Exception, AirbyteMessage,
+    Record,
+    Partition,
+    PartitionCompleteSentinel,
+    PartitionGenerationCompletedSentinel,
+    Exception,
+    AirbyteMessage,
```
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)
Lines 47-70: Allow configurable queue size? A default queue size of 10,000 can be large for certain memory scenarios. Would you consider making `maxsize` configurable so that users can tune the queue size based on their system constraints? wdyt?

Lines 100-114: Confirm usage of the same queue across enqueuers/readers. Using a single shared queue for `PartitionEnqueuer` and `PartitionReader` is straightforward, but concurrency might increase complexity. Do you have tests ensuring that multiple producers/consumers manage the queue seamlessly under load? wdyt?
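A sketch of what a configurable bound could look like. The class and parameter names are hypothetical illustrations, not the CDK's actual API:

```python
from queue import Queue
from typing import Any

class ConcurrentSourceSketch:
    # Hypothetical constructor illustrating the suggestion: expose the queue
    # bound instead of hardcoding it, so callers can tune memory usage.
    DEFAULT_QUEUE_MAXSIZE = 10_000

    def __init__(self, queue_maxsize: int = DEFAULT_QUEUE_MAXSIZE) -> None:
        # A bounded queue applies backpressure: put() blocks once it is full,
        # and put_nowait() raises queue.Full.
        self._queue: Queue[Any] = Queue(maxsize=queue_maxsize)
```

With `queue_maxsize=2`, a third non-blocking `put` raises `queue.Full`, which is exactly the backpressure behavior discussed in the second comment above.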
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
Lines 76-79: Revisit default limit constants. The defaults (`DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE`, `DEFAULT_MAXIMUM_NUMBER_OF_SLICES`, `DEFAULT_MAXIMUM_RECORDS`, and `DEFAULT_MAXIMUM_STREAMS`) look practical. Have you considered making them environment-driven if users need to tweak concurrency on the fly? wdyt?
Lines 82-88: TestLimits dataclass. Centralizing concurrency constraints inside `TestLimits` is convenient. Might it be helpful to document these limits' interplay with the `_LOWEST_SAFE_CONCURRENCY_LEVEL` below, so that users know how each limit influences the concurrency pipeline? wdyt?
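As a rough illustration of the shape such a container can take. The field names and defaults here are hypothetical, not the CDK's actual `TestLimits` definition:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TestLimitsSketch:
    # Hypothetical fields mirroring the limits discussed above; the real
    # TestLimits in the CDK may use different names and defaults.
    max_pages_per_slice: int = 5
    max_slices: int = 5
    max_records: int = 100
    max_streams: int = 100
```

A frozen dataclass keeps the limits immutable once a test read is configured, which avoids mid-sync surprises.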
Lines 90-105: ConcurrentMessageRepository concurrency. The `ConcurrentMessageRepository` enqueues every emitted message. In very chatty logs or high data volume, have you considered that this could bloat the queue quickly? Maybe add a safeguard or a skip for extremely verbose scenarios? wdyt?
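A minimal sketch of the pattern under discussion: a repository that forwards every emitted message straight onto a shared queue. The class name and methods are assumptions for illustration; bounding the queue is one possible safeguard against unbounded growth:

```python
import queue
from typing import Any, Iterator

class ConcurrentMessageRepositorySketch:
    # Hypothetical repository: emitted messages go directly onto the shared
    # queue consumed by the read loop. Giving the queue a maxsize caps
    # memory use when logging is very chatty.
    def __init__(self, shared_queue: "queue.Queue[Any]") -> None:
        self._queue = shared_queue

    def emit_message(self, message: Any) -> None:
        self._queue.put(message)

    def consume_queue(self) -> Iterator[Any]:
        while not self._queue.empty():
            yield self._queue.get()
```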
Lines 426-433: Refined concurrency checks. This condition checks for `PerPartitionWithGlobalCursor` or a decorated slicer, which keeps the logic reusable. Would you consider logging which path was taken, to help debug concurrency edge cases? wdyt?
unit_tests/connector_builder/test_connector_builder_handler.py (3)
Lines 78-78: `_NO_STATE` definition. Setting `_NO_STATE = None` is straightforward. Just to confirm: do we rely on `_NO_STATE` specifically in any test to skip certain checks, or is it solely an alias for readability? wdyt?
Lines 910-925: Creating source with `TEST_READ_CONFIG`. You inject `TEST_READ_CONFIG` and call `create_source(config, limits, catalog)`. This tie-in ensures incremental sync features are tested. Have you considered what happens if the user modifies `_NO_STATE` or passes partial state? Maybe a fallback test scenario could help? wdyt?
Lines 1093-1094: Multiple `_create_page_response` calls. You're returning multiple pages via a side-effect simulation, which is great for exercising pagination. In the future, might you detail record data differences between pages to confirm correct pagination logic in tests? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
- airbyte_cdk/connector_builder/main.py (1 hunks)
- airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
- airbyte_cdk/sources/concurrent_source/concurrent_source.py (8 hunks)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (8 hunks)
- airbyte_cdk/sources/declarative/interpolation/macros.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1 hunks)
- airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2 hunks)
- airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
- airbyte_cdk/sources/streams/concurrent/partitions/types.py (2 hunks)
- airbyte_cdk/test/entrypoint_wrapper.py (2 hunks)
- unit_tests/connector_builder/test_connector_builder_handler.py (10 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py (0 hunks)
- unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1 hunks)
- unit_tests/sources/streams/concurrent/test_partition_reader.py (4 hunks)
- unit_tests/sources/streams/test_stream_read.py (0 hunks)
💤 Files with no reviewable changes (2)
- unit_tests/sources/streams/test_stream_read.py
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧬 Code Definitions (9)
- airbyte_cdk/connector_builder/main.py (1)
  - airbyte_cdk/connector_builder/connector_builder_handler.py (1): `create_source` (60-70)
- airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
  - airbyte_cdk/sources/types.py (1): `StreamSlice` (66-160)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
  - airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1): `ModelToComponentFactory` (528-3445)
- airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)
  - airbyte_cdk/sources/streams/concurrent/partition_reader.py (2): `PartitionLogger` (19-29), `PartitionReader` (31-70)
  - airbyte_cdk/models/airbyte_protocol.py (1): `AirbyteMessage` (81-90)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
  - airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1): `TestReadSlicerDecorator` (31-73)
  - airbyte_cdk/sources/declarative/manifest_declarative_source.py (1): `streams` (144-169)
- airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
  - airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1): `Partition` (11-48)
  - airbyte_cdk/sources/streams/concurrent/partitions/types.py (1): `PartitionCompleteSentinel` (15-31)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
  - airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1): `TestReadSlicerDecorator` (31-73)
  - airbyte_cdk/sources/types.py (1): `StreamSlice` (66-160)
- airbyte_cdk/sources/streams/concurrent/partitions/types.py (3)
  - airbyte_cdk/models/airbyte_protocol.py (1): `AirbyteMessage` (81-90)
  - airbyte_cdk/sources/types.py (1): `Record` (20-63)
  - airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1): `Partition` (11-48)
- airbyte_cdk/connector_builder/connector_builder_handler.py (2)
  - airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2): `ConcurrentDeclarativeSource` (115-587), `TestLimits` (83-87)
  - airbyte_cdk/sources/declarative/manifest_declarative_source.py (1): `ManifestDeclarativeSource` (60-442)
🪛 GitHub Actions: Linters
[warning] Would reformat:
- airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py
- unit_tests/sources/streams/concurrent/test_partition_reader.py
- unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
- airbyte_cdk/sources/concurrent_source/concurrent_source.py
- airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
- airbyte_cdk/sources/streams/concurrent/partition_reader.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- airbyte_cdk/test/entrypoint_wrapper.py
- airbyte_cdk/sources/streams/concurrent/partitions/types.py
- airbyte_cdk/connector_builder/connector_builder_handler.py
- unit_tests/connector_builder/test_connector_builder_handler.py
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (35)
airbyte_cdk/sources/declarative/schema/default_schema_loader.py (1)
Lines 40-40: Expanded exception handling to catch ValueError, a nice improvement! Adding `ValueError` to the exception handling will make schema loading more robust by catching JSON parsing errors in addition to file I/O errors. This ensures we'll still return an empty schema with a helpful log message in more error scenarios.
airbyte_cdk/sources/declarative/interpolation/macros.py (1)
Lines 180-180: Good addition to ensure an explicit UTC timezone on all datetime objects. This change ensures that regardless of how `dt_datetime` is derived (timestamp, string, or datetime object), it will explicitly have its timezone set to UTC before being formatted. This prevents potential timezone-related inconsistencies in date formatting and comparisons, and is consistent with other timezone handling in this file (like `now_utc()` and `str_to_datetime()`).
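The behavior described can be illustrated with a small standalone helper (`to_utc` is an illustrative name, not the macro's actual implementation):

```python
from datetime import datetime, timezone

def to_utc(dt_datetime: datetime) -> datetime:
    # Force an explicit UTC timezone before formatting, regardless of how
    # the datetime was derived (timestamp, string, or datetime object).
    return dt_datetime.replace(tzinfo=timezone.utc)
```

Note that `replace(tzinfo=...)` attaches the zone without shifting the wall-clock time, which is the desired behavior when the value is already known to represent UTC.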
airbyte_cdk/test/entrypoint_wrapper.py (2)
Lines 200-200: Removing the hardcoded debug flag to use centralized logic. This change removes the explicit `--debug` flag since it will now be added automatically by the `_run_command` function if not present. This aligns with the centralization of debug flag handling you implemented.
Lines 160-161: ❓ Verification inconclusive. Centralizing debug flag logic looks good! Moving the debug flag logic into `_run_command` centralizes this behavior instead of duplicating it across different command functions. This is a cleaner approach that maintains backward compatibility while adding flexibility. There seems to be a linting issue reported in the pipeline failures. Would you like to run the formatter to fix this?
🏁 Script executed:
```shell
#!/bin/bash
# Verify the code formatting issue
black --check airbyte_cdk/test/entrypoint_wrapper.py
```
Length of output: 150
Centralizing Debug Flag Implementation & Linting Formatter Check: the refactoring to centralize the debug flag logic looks solid and enhances flexibility while keeping backward compatibility. However, the formatter check ran into an issue because the `black` command wasn't found in the environment. Could you verify whether we should install `black` in the pipeline (or use an equivalent tool) so that the formatter can run successfully? wdyt?
airbyte_cdk/connector_builder/main.py (1)
Lines 94-94: Updated source creation to include catalog and state, a good enhancement! This change passes catalog and state information to the `create_source` function, making it consistent with its updated signature in `connector_builder_handler.py`. This gives the connector builder more flexibility by allowing sources to be created with more complete configuration context. The function now matches the implementation in the handler:
```python
def create_source(
    config: Mapping[str, Any],
    limits: TestLimits,
    catalog: Optional[ConfiguredAirbyteCatalog] = None,
    state: Any = None,
)
```
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
Lines 2993-2993: Parameter removal looks consistent. This removal of `emit_connector_builder_messages=True` matches your other changes to standardize how the `ModelToComponentFactory` is instantiated.

Lines 3035-3035: Parameter removal looks consistent. This is the third instance of removing the same parameter, which maintains consistency across the file.
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2)
Lines 6-7: Import changes look good. The added imports support the new TestReadSlicerDecorator class.

Lines 15-15: Updated import is appropriate. Expanding the import from `airbyte_cdk.sources.types` to include `StreamSlice` and `StreamState` makes sense given their use in the new code.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Lines 462-462: New import for test slice limiting functionality. The addition of `TestReadSlicerDecorator` from the stream_slicer module appears to be part of implementing test slice limiting in the declarative framework. This aligns with the POC for connector builder using concurrent CDK mentioned in the PR objectives.

Lines 517-517: Updated import to include additional type definitions. Expanding the imports from `airbyte_cdk.sources.types` to include `StreamSlice` and `StreamState` makes sense given the concurrent CDK work. These types are essential for managing state in partitioned streams.
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
Lines 7-7: Added AirbyteMessage to imports. This import supports the extension of the `QueueItem` type to include `AirbyteMessage`, enabling direct message passing through the queue system in the concurrent framework.
unit_tests/sources/streams/concurrent/test_partition_reader.py (3)
Lines 12-13: Added Cursor import and updated PartitionReader import. These import changes support the enhanced partition reader functionality that now includes cursor handling and logging. This aligns with the concurrent CDK enhancements for the connector builder POC.

Lines 30-31: Added partition logger and cursor mocks. These setup changes create the necessary mock objects to test the updated `PartitionReader`, which now accepts a partition logger and uses a cursor for processing partitions. The FIXME comment suggests there's more work needed to properly test the partition logger calls. Should this FIXME be addressed as part of this PR or tracked separately in an issue? It might be important to ensure the partition logger functionality is properly tested.

Lines 34-34 (also applies to 45-45 and 57-57): Updated process_partition method calls to include the cursor parameter. These test modifications correctly update the calls to `process_partition` to include the cursor parameter, aligning with the changes in the implementation. This ensures the tests continue to validate the correct behavior of the partition reader with the concurrent processing enhancements.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
Lines 178-179: Updated thread pool submission to include the cursor parameter. This change updates the thread pool submission to include the cursor from the respective stream, correctly reflecting the updated signature of the `process_partition` method. This ensures that partition processing in the concurrent framework has access to the cursor for proper state management.
airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)
Lines 7-19: Imports look fine. Everything here seems aligned with the new concurrency-related classes and type annotations. Nice addition, no concerns to raise.

Lines 80-96: Consider backpressure handling. Right now, if the queue is full (in scenarios with high throughput), producers may block. Would you consider adding logging or handling for a full queue to avoid potential deadlock or indefinite wait? wdyt?

Lines 166-167: Double-check ordering for yielded `AirbyteMessage`. When an `AirbyteMessage` arrives in the queue, we immediately yield it. Do we need to maintain strict ordering of messages vs. records, or is out-of-order emergence acceptable here? wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
Lines 128-149: Flexible test read constraints. By passing `limits` to the `ModelToComponentFactory`, you introduce refined control (disabling cache, retries, etc.). Are these changes thoroughly tested for partial updates or more complex scenarios, like streams that might still require some retry logic? wdyt?
unit_tests/connector_builder/test_connector_builder_handler.py (2)
Lines 64-64: New import for `StateBuilder`. No issues spotted. This looks like a helpful utility for building/managing state.

Lines 145-157: Incremental synchronization fields. The manifest now includes `incremental_sync` with a `DatetimeBasedCursor`. Would you consider adding test coverage for timezone and DST edge cases (e.g., offsets, leaps)? wdyt?
airbyte_cdk/sources/streams/concurrent/partition_reader.py (6)
Lines 4-6: Additional imports added for new functionality. The new imports support the added PartitionLogger class and cursor integration. Good job on properly organizing the imports.

Lines 9-16: Updated imports for message repository and cursor integration. The changes align with the PR objectives to implement concurrent CDK functionality. The imports are correctly ordered and properly separated.

Lines 19-29: New PartitionLogger class for consistent slice logging. The PartitionLogger encapsulates the slice logging logic, making it reusable across the codebase. Good separation of concerns!

Lines 38-43: Updated constructor to support partition logging. The constructor now accepts an optional partition_logger parameter, allowing for more flexible logging configuration. Nice job making this optional to maintain backward compatibility.

Lines 45-56: Updated method signature to support cursor integration. The process_partition method now accepts a cursor parameter to track state during partition processing. This aligns with the concurrent processing objectives.

Lines 57-58: Added partition logging support. Good addition of partition logging with a null check to prevent null pointer exceptions.
airbyte_cdk/connector_builder/connector_builder_handler.py (4)
Lines 7-7: Updated import to include Any for more precise type hints. Good improvement to type annotation by making the imports more specific.

Lines 18-25: Now importing constants and classes from concurrent_declarative_source. This change centralizes the constants and classes in one location, improving code maintainability. Great step toward modularity!

Lines 52-58: New function to ensure a consistent concurrency level. The `_ensure_concurrency_level` function sets a safe concurrency level, with a good comment explaining why the value is below the recommended safe level but still acceptable in this context. Good defensive programming!
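The described behavior might look roughly like the following; the manifest keys and the safe-level constant are assumptions for illustration, not the CDK's exact implementation:

```python
from typing import Any, Dict

_LOWEST_SAFE_CONCURRENCY_LEVEL = 2  # assumed value, for illustration only

def ensure_concurrency_level(manifest: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical helper: pin the manifest's concurrency level to a known
    # safe value before the source is instantiated, overriding whatever the
    # manifest requested.
    manifest.setdefault("concurrency_level", {})
    manifest["concurrency_level"]["default_concurrency"] = _LOWEST_SAFE_CONCURRENCY_LEVEL
    return manifest
```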
Lines 60-70: Updated create_source to support concurrent processing. The function now accepts additional parameters (catalog and state) and uses ConcurrentDeclarativeSource instead of ManifestDeclarativeSource. This enables the concurrent processing feature while ensuring safe concurrency levels through `_ensure_concurrency_level`.
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (3)
Lines 36-40: Removed the slice_logger parameter from the constructor. The constructor no longer needs the slice_logger parameter, as this responsibility has been moved to the PartitionReader class. This aligns with the changes in the first file.

Lines 95-95: Updated thread pool submission to include the cursor. The thread pool submission now includes the cursor from the corresponding stream instance, allowing the PartitionReader to track state during partition processing. This is a good change that aligns with the updates to PartitionReader.process_partition.

Lines 108-120: Simplified error handling in on_partition_complete_sentinel. The error handling has been streamlined while maintaining the core functionality. The code now checks `sentinel.is_successful` and performs partition cleanup as needed. The flow is more straightforward and easier to follow.
class TestReadSlicerDecorator(StreamSlicer):
    """
    A stream slicer wrapper for test reads which limits the number of slices produced.
    """

    def __init__(self, stream_slicer: StreamSlicer, maximum_number_of_slices: int) -> None:
        self._decorated = stream_slicer
        self._maximum_number_of_slices = maximum_number_of_slices

    def stream_slices(self) -> Iterable[StreamSlice]:
        return islice(self._decorated.stream_slices(), self._maximum_number_of_slices)

    def get_request_params(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
                           next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        return self._decorated.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
                            next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        return self._decorated.get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_data(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
                              next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
        return self._decorated.get_request_body_data(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_json(self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None,
                              next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        return self._decorated.get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
New TestReadSlicerDecorator implementation looks good, but formatting needs attention.
The new TestReadSlicerDecorator correctly wraps a StreamSlicer and limits slices during test reads. This appears to replace functionality that was previously in SimpleRetrieverTestReadDecorator.
Note that there's a linting warning about formatting for this file. Perhaps run the formatter on this file before merging?
One minor suggestion: Would it be worth adding validation to ensure maximum_number_of_slices is positive, similar to how SimpleRetrieverTestReadDecorator does it? wdyt?
🏁 Script executed:
#!/bin/bash
# Check if SimpleRetrieverTestReadDecorator validates maximum_number_of_slices
rg -A 5 -B 5 "maximum_number_of_slices" airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Length of output: 794
Fix Required: Address formatting warnings and add positive-value validation
The new TestReadSlicerDecorator implementation works well in wrapping a StreamSlicer and limiting the slices. However, there are a couple of points to consider before merging:
- There's a linting warning concerning formatting in this file. Could you run the formatter to clean up these issues?
- Unlike SimpleRetrieverTestReadDecorator (in airbyte_cdk/sources/declarative/retrievers/simple_retriever.py), which validates that maximum_number_of_slices is positive, the new decorator doesn't have this check. Would it be worth adding a similar validation in the constructor to ensure maximum_number_of_slices is strictly positive, to prevent potential issues? wdyt?
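A sketch of the suggested guard. The class below is a simplified stand-in (not the actual decorator), and the error message mirrors the style used by SimpleRetrieverTestReadDecorator as an assumption.

```python
# Simplified stand-in for the decorator, showing only the proposed validation
# and the slice limiting; request-option delegation is omitted for brevity.
from itertools import islice
from typing import Iterable, Iterator


class ValidatedTestReadSlicer:
    def __init__(self, stream_slices: Iterable[object], maximum_number_of_slices: int) -> None:
        if maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {maximum_number_of_slices}"
            )
        self._slices = stream_slices
        self._maximum_number_of_slices = maximum_number_of_slices

    def stream_slices(self) -> Iterator[object]:
        return islice(self._slices, self._maximum_number_of_slices)
```

Failing fast here surfaces a misconfigured test-read limit at construction time instead of silently producing zero slices.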
Commenting on the PoC...
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


@dataclass
class TestLimits:
I had to move this to avoid circular dependencies. I assume this was caused by concurrent_declarative_source.py having to know about TestLimits but connector_builder_handler.py having to know about concurrent_declarative_source.
try:
    if sentinel.is_successful:
        stream = self._stream_name_to_instance[partition.stream_name()]
        stream.cursor.close_partition(partition)
I moved stream.cursor.close_partition(partition) to PartitionReader, which meant that there was no need for catching exceptions here.
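A toy sketch of the reworked ordering, with simple callables standing in for the cursor and plain strings for queue items. The key point is the assumption stated in the PR: close_partition is expected to enqueue the state message, so it must run before the completion sentinel is enqueued.

```python
# Simplified model of PartitionReader.process_partition: observe each record,
# close the partition (which pushes the state message onto the same queue),
# and only then enqueue the completion sentinel.
import queue
from typing import Callable, Iterable


def process_partition(records: Iterable[str], observe: Callable[[str], None],
                      close_partition: Callable[[], None], out: "queue.Queue[object]") -> None:
    for record in records:
        observe(record)
        out.put(record)
    # close_partition must come before the sentinel, otherwise the read
    # processor could end the sync before consuming the state message
    close_partition()
    out.put("PARTITION_COMPLETE")
```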
@@ -44,6 +44,7 @@ def create(
     slice_logger: SliceLogger,
     message_repository: MessageRepository,
     timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
+    queue: Optional[Queue[QueueItem]] = None,
Since the MessageRepository also needs access to the queue, we need the queue to be passed in here instead of being created internally.
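A hypothetical sketch of the shared-queue wiring this enables (the names SharedQueueMessageRepository and build_with_shared_queue are illustrative, not the CDK's API): because the caller creates the queue once and hands it to both the source and the message repository, log/state messages and records land in a single ordered stream.

```python
# Illustrative wiring: one queue, injected into both the "source" side and the
# message repository, so emission order is preserved across producers.
import queue
from typing import Optional, Tuple


class SharedQueueMessageRepository:
    """Pushes emitted messages straight onto the shared concurrent queue."""

    def __init__(self, shared_queue: "queue.Queue[object]") -> None:
        self._queue = shared_queue

    def emit_message(self, message: object) -> None:
        self._queue.put(message)


def build_with_shared_queue(
    shared_queue: "Optional[queue.Queue[object]]" = None,
) -> "Tuple[SharedQueueMessageRepository, queue.Queue[object]]":
    # fall back to a fresh queue when the caller does not provide one
    q = shared_queue if shared_queue is not None else queue.Queue()
    return SharedQueueMessageRepository(q), q
```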
@@ -360,16 +423,19 @@ def _group_streams(
     and incremental_sync_component_definition.get("type", "")
     == DatetimeBasedCursorModel.__name__
     and hasattr(declarative_stream.retriever, "stream_slicer")
-    and isinstance(
-        declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
+    and (
All the changes in this clause are caused by the TestReadSlicerDecorator that wraps the "real" partition_router. It affects how we call create_concurrent_cursor_from_perpartition_cursor (we need the inner partition router when creating it) and how we create the StreamSlicerPartitionGenerator, as we need to re-wrap the stream_slicer in the TestReadSlicerDecorator.
In a world where we don't have declarative cursors, the cursor would already be wrapped and we wouldn't have to do shenanigans here.
@@ -177,6 +177,7 @@ def format_datetime(
     dt_datetime = (
         datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt)
     )
+    dt_datetime = dt_datetime.replace(tzinfo=pytz.utc)
Outside of the scope of this PR. See https://airbytehq-team.slack.com/archives/C02U9R3AF37/p1743795253674379 for more details
@@ -140,6 +142,19 @@
     "type": "DeclarativeStream",
     "$parameters": _stream_options,
     "retriever": "#/definitions/retriever",
+    "incremental_sync": {
Added to test that the state is properly emitted. I also had to add updated_at fields to the records to keep things from breaking.
@@ -2902,7 +2902,7 @@ def test_use_request_options_provider_for_datetime_based_cursor():
     parameters={},
 )

 connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
I could have updated the assertion to check for assert isinstance(retriever.stream_slicer, TestReadSlicerDecorator), or disabled emit_connector_builder_messages. I arbitrarily chose the latter.
)
assert (
    self._a_closed_partition in handler._streams_to_running_partitions[_ANOTHER_STREAM_NAME]
)

def test_handle_partition_emits_log_message_if_it_should_be_logged(self):
Slice logging has been moved to PartitionReader.
assert self._an_open_partition in handler._streams_to_running_partitions[_STREAM_NAME]

@freezegun.freeze_time("2020-01-01T00:00:00")
def test_handle_on_partition_complete_sentinel_with_messages_from_repository(self):
I think the part of this test that checks whether the message repository is still consumed might still be relevant, but I assume that the implementation is always ConcurrentMessageRepository now, and MessageRepository.consume_queue might not be useful in a concurrent world...
What is sure is that self._stream.cursor.close_partition.assert_called_once() is not useful anymore, because close_partition has been moved out of the concurrent read processor.
self._another_stream.cursor.close_partition.assert_called_once()

@freezegun.freeze_time("2020-01-01T00:00:00")
def test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete(
This was used in case close_partition was failing. We don't call close_partition here anymore, hence this test is useless now.
What
https://airbytehq-team.slack.com/archives/C063B9A434H/p1743783912697509
@Anatolii Yatsuk @brian Lai (Airbyte), I was checking what was missing to remove declarative cursors and there is the Connector Builder because we check the state after consuming each slice. I wanted to explore the path of just setting the concurrency to 1 when we do test_reads. However it does not seem like this is possible and I want to knowledge dump this on you too.
So let’s imagine the following scenario:
- A partition puts log messages (logs_for_partition_2) in the message_repository and some records in the queue. At the end, we put a partition completion sentinel in the queue.
- When the concurrent read processor consumes the sentinel (airbyte-python-cdk/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py, Line 165 in caa1e7d), we would emit logs_for_partition_2 here while we’re not done with consuming all the records from the first partition.

In other words, there is one problem with the current situation: we don’t know the order of emissions of messages because they go through different queues (concurrent queue vs message repository). I think that once we solve this issue, we will very soon face another one, which is that the state message for a partition and the records are emitted in different threads (partition reader thread vs main thread) and we can’t prioritize one over the other to control the order of things (while Java does have a priority system for threads, I don’t think Python has one).
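The ordering problem described above can be shown with a toy example using only the stdlib: with two buffers the consumer can only drain one after the other, so the original interleaving is lost; with a single FIFO queue it survives.

```python
# Two buffers vs one: only the unified queue preserves emission order.
import queue


def drain(q: "queue.Queue[str]") -> list:
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


# two-queue setup: the interleaving between logs and records is lost
logs, records = queue.Queue(), queue.Queue()
records.put("record_p1")
logs.put("log_p2")
records.put("record_p1_bis")
two_queue_order = drain(records) + drain(logs)  # consumer drains one, then the other

# single-queue setup: the interleaving survives
unified = queue.Queue()
for item in ("record_p1", "log_p2", "record_p1_bis"):
    unified.put(item)
single_queue_order = drain(unified)
```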
Potential solutions:
- Have MessageRepository.emit_message push things to the concurrent queue instead of the internal one. My concern with this one is that we were always adamant in the first phase of the development of the Concurrent CDK that the state needed to be emitted on the main thread, but I don’t remember the reasons. It feels that if they use different queues, we needed the PartitionCompleteSentinel to act as a synchronizer to ensure that all the records are being emitted. If state messages go to the same queue though, I don’t see any problem… Maybe @alexandre Girard (Airbyte) could comment on this as he was heavily involved in the initial design. That being said, what I like about this solution is that it seems pretty simple (I intend to do a PoC later today) and would allow us to remove the declarative cursors’ usage from the Connector Builder.
- Add debugging_information to our Record so that it can be added to the AirbyteMessages.

Both solutions are not mutually exclusive, and doing the first might simplify the second one because we would not need to have two ways of adding the information in the AirbyteMessages.
Discarded solutions:
- Rely on emitted_at in order to reconcile the order: it sounds like a great idea, but it doesn’t solve the problem where the records emission for a partition is far from the state emission.

This is the PoC to unify the queue as proposed in solution 1.
Note that the tests in test_concurrent_perpartitioncursor.py are still failing.
How
I added some comments to hopefully explain what has been done.