Skip to content

poc: connector builder using concurrent cdk #460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
tmp
  • Loading branch information
maxi297 committed Apr 4, 2025
commit 58016298de3b134cb177ed35a949fa3dcfc7e8ee
20 changes: 17 additions & 3 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
Expand Down Expand Up @@ -54,12 +57,23 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits:
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)


def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
def _ensure_concurrency_level(manifest: Dict[str, Any]) -> None:
# We need to do that to ensure that the state in the StreamReadSlices only contains the changes for one slice
# Note that this is below the _LOWEST_SAFE_CONCURRENCY_LEVEL but it is fine in this case because we are limiting the number of slices
# being generated which means that the memory usage is limited anyway
if "concurrency_level" not in manifest:
manifest["concurrency_level"] = {}
manifest["concurrency_level"]["default_concurrency"] = 1

def create_source(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Any, limits: TestLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
_ensure_concurrency_level(manifest)
return ConcurrentDeclarativeSource(
config=config,
emit_connector_builder_messages=True,
catalog=catalog,
state=state,
source_config=manifest,
emit_connector_builder_messages=True,
component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def handle_connector_builder_request(
def handle_request(args: List[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
source = create_source(config, catalog, state, limits)
return orjson.dumps(
AirbyteMessageSerializer.dump(
handle_connector_builder_request(source, command, config, catalog, state, limits)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
)
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
GzipDecoder,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
Expand Down Expand Up @@ -389,10 +388,6 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
COMPONENTS_MODULE_NAME,
SDM_COMPONENTS_MODULE_NAME,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
Expand Down Expand Up @@ -464,6 +459,7 @@
)
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import TestReadSlicerDecorator
from airbyte_cdk.sources.declarative.transformations import (
AddFields,
RecordTransformation,
Expand Down Expand Up @@ -518,7 +514,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand Down Expand Up @@ -2845,6 +2841,8 @@ def create_simple_retriever(
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
slice_limit = self._limit_slices_fetched or 5
stream_slicer = TestReadSlicerDecorator(stream_slicer, slice_limit) # FIXME Once log formatter is removed, we can just pass this to the SimpleRetriever
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
Expand All @@ -2855,7 +2853,6 @@ def create_simple_retriever(
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
)

# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
return islice(super().stream_slices(), self.maximum_number_of_slices)

def _fetch_next_page(
self,
stream_state: Mapping[str, Any],
Expand Down Expand Up @@ -623,6 +619,7 @@ def _fetch_next_page(
stream_slice=stream_slice,
next_page_token=next_page_token,
),
# FIXME remove this implementation and have the log_formatter depend on whether the logger is in debug mode
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to remove the SimpleRetriever decorator eventually. We just need to have the log_formatter be an optional field of the SimpleRetriever and we should be fine.

In theory, this seems good because it would allow for test reads configuration to apply to any retriever, not just the SimpleRetriever

log_formatter=lambda response: format_http_message(
response,
f"Stream '{self.name}' request",
Expand Down
48 changes: 48 additions & 0 deletions airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
#

from abc import ABC
from itertools import islice
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import (
RequestOptionsProvider,
)
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import (
StreamSlicer as ConcurrentStreamSlicer,
)
from airbyte_cdk.sources.types import StreamSlice, StreamState


class StreamSlicer(ConcurrentStreamSlicer, RequestOptionsProvider, ABC):
Expand All @@ -23,3 +26,48 @@ class StreamSlicer(ConcurrentStreamSlicer, RequestOptionsProvider, ABC):
"""

pass


class TestReadSlicerDecorator(StreamSlicer):
    """
    A stream slicer wrapper for test reads which limits the number of slices produced.

    Only ``stream_slices`` is altered — the underlying iterable is lazily truncated to
    ``maximum_number_of_slices`` items. All request-option methods delegate verbatim to
    the decorated slicer.
    """

    def __init__(self, stream_slicer: StreamSlicer, maximum_number_of_slices: int) -> None:
        """
        :param stream_slicer: the slicer whose slices should be limited
        :param maximum_number_of_slices: upper bound on slices yielded; must be >= 1
        :raises ValueError: if maximum_number_of_slices is not strictly positive
        """
        # Mirror SimpleRetrieverTestReadDecorator's guard: a non-positive limit would
        # silently yield no slices at all, which is never what a test read intends.
        if maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {maximum_number_of_slices}"
            )
        self._decorated = stream_slicer
        self._maximum_number_of_slices = maximum_number_of_slices

    def stream_slices(self) -> Iterable[StreamSlice]:
        # islice truncates lazily, so the decorated slicer never has to generate
        # more slices than the limit requires.
        return islice(self._decorated.stream_slices(), self._maximum_number_of_slices)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        return self._decorated.get_request_body_data(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
Comment on lines +31 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

New TestReadSlicerDecorator implementation looks good, but formatting needs attention.

The new TestReadSlicerDecorator correctly wraps a StreamSlicer and limits slices during test reads. This appears to replace functionality that was previously in SimpleRetrieverTestReadDecorator.

Note that there's a linting warning about formatting for this file. Perhaps run the formatter on this file before merging?

One minor suggestion: Would it be worth adding validation to ensure maximum_number_of_slices is positive, similar to how SimpleRetrieverTestReadDecorator does it? wdyt?


🏁 Script executed:

#!/bin/bash
# Check if SimpleRetrieverTestReadDecorator validates maximum_number_of_slices
rg -A 5 -B 5 "maximum_number_of_slices" airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Length of output: 794


Fix Required: Address formatting warnings and add positive-value validation

The new TestReadSlicerDecorator implementation works well in wrapping a StreamSlicer and limiting the slices. However, there are a couple of points to consider before merging:

  • There's a linting warning concerning formatting in this file. Could you run the formatter to clean up these issues?
  • Unlike SimpleRetrieverTestReadDecorator (in airbyte_cdk/sources/declarative/retrievers/simple_retriever.py) which validates that maximum_number_of_slices is positive, the new decorator doesn’t have this check. Would it be worth adding a similar validation in the constructor to ensure maximum_number_of_slices is strictly positive, to prevent potential issues? wdyt?
🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py

4 changes: 3 additions & 1 deletion airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def _run_command(
stream_handler.setFormatter(AirbyteLogFormatter())
parent_logger = logging.getLogger("")
parent_logger.addHandler(stream_handler)
if "--debug" not in args:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed for tests to pass. Just enabling more logs in the tests for debugging purposes

args.append("--debug")

parsed_args = AirbyteEntrypoint.parse_args(args)

Expand Down Expand Up @@ -195,7 +197,7 @@ def discover(
config_file = make_file(tmp_directory_path / "config.json", config)

return _run_command(
source, ["discover", "--config", config_file, "--debug"], expecting_exception
source, ["discover", "--config", config_file], expecting_exception
)


Expand Down