[kernel-spark] Implement latestOffset() with rate limiting for dsv2 streaming #5409
base: master
Conversation
Hi @huan233usc, could you please add @huan233usc, @gengliangwang, @jerrypeng, @tdas to the reviewers list?
```java
this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);

// Initialize snapshot at source init to get table ID, similar to DeltaSource.scala
this.snapshotAtSourceInit = TableManager.loadSnapshot(tablePath).build(engine);
```
Just call streamingHelper.loadLatestSnapshot(); under the hood it will call TableManager.loadSnapshot(tablePath).build(engine).
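A minimal sketch of what that suggestion could look like, reusing the streamingHelper field from the diff above (field names assumed from that snippet):

```java
// Sketch only: reuse the helper instead of calling TableManager directly.
// Per the comment above, StreamingHelper.loadLatestSnapshot() wraps
// TableManager.loadSnapshot(tablePath).build(engine).
this.snapshotAtSourceInit = streamingHelper.loadLatestSnapshot();
```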
Done.
```java
@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
  // For the first batch, initialOffset() should be called before latestOffset().
  // if startOffset is null: no data is available to read.
```
Will there be a case where startOffset is set to null?
Spark will call initialOffset() first to obtain this startOffset for batch 0 -- so it could be null when initialOffset() returns null (i.e. the table has no data).
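An illustrative sketch of that contract (not the PR's actual implementation; computeEndOffset is a hypothetical helper):

```java
@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
  if (startOffset == null) {
    // initialOffset() returned null for batch 0, i.e. the table has no data to read yet.
    return null;
  }
  // Otherwise scan forward from startOffset, applying the ReadLimit for rate limiting.
  return computeEndOffset(startOffset, limit); // hypothetical helper
}
```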
```java
// TODO(#5318): Check read-incompatible schema changes during stream start
IndexedFile lastFile = lastFileChange.get();
return ScalaUtils.toJavaOptional(
    DeltaSource.buildOffsetFromIndexedFile(
```
DeltaSource.buildOffsetFromIndexedFile seems to always return an offset, although it returns Option[Offset].
Can we update the signature of DeltaSource.buildOffsetFromIndexedFile? I just want to minimize interaction with null if it is not really necessary.
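To illustrate the point with a toy sketch (String stands in for Offset and java.util.Optional for the Scala Option; none of this is Delta code): a wrapper that is never empty only adds an unwrap step at every call site.

```java
import java.util.Optional;

// Toy illustration of the review comment: when a method can never return an empty
// result, returning the value directly removes the unwrap/null handling at call sites.
class OffsetReturnSketch {
  static Optional<String> buildOffsetBefore(long version) {
    return Optional.of("offset-v" + version); // always present in practice
  }

  static String buildOffsetAfter(long version) {
    return "offset-v" + version; // same value, no wrapper
  }

  public static void main(String[] args) {
    String before = buildOffsetBefore(5L).orElse(null); // caller still handles "empty"
    String after = buildOffsetAfter(5L);                 // caller uses the value directly
    System.out.println(before + " / " + after);
  }
}
```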
Done.
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
We add an implementation of latestOffset(startOffset, limit) and getDefaultReadLimit() to complete the SupportsAdmissionControl implementation. We also refactored a few DeltaSource.scala methods, making them static so they can be called from SparkMicrobatchStream.java.
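For context, here is a minimal sketch of the two SupportsAdmissionControl methods mentioned above, using Spark's ReadLimit API. This is a general illustration, not this PR's code; the maxFilesPerTrigger field and scanWithLimit helper are hypothetical.

```java
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;

// Illustrative sketch of a SupportsAdmissionControl implementation, not the PR's code.
abstract class AdmissionControlSketch implements MicroBatchStream, SupportsAdmissionControl {
  // Hypothetical option: cap how many files are admitted per micro-batch.
  private final Integer maxFilesPerTrigger;

  AdmissionControlSketch(Integer maxFilesPerTrigger) {
    this.maxFilesPerTrigger = maxFilesPerTrigger;
  }

  @Override
  public ReadLimit getDefaultReadLimit() {
    // With no cap configured, admit everything that is available.
    return maxFilesPerTrigger == null
        ? ReadLimit.allAvailable()
        : ReadLimit.maxFiles(maxFilesPerTrigger);
  }

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // startOffset is whatever initialOffset() returned for batch 0 (possibly null).
    if (startOffset == null) {
      return null; // no data available to read yet
    }
    // Walk the Delta log forward from startOffset, stopping once `limit` is exhausted.
    return scanWithLimit(startOffset, limit); // hypothetical helper
  }

  // Hypothetical helper that applies the ReadLimit while scanning the log.
  protected abstract Offset scanWithLimit(Offset startOffset, ReadLimit limit);
}
```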
How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No