
Commit b663cf5

poc with test_concurrent_perpartitioncursor still failing

1 parent: a56f079

13 files changed: +207 -251 lines

airbyte_cdk/connector_builder/connector_builder_handler.py
Lines changed: 9 additions & 23 deletions

@@ -4,7 +4,7 @@
 
 
 from dataclasses import asdict, dataclass, field
-from typing import Any, Dict, List, Mapping
+from typing import Any, Dict, List, Mapping, Optional
 
 from airbyte_cdk.connector_builder.test_reader import TestReader
 from airbyte_cdk.models import (
@@ -16,7 +16,12 @@
 )
 from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
+    DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE,
+    DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
+    DEFAULT_MAXIMUM_RECORDS,
+    DEFAULT_MAXIMUM_STREAMS,
     ConcurrentDeclarativeSource,
+    TestLimits,
 )
 from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
@@ -27,25 +32,12 @@
 from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 
-DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
-DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
-DEFAULT_MAXIMUM_RECORDS = 100
-DEFAULT_MAXIMUM_STREAMS = 100
-
 MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
 MAX_SLICES_KEY = "max_slices"
 MAX_RECORDS_KEY = "max_records"
 MAX_STREAMS_KEY = "max_streams"
 
 
-@dataclass
-class TestLimits:
-    max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
-    max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
-    max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
-    max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)
-
-
 def get_limits(config: Mapping[str, Any]) -> TestLimits:
     command_config = config.get("__test_read_config", {})
     max_pages_per_slice = (
@@ -62,10 +54,10 @@ def _ensure_concurrency_level(manifest: Dict[str, Any]) -> None:
     # Note that this is below the _LOWEST_SAFE_CONCURRENCY_LEVEL but it is fine in this case because we are limiting the number of slices
     # being generated which means that the memory usage is limited anyway
     if "concurrency_level" not in manifest:
-        manifest["concurrency_level"] = {}
+        manifest["concurrency_level"] = {"type": "ConcurrencyLevel"}
     manifest["concurrency_level"]["default_concurrency"] = 1
 
-def create_source(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Any, limits: TestLimits) -> ManifestDeclarativeSource:
+def create_source(config: Mapping[str, Any], limits: TestLimits, catalog: Optional[ConfiguredAirbyteCatalog] = None, state: Any = None) -> ManifestDeclarativeSource:
     manifest = config["__injected_declarative_manifest"]
     _ensure_concurrency_level(manifest)
     return ConcurrentDeclarativeSource(
@@ -74,13 +66,7 @@ def create_source(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog,
         state=state,
         source_config=manifest,
         emit_connector_builder_messages=True,
-        component_factory=ModelToComponentFactory(
-            emit_connector_builder_messages=True,
-            limit_pages_fetched_per_slice=limits.max_pages_per_slice,
-            limit_slices_fetched=limits.max_slices,
-            disable_retries=True,
-            disable_cache=True,
-        ),
+        limits=limits,
     )
 
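One subtlety in the `_ensure_concurrency_level` change above: the injected block now carries an explicit `type` key, which the concurrent source's manifest validation presumably requires. A quick sketch of the resulting behavior (the empty manifest is a hypothetical input, not code from the commit):

    from airbyte_cdk.connector_builder.connector_builder_handler import _ensure_concurrency_level

    manifest: dict = {}  # hypothetical manifest with no concurrency_level set
    _ensure_concurrency_level(manifest)
    # Builder test reads are forced to a single worker, now with a typed component:
    assert manifest == {"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1}}
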
airbyte_cdk/connector_builder/main.py
Lines changed: 1 addition & 1 deletion

@@ -91,7 +91,7 @@ def handle_connector_builder_request(
 def handle_request(args: List[str]) -> str:
     command, config, catalog, state = get_config_and_catalog_from_args(args)
     limits = get_limits(config)
-    source = create_source(config, catalog, state, limits)
+    source = create_source(config, limits, catalog, state)
     return orjson.dumps(
         AirbyteMessageSerializer.dump(
             handle_connector_builder_request(source, command, config, catalog, state, limits)
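
Taken together, the two diffs above change the builder entry point: `TestLimits` and the `DEFAULT_MAXIMUM_*` constants now live in `concurrent_declarative_source`, and `create_source` accepts the limits directly (with `catalog` and `state` becoming optional) instead of assembling a `ModelToComponentFactory` itself. A minimal sketch of the new call shape; the manifest and `__test_read_config` values are placeholders, not part of the commit:

    from airbyte_cdk.connector_builder.connector_builder_handler import create_source, get_limits

    manifest = {"version": "6.0.0", "streams": []}  # placeholder manifest
    config = {
        "__injected_declarative_manifest": manifest,
        "__test_read_config": {"max_records": 50, "max_slices": 2},
    }

    limits = get_limits(config)  # unset keys fall back to the DEFAULT_MAXIMUM_* constants
    source = create_source(config, limits)  # catalog and state now default to None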

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Lines changed: 14 additions & 29 deletions

@@ -34,7 +34,6 @@ def __init__(
         partition_enqueuer: PartitionEnqueuer,
         thread_pool_manager: ThreadPoolManager,
         logger: logging.Logger,
-        slice_logger: SliceLogger,
         message_repository: MessageRepository,
         partition_reader: PartitionReader,
     ):
@@ -44,7 +43,6 @@
         :param partition_enqueuer: PartitionEnqueuer instance
         :param thread_pool_manager: ThreadPoolManager instance
         :param logger: Logger instance
-        :param slice_logger: SliceLogger instance
         :param message_repository: MessageRepository instance
         :param partition_reader: PartitionReader instance
         """
@@ -59,7 +57,6 @@
         self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
         self._streams_currently_generating_partitions: List[str] = []
         self._logger = logger
-        self._slice_logger = slice_logger
         self._message_repository = message_repository
         self._partition_reader = partition_reader
         self._streams_done: Set[str] = set()
@@ -95,11 +92,7 @@ def on_partition(self, partition: Partition) -> None:
         """
         stream_name = partition.stream_name()
         self._streams_to_running_partitions[stream_name].add(partition)
-        if self._slice_logger.should_log_slice_message(self._logger):
-            self._message_repository.emit_message(
-                self._slice_logger.create_slice_log_message(partition.to_slice())
-            )
-        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)
+        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition, self._stream_name_to_instance[partition.stream_name()].cursor)
 
     def on_partition_complete_sentinel(
         self, sentinel: PartitionCompleteSentinel
@@ -112,26 +105,19 @@ def on_partition_complete_sentinel(
         """
         partition = sentinel.partition
 
-        try:
-            if sentinel.is_successful:
-                stream = self._stream_name_to_instance[partition.stream_name()]
-                stream.cursor.close_partition(partition)
-        except Exception as exception:
-            self._flag_exception(partition.stream_name(), exception)
-            yield AirbyteTracedException.from_exception(
-                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
-            ).as_sanitized_airbyte_message()
-        finally:
-            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
-            if partition in partitions_running:
-                partitions_running.remove(partition)
-            # If all partitions were generated and this was the last one, the stream is done
-            if (
-                partition.stream_name() not in self._streams_currently_generating_partitions
-                and len(partitions_running) == 0
-            ):
-                yield from self._on_stream_is_done(partition.stream_name())
-            yield from self._message_repository.consume_queue()
+        if sentinel.is_successful:
+            stream = self._stream_name_to_instance[partition.stream_name()]
+
+        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
+        if partition in partitions_running:
+            partitions_running.remove(partition)
+        # If all partitions were generated and this was the last one, the stream is done
+        if (
+            partition.stream_name() not in self._streams_currently_generating_partitions
+            and len(partitions_running) == 0
+        ):
+            yield from self._on_stream_is_done(partition.stream_name())
+        yield from self._message_repository.consume_queue()
 
     def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
         """
@@ -160,7 +146,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
                 stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
             )
         self._record_counter[stream.name] += 1
-        stream.cursor.observe(record)
         yield message
         yield from self._message_repository.consume_queue()
 
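The net effect of these hunks is that `ConcurrentReadProcessor` no longer does cursor bookkeeping or slice logging: `on_partition` submits each partition together with its stream's cursor, and the `observe`/`close_partition` calls disappear from `on_record` and `on_partition_complete_sentinel`. The `partition_reader.py` diff is not part of this excerpt, so the following is only a sketch of what `PartitionReader.process_partition(partition, cursor)` plausibly looks like after the change:

    from queue import Queue

    from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel

    class PartitionReaderSketch:
        """Illustrative stand-in for the updated PartitionReader; the structure
        is an assumption based on the submit(...) call site in on_partition."""

        def __init__(self, queue: Queue) -> None:
            self._queue = queue

        def process_partition(self, partition, cursor) -> None:
            # The cursor bookkeeping the processor used to perform (observe each
            # record, close the partition) now happens on the worker thread,
            # next to where the records are produced.
            for record in partition.read():
                self._queue.put(record)
                cursor.observe(record)
            cursor.close_partition(partition)
            self._queue.put(PartitionCompleteSentinel(partition))

Note that the old try/except around `close_partition` is gone from the processor without an obvious replacement in this file, which fits the commit message: this is a PoC and `test_concurrent_perpartitioncursor` still fails.
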

airbyte_cdk/sources/concurrent_source/concurrent_source.py
Lines changed: 11 additions & 7 deletions

@@ -4,7 +4,7 @@
 import concurrent
 import logging
 from queue import Queue
-from typing import Iterable, Iterator, List
+from typing import Iterable, Iterator, List, Optional
 
 from airbyte_cdk.models import AirbyteMessage
 from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
@@ -16,7 +16,7 @@
 from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
 from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
-from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
+from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionLogger, PartitionReader
 from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 from airbyte_cdk.sources.streams.concurrent.partitions.types import (
     PartitionCompleteSentinel,
@@ -44,6 +44,7 @@ def create(
         slice_logger: SliceLogger,
         message_repository: MessageRepository,
         timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
+        queue: Optional[Queue[QueueItem]] = None,
     ) -> "ConcurrentSource":
         is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
         too_many_generator = (
@@ -65,6 +66,7 @@
             message_repository,
             initial_number_of_partitions_to_generate,
             timeout_seconds,
+            queue,
         )
 
     def __init__(
@@ -75,6 +77,7 @@ def __init__(
         message_repository: MessageRepository = InMemoryMessageRepository(),
         initial_number_partitions_to_generate: int = 1,
         timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
+        queue: Optional[Queue[QueueItem]] = None,
     ) -> None:
         """
         :param threadpool: The threadpool to submit tasks to
@@ -90,6 +93,7 @@
         self._message_repository = message_repository
         self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
         self._timeout_seconds = timeout_seconds
+        self._queue = queue if queue else Queue(maxsize=10_000)
 
     def read(
         self,
@@ -101,23 +105,21 @@
         # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
         # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
         # information and might even need to be configurable depending on the source
-        queue: Queue[QueueItem] = Queue(maxsize=10_000)
         concurrent_stream_processor = ConcurrentReadProcessor(
             streams,
-            PartitionEnqueuer(queue, self._threadpool),
+            PartitionEnqueuer(self._queue, self._threadpool),
             self._threadpool,
             self._logger,
-            self._slice_logger,
             self._message_repository,
-            PartitionReader(queue),
+            PartitionReader(self._queue, PartitionLogger(self._slice_logger, self._logger, self._message_repository)),
         )
 
         # Enqueue initial partition generation tasks
         yield from self._submit_initial_partition_generators(concurrent_stream_processor)
 
         # Read from the queue until all partitions were generated and read
         yield from self._consume_from_queue(
-            queue,
+            self._queue,
             concurrent_stream_processor,
         )
         self._threadpool.check_for_errors_and_shutdown()
@@ -161,5 +163,7 @@ def _handle_item(
             yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
         elif isinstance(queue_item, Record):
             yield from concurrent_stream_processor.on_record(queue_item)
+        elif isinstance(queue_item, AirbyteMessage):
+            yield queue_item
         else:
             raise ValueError(f"Unknown queue item type: {type(queue_item)}")
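
Two behavioral changes land in this file: the work queue becomes injectable (falling back to the previous hard-coded `Queue(maxsize=10_000)`), and `_handle_item` now forwards `AirbyteMessage` items found on the queue, which is what lets the new `PartitionLogger` emit slice-log messages from worker threads. A sketch of wiring a caller-owned queue, assuming the remaining `create(...)` parameters keep their existing names:

    import logging
    from queue import Queue

    from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
    from airbyte_cdk.sources.message import InMemoryMessageRepository
    from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
    from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

    # A caller-owned queue, e.g. shared with a test harness; the maxsize mirrors
    # the bound the constructor falls back to when no queue is supplied.
    shared_queue: Queue[QueueItem] = Queue(maxsize=10_000)

    source = ConcurrentSource.create(
        num_workers=2,
        initial_number_of_partitions_to_generate=1,
        logger=logging.getLogger("airbyte"),
        slice_logger=DebugSliceLogger(),
        message_repository=InMemoryMessageRepository(),
        queue=shared_queue,  # omit to use an internal Queue(maxsize=10_000)
    )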
