Skip to content

feat(cdk): connector builder support for file uploader #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Auto-fix lint and format issues
  • Loading branch information
octavia-squidington-iii authored and aldogonzalez8 committed Apr 23, 2025
commit 5aaa9fcdd3864e0e3dfe9068b030533de6fce4b4
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,13 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \
ConnectorBuilderFileUploader, BaseFileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
BaseFileUploader,
ConnectorBuilderFileUploader,
FileUploader,
FileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -3617,7 +3622,11 @@ def create_file_uploader(
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader
return (
ConnectorBuilderFileUploader(file_uploader)
if emit_connector_builder_messages
else file_uploader
)

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from .file_uploader import FileUploader
from .base_file_uploader import BaseFileUploader
from .base_file_writer import BaseFileWriter
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
from .noop_file_writer import NoopFileWriter
from .file_uploader import FileUploader
from .file_writer import FileWriter
from .noop_file_writer import NoopFileWriter

__all__ = ["FileUploader", "FileWriter", "NoopFileWriter", "ConnectorBuilderFileUploader", "BaseFileUploader", "BaseFileWriter"]
__all__ = [
"FileUploader",
"FileWriter",
"NoopFileWriter",
"ConnectorBuilderFileUploader",
"BaseFileUploader",
"BaseFileWriter",
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass

from abc import ABC, abstractmethod
from airbyte_cdk.sources.declarative.types import Record


Expand All @@ -19,4 +19,4 @@ def upload(self, record: Record) -> None:
"""
Uploads the file to the specified location
"""
...
...
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from pathlib import Path

from abc import ABC, abstractmethod

class BaseFileWriter(ABC):
"""
Expand All @@ -16,4 +16,4 @@ def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
...
...
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ class ConnectorBuilderFileUploader(BaseFileUploader):
Connector builder file uploader
Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data.
"""

file_uploader: FileUploader

def upload(self, record: Record) -> None:
self.file_uploader.upload(record=record)
for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]:
record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute)
for file_reference_attribute in [
file_reference_attribute
for file_reference_attribute in record.file_reference.__dict__
if not file_reference_attribute.startswith("_")
]:
record.data[file_reference_attribute] = getattr(
record.file_reference, file_reference_attribute
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
logger = logging.getLogger("airbyte")



@dataclass
class FileUploader(BaseFileUploader):
"""
File uploader class
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies.
"""

requester: Requester
download_target_extractor: RecordExtractor
config: Config
Expand Down Expand Up @@ -99,6 +99,3 @@ def upload(self, record: Record) -> None:
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)



Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@


class FileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
with open(str(file_path), "wb") as f:
f.write(content)

return file_path.stat().st_size
return file_path.stat().st_size
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@


class NoopFileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Noop file writer
"""
return 0
return 0
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
emit_connector_builder_messages: Optional[bool] = False
emit_connector_builder_messages: Optional[bool] = False,
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -37,7 +37,7 @@ def __init__(
config=config or {},
state=state or [],
source_config=source_config,
emit_connector_builder_messages=emit_connector_builder_messages
emit_connector_builder_messages=emit_connector_builder_messages,
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
Expand Down
16 changes: 11 additions & 5 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _source(
config: Dict[str, Any],
state: Optional[List[AirbyteStateMessage]] = None,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
emit_connector_builder_messages: Optional[bool] = False,
) -> YamlDeclarativeSource:
if not yaml_file:
yaml_file = "file_stream_manifest.yaml"
Expand All @@ -43,7 +43,7 @@ def _source(
catalog=catalog,
config=config,
state=state,
emit_connector_builder_messages=emit_connector_builder_messages
emit_connector_builder_messages=emit_connector_builder_messages,
)


Expand All @@ -53,12 +53,16 @@ def read(
state_builder: Optional[StateBuilder] = None,
expecting_exception: bool = False,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
emit_connector_builder_messages: Optional[bool] = False,
) -> EntrypointOutput:
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception
_source(catalog, config, state, yaml_file, emit_connector_builder_messages),
config,
catalog,
state,
expecting_exception,
)


Expand Down Expand Up @@ -232,7 +236,9 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None:
# Assert file reference fields are copied to record data
record_data = output.records[0].record.data
assert record_data["staging_file_url"] == file_reference.staging_file_url
assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path
assert (
record_data["source_file_relative_path"] == file_reference.source_file_relative_path
)
assert record_data["file_size_bytes"] == file_reference.file_size_bytes

def test_discover_article_attachments(self) -> None:
Expand Down