-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[kernel-spark] Add rate limiting to getFileChanges() for DSv2 streaming #5361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Hi @huan233usc @gengliangwang @tdas @jerrypeng could you please review this PR? |
|
|
||
| /** | ||
| * Interface for files that can be admitted by admission control in Delta streaming sources. | ||
| * This abstraction allows both Scala and Java IndexedFile implementations to be used with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Scala and Java -> v1 and v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
|
||
| /** | ||
| * Returns the size of the file in bytes. | ||
| * For files without a file action or files with unknown size, returns 0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just make this api can only be called when file action exists?
V1's current impl will just throw NPE if hasFileAction() is false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
|
||
| @Override | ||
| public long getFileSize() { | ||
| return addFile.getSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: null check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches V1 behavior (throwing an NPE when addFile, removeFile, and cdc are all null).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we fix that? Are there any places that try to catch NPE? if not we should avoid it for better diagnostic purpose
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Perhaps it's better to throw an IllegalStateException. Thanks!
|
|
||
| override def toString(): String = s"DeltaSource[${deltaLog.dataPath}]" | ||
|
|
||
| trait DeltaSourceAdmissionBase { self: AdmissionLimits => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this refactoring?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo this is an unnecessary interface that adds cognitive load. but if you'd like to keep refactoring to a minimum, I can also revert this refactor. no strong opinions on this one.
| // the maxFilesPerTrigger conf. | ||
| if (limits.isDefined()) { | ||
| DeltaSource.AdmissionLimits admissionLimits = limits.get(); | ||
| changes = changes.takeWhile(admissionLimits::admit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does takeWhile automatically close the iterator on exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, getFileChangesWithRateLimit() returns a CloseableIterator, the caller should be responsible for catching exceptions and closing the iterator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In DSv1, we wrap the iterator with "withClose" which will automatically close the iteration if there is an Throwable thrown:
Can we do the same here?
| deltaSource.getFileChangesWithRateLimit( | ||
| /*fromVersion=*/ 0L, | ||
| /* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(), | ||
| /* isInitialSnapshot=*/ false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remember to test it with initialSnapshot :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a TODO.
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
This PR is adds rate limiting to getFileChange() which reads delta metadata to determines what data to process (offset management).
How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No