-
Notifications
You must be signed in to change notification settings - Fork 18
feat(cdk): handle include_files from configured catalog #512
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…file_writer.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
… configured catalog.
…s to always return bool
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Functionally, it seems good. I have one design concern. Let me know what you think
@@ -229,7 +230,9 @@ def connection_checker(self) -> ConnectionChecker: | |||
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" | |||
) | |||
|
|||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | |||
def streams( | |||
self, config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit ambivalent on modifying this interface because now, it does not align with the parent interface so it seems like we are breaking compatibility and it'll be hard to maintain with doing isinstance
before calling which means that we will lose most of the benefits of inheritance.
Could we pass the catalog at ModelToComponentFactory.__init__
as a mandatory field? It seems like we are already passing the information here. This would avoid the interface problem, the problem of managing an optional catalog and the problem of passing include_files
through kwargs. We might want to do it here too if it's not too much trouble.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense in order to keep the signature of the interfaces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have applied changes to pass it through ModelToComponentFactory.init
….__init__() so we don't mess the interface signature
…-api/handle-include-files
📝 WalkthroughWalkthroughThe changes introduce a new mechanism for handling file uploads in Airbyte's declarative sources by making the file uploader behavior conditional based on a new Changes
Sequence Diagram(s)sequenceDiagram
participant Source as DeclarativeSource
participant Factory as ModelToComponentFactory
participant Catalog as ConfiguredAirbyteCatalog
participant Stream as DeclarativeStream
participant Uploader as FileUploader
Source->>Factory: Instantiate with catalog
Factory->>Catalog: Build stream name-to-stream mapping
Source->>Factory: create_declarative_stream(stream_model, config)
Factory->>Catalog: Lookup include_files for stream
Factory->>Uploader: create_file_uploader(include_files)
Uploader-->>Stream: Use NoopFileWriter if include_files is False, else LocalFileSystemFileWriter
Stream-->>Source: Stream reads with correct file writer behavior
Would you like me to generate a more detailed sequence diagram focusing on the file uploader selection logic or a comparison of old vs new flows? Wdyt? Possibly related PRs
Suggested reviewers
Does this updated summary and structure look good to you? Would you like me to help draft any additional documentation or usage examples for the new 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (9)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
unit_tests/sources/declarative/file/test_file_stream.py (1)
180-216
: 💡 Verification agent🧩 Analysis chain
Real file writes can leave artefacts on the test runner
test_get_article_attachments_and_file_is_uploaded
exercises the realLocalFileSystemFileWriter
and then checksPath(file_reference.staging_file_url).exists()
.
While this gives strong end-to-end confidence, it also:
- creates files under
/tmp/airbyte-file-transfer
which may persist between test runs, pollute CI caches, or collide when tests run in parallel;- depends on the executing user having write permission to that location.
Could we isolate the side-effect? Two options:
-with HttpMocker() as http_mocker: +with tempfile.TemporaryDirectory() as tmpdir, HttpMocker() as http_mocker, \ + patch("airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.STAGING_DIR", tmpdir):or mock the writer altogether and assert that the intended path would be created, avoiding disk I/O:
-with HttpMocker() as http_mocker: +with HttpMocker() as http_mocker, \ + patch("airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write") as mock_write: + mock_write.return_value = 12345This keeps the test hermetic while preserving its behavioural check—what do you think?
#!/usr/bin/env bash set -e echo "Searching for LocalFileSystemFileWriter definition..." rg "class LocalFileSystemFileWriter" -n . echo echo "Searching for STAGING_DIR constant..." rg "STAGING_DIR" -n .
#!/usr/bin/env bash set -e # Locate the LocalFileSystemFileWriter implementation rg "class LocalFileSystemFileWriter" -n . # Locate the STAGING_DIR constant rg "STAGING_DIR" -n .
Could we verify the locations of
LocalFileSystemFileWriter
and theSTAGING_DIR
constant to ensure our patch targets are correct? wdyt?
🏁 Script executed:
#!/usr/bin/env bash set -e echo "🔍 Searching for LocalFileSystemFileWriter definition..." rg "class LocalFileSystemFileWriter" -n . echo echo "🔍 Searching for STAGING_DIR constant..." rg "STAGING_DIR" -n .Length of output: 757
Let’s peek inside the writer to see how it references the staging directory—so we can patch the right constant. wdyt?
#!/usr/bin/env bash set -e echo "📄 Showing the first 200 lines of LocalFileSystemFileWriter:" sed -n '1,200p' airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py
Could we peek at how the writer picks its staging directory so we can patch the right constant or env var? wdyt?
#!/usr/bin/env bash set -e sed -n '1,200p' airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py
Could we inspect how the writer pulls in the staging directory constant so we know exactly what to patch? wdyt?
#!/usr/bin/env bash set -e echo "🔍 Looking for AIRBYTE_STAGING_DIRECTORY usage in LocalFileSystemFileWriter..." rg "AIRBYTE_STAGING_DIRECTORY" -n airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py || true echo echo "🔍 Searching for any staging-dir imports or references..." rg "files_directory" -n airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py || true
Isolate filesystem side-effects in this test
Real
LocalFileSystemFileWriter
writes to the default staging dir (/staging/files
or/tmp/airbyte-file-transfer
), which can leave artifacts, collide under parallel CI, or require extra permissions. How about we make the test hermetic by either:• Overriding the staging directory constant to a temp folder:
-with HttpMocker() as http_mocker: +with tempfile.TemporaryDirectory() as tmpdir, HttpMocker() as http_mocker, \ + patch("airbyte_cdk.sources.utils.files_directory.AIRBYTE_STAGING_DIRECTORY", tmpdir):• Or mocking the writer’s disk I/O entirely:
-with HttpMocker() as http_mocker: +with HttpMocker() as http_mocker, \ + patch("airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write") as mock_write: + mock_write.return_value = 12345This keeps the behavioral check (we still assert on
file_reference
) without touching the real filesystem. Wdyt?
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3859-3861
: Integration with stream creation processThese lines integrate the
include_files
logic into the file uploader creation during stream setup. The changes respect the PR's goal of controlling file inclusion based on stream configuration.Could this be extended in the future to handle other per-stream configuration settings using a similar pattern? Wdyt?
unit_tests/sources/declarative/file/test_file_stream.py (1)
217-276
: Heavy duplication—parameterise to keep tests tidy?
test_get_article_attachments_with_filename_extractor
(and several siblings) repeat ~40 lines of HTTP stubbing, patch set-ups, and catalog construction that differ only byinclude_files
and expectation flags. That repetition:
- inflates the file to ~500 lines, making maintenance harder;
- risks divergence when one test is updated but its twins are not.
Would a parameterised test using
pytest.mark.parametrize
(or a helper fixture if you must stay inunittest
) reduce noise? Sketch:@parameterized.expand([ (True, True), # include_files, expect_fs_writer (False, False), # ... ]) def test_article_attachments_writer_selection(self, include_files, expect_fs_writer): # shared arrange ... with patch(...) as fs_write, patch(...) as noop_write: ... catalog = CatalogBuilder().with_stream( ConfiguredAirbyteStreamBuilder() .with_name("article_attachments") .with_include_files(include_files) ).build() output = read(self._config(), catalog, yaml_file=...) self.assertEqual(bool(fs_write.called), expect_fs_writer) self.assertEqual(bool(noop_write.called), not expect_fs_writer)Fewer lines, one place to update—wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(1 hunks)airbyte_cdk/sources/declarative/manifest_declarative_source.py
(2 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)airbyte_cdk/test/catalog_builder.py
(1 hunks)unit_tests/sources/declarative/file/test_file_stream.py
(5 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
airbyte_cdk/test/entrypoint_wrapper.py (1)
catalog
(113-117)unit_tests/sources/test_source.py (1)
catalog
(70-93)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
unit_tests/sources/test_source.py (3)
catalog
(70-93)streams
(58-61)streams
(148-150)airbyte_cdk/sources/declarative/declarative_stream.py (2)
name
(93-97)name
(100-102)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (8)
airbyte_cdk/test/catalog_builder.py (1)
44-50
: Nice addition of thewith_include_files
method!This method clearly follows the existing builder pattern and has good documentation. It will support the new feature for handling file inclusion preferences, making it easy to construct test catalogs with the
include_files
flag set.airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
91-91
: LGTM! Catalog parameter is now properly forwarded.The change ensures that the catalog is passed to the component factory, which allows the factory to respect stream-specific
include_files
flags.airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
95-95
: Good addition of the optional catalog parameter.Making this parameter optional preserves backward compatibility while adding support for the new feature. This aligns with the related change in
ConcurrentDeclarativeSource
.
122-122
: Properly forwarding the catalog parameter to the component factory.This ensures consistent behavior between
ManifestDeclarativeSource
andConcurrentDeclarativeSource
for handling file inclusion preferences.airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
579-599
: New catalog parameter enables stream configuration lookupThe constructor now accepts a
ConfiguredAirbyteCatalog
parameter and initializes a mapping from stream names to their configured streams. This sets the foundation for supporting per-stream configuration settings such as theinclude_files
flag.
3605-3606
: Added include_files parameter to the file uploader creation methodThe
create_file_uploader
method now accepts aninclude_files
boolean to determine whether files should be included for a stream. This nicely decouples the decision from the implementation details.
3626-3627
: File writer selection now respects include_files flagThis change implements the core functionality - using a
NoopWriter
wheninclude_files
is false, effectively skipping file inclusion while still processing metadata. The approach maintains compatibility with the existing behavior for connector builder mode.
3719-3742
: Added helper methods for catalog-aware stream configurationThe two new methods provide a clean way to:
- Create a lookup map from stream names to configured streams
- Determine whether a specific stream should include files
This encapsulates the logic nicely and keeps it maintainable. The implementation properly handles edge cases like missing streams or unspecified flags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/file/test_file_stream.py (1)
123-151
: Good mocking approach for file writers.The test now properly mocks both
NoopFileWriter.write
andLocalFileSystemFileWriter.write
methods to verify that the correct writer is used based on theinclude_files
flag. You're setting a mock file size to simulate the return value from the file system writer.Would adding a docstring to this test method help clarify its purpose, similar to how you've documented the other test methods? Perhaps something like "Test that article attachments can be read and files are included when include_files is true"? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/file/test_file_stream.py
(7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (9)
unit_tests/sources/declarative/file/test_file_stream.py (9)
155-159
: Explicit include_files flag in test configuration.This change correctly adds the
with_include_files(True)
to the stream configuration, which is essential for testing the new functionality. This makes it clear that we want files to be included in this test case.
164-167
: Good assertions for file writer usage.These assertions validate that when
include_files
is set toTrue
, only the FileSystemFileWriter is called while the NoopFileWriter is not used. This confirms the core functionality being tested in the PR.
180-216
: LGTM! New test for actual file uploading verification.This test specifically verifies that files are physically written to the staging directory when
include_files=True
, complementing the mock-based tests. The assertion for file existence is a strong verification of the functionality.
217-276
: Well-structured test for filename extractor.You've clearly separated the previously combined test logic to focus on the specific case of using a filename extractor with file inclusion enabled. The assertions verify both that the correct writer is used and that the file reference fields are properly populated.
277-327
: Good test for explicit opt-out of file inclusion.This test thoroughly verifies the behavior when
include_files
is explicitly set toFalse
. The assertions confirm that in this case the NoopFileWriter is used instead of the FileSystemFileWriter, and the file size matches the NoopFileWriter's constant size.
328-373
: Complete coverage with default behavior test.Excellent test that verifies the default behavior when
include_files
is not specified at all. This confirms that the system treats unspecifiedinclude_files
the same as explicitly setting it toFalse
, using the NoopFileWriter as expected.
375-403
: Good mocking in connector builder test.The test now properly mocks both file writers and consistently uses NoopFileWriter's NOOP_FILE_SIZE for the file size. This is consistent with the other tests and makes the expected behavior clear.
418-422
: LGTM! Consistent stream configuration in connector builder test.Adding
with_include_files(True)
to the connector builder test is excellent, as it verifies that even when file inclusion is requested, the connector builder context always uses the NoopFileWriter. This ensures complete coverage of the feature behavior in all contexts.
428-431
: Good assertions for connector builder behavior.These assertions correctly verify that in the connector builder context, only the NoopFileWriter is used regardless of the
include_files
setting. This is important to confirm the expected behavior in this special case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Just a small nit
@@ -583,6 +583,7 @@ def __init__( | |||
disable_retries: bool = False, | |||
disable_cache: bool = False, | |||
disable_resumable_full_refresh: bool = False, | |||
catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(streams=[]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the default value here? Is there usage of ModelToComponent.__init__
that are public facing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't, I set it as None.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or should we make in mandatory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you say mandatory you mean something like:
class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
def __init__(
self,
catalog: ConfiguredAirbyteCatalog,
...
?
I think yes is possible, it will require us to update in different places like here for which probably we just need to pass a self.catalog that we keep.
Also, some need to update unit test that do things like something = ModelToComponentFactory()
,
What are the advantages we get from doing it mandatory? Or have we observed downsides of the default approach we've doing for other fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whenever we reference catalog
internally, we know it is available and properly populated so easier code to maintain and less cognitive load for the devs.
I agree unit tests are annoying to update. How widespread would be the impact?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok! As you wish. If you think this is worth, we can fix the test and move forward. Else, I'm fine with the current solution
What
There is a scenario where the user may not want to include files from a file stream, so they will only get metadata. We were not handling this.
How
Read value from the stream configured catalog and check the include_files value, if false/none use NoopWriter, if true, it is fine to use the FileSystemWriter.
This only happens for read operations.
Summary by CodeRabbit