Skip to content

feat(cdk): connector builder support for file uploader #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
connector builder: initial changes to pass file reference info to data
  • Loading branch information
aldogonzalez8 committed Apr 23, 2025
commit a6658e6ead091bf2dce5de0a49926c39beacfbd2
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \
ConnectorBuilderFileUploader, BaseFileUploader
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -3592,7 +3593,7 @@ def create_fixed_window_call_rate_policy(

def create_file_uploader(
self, model: FileUploaderModel, config: Config, **kwargs: Any
) -> FileUploader:
) -> BaseFileUploader:
name = "File Uploader"
requester = self._create_component_from_model(
model=model.requester,
Expand All @@ -3606,14 +3607,18 @@ def create_file_uploader(
name=name,
**kwargs,
)
return FileUploader(
emit_connector_builder_messages = self._emit_connector_builder_messages
file_uploader = FileUploader(
requester=requester,
download_target_extractor=download_target_extractor,
config=config,
file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(),
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
) -> MovingWindowCallRatePolicy:
Expand Down
61 changes: 57 additions & 4 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Mapping, Optional, Union

from abc import ABC, abstractmethod
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
Expand All @@ -24,12 +25,56 @@

logger = logging.getLogger("airbyte")

@dataclass
class BaseFileUploader(ABC):
"""
Base class for file uploader
"""

@abstractmethod
def upload(self, record: Record) -> None:
"""
Uploads the file to the specified location
"""
...

class BaseFileWriter(ABC):
"""
Base File writer class
"""

@abstractmethod
def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
...

class FileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
with open(str(file_path), "wb") as f:
f.write(content)

return file_path.stat().st_size

class NoopFileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Noop file writer
"""
return 0

@dataclass
class FileUploader:
class FileUploader(BaseFileUploader):
requester: Requester
download_target_extractor: RecordExtractor
config: Config
file_writer: BaseFileWriter
parameters: InitVar[Mapping[str, Any]]

filename_extractor: Optional[Union[InterpolatedString, str]] = None
Expand Down Expand Up @@ -77,9 +122,7 @@ def upload(self, record: Record) -> None:
full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size
file_size_bytes = self.file_writer.write(full_path, content=response.content)

logger.info("File uploaded successfully")
logger.info(f"File url: {str(full_path)}")
Expand All @@ -91,3 +134,13 @@ def upload(self, record: Record) -> None:
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)


@dataclass
class ConnectorBuilderFileUploader(BaseFileUploader):
file_uploader: FileUploader

def upload(self, record: Record) -> None:
self.file_uploader.upload(record=record)
for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]:
record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute)
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -36,6 +37,7 @@ def __init__(
config=config or {},
state=state or [],
source_config=source_config,
emit_connector_builder_messages=emit_connector_builder_messages
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
Expand Down
49 changes: 47 additions & 2 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import Mock, patch

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
Expand Down Expand Up @@ -34,6 +34,7 @@ def _source(
config: Dict[str, Any],
state: Optional[List[AirbyteStateMessage]] = None,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> YamlDeclarativeSource:
if not yaml_file:
yaml_file = "file_stream_manifest.yaml"
Expand All @@ -42,6 +43,7 @@ def _source(
catalog=catalog,
config=config,
state=state,
emit_connector_builder_messages=emit_connector_builder_messages
)


Expand All @@ -51,11 +53,12 @@ def read(
state_builder: Optional[StateBuilder] = None,
expecting_exception: bool = False,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> EntrypointOutput:
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
_source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception
)


Expand Down Expand Up @@ -190,6 +193,48 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
)
assert file_reference.file_size_bytes

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
emit_connector_builder_messages=True,
)

assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
assert file_reference.source_file_relative_path
# because we didn't write the file, the size is 0
assert file_reference.file_size_bytes == 0

# Assert file reference fields are copied to record data
record_data = output.records[0].record.data
assert record_data["staging_file_url"] == file_reference.staging_file_url
assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path
assert record_data["file_size_bytes"] == file_reference.file_size_bytes

def test_discover_article_attachments(self) -> None:
output = discover(self._config())

Expand Down