Contributor

@zikangh zikangh commented Oct 28, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR implements latestOffset(startOffset, limit) and getDefaultReadLimit(), completing the SupportsAdmissionControl implementation.
It also refactors a few DeltaSource.scala methods into static methods so they can be called from SparkMicroBatchStream.java.

How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

Does this PR introduce any user-facing changes?

No

@zikangh zikangh changed the title latest snapshot Oct 28, 2025
@zikangh zikangh force-pushed the stack/latestsnapshot2 branch from f602eff to 09ebcbb Compare October 29, 2025 01:00
@zikangh zikangh changed the title [WIP][kernel-spark] Implement latestOffset() with rate limiting for dsv2 streaming Oct 29, 2025
@zikangh zikangh marked this pull request as ready for review October 29, 2025 01:08
Contributor Author

zikangh commented Oct 29, 2025

Hi @huan233usc, could you please add @huan233usc, @gengliangwang, @jerrypeng, @tdas to the reviewers list?

this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);

// Initialize snapshot at source init to get table ID, similar to DeltaSource.scala
this.snapshotAtSourceInit = TableManager.loadSnapshot(tablePath).build(engine);
Collaborator

Just call streamingHelper.loadLatestSnapshot(); under the hood it calls TableManager.loadSnapshot(tablePath).build(engine).

Contributor Author

Done.

@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// For the first batch, initialOffset() should be called before latestOffset().
// if startOffset is null: no data is available to read.
Collaborator

Will there be a case where startOffset is null?

Contributor Author

Spark calls initialOffset() first to obtain the startOffset for batch 0, so it can be null when initialOffset() returns null (i.e., the table has no data).
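
To make the contract above concrete, here is a minimal sketch of the null handling combined with a file-count rate limit. It deliberately uses stand-in types instead of the real Spark and Delta classes: the offset is modeled as a plain Long index, `maxFiles` stands in for a ReadLimit, and `changeIndexes` is a hypothetical ascending list of available file-change indexes. Returning the start offset unchanged when nothing new is available is an assumption of this sketch, not a statement about what this PR does.

```java
import java.util.List;

/** Illustrative only: stand-in types, not the Spark or Delta API. */
final class LatestOffsetSketch {

    /**
     * Computes the end offset of the next micro-batch.
     *
     * @param startOffset   offset the batch starts after; null means no data is available
     * @param maxFiles      hypothetical cap on file changes admitted per batch
     * @param changeIndexes ascending indexes of the file changes currently available
     * @return null if startOffset is null; otherwise the index of the last admitted
     *         change, or startOffset itself when no newer change exists
     */
    static Long latestOffset(Long startOffset, int maxFiles, List<Long> changeIndexes) {
        // A null start offset means the initial offset was null: nothing to read.
        if (startOffset == null) {
            return null;
        }
        long end = startOffset;
        int admitted = 0;
        for (long idx : changeIndexes) {
            if (idx <= startOffset) {
                continue; // already covered by an earlier batch
            }
            if (admitted == maxFiles) {
                break; // rate limit reached; remaining changes wait for the next batch
            }
            end = idx;
            admitted++;
        }
        return end;
    }
}
```

With `startOffset = 0`, `maxFiles = 2`, and changes at indexes 1, 2, 3, the sketch admits changes 1 and 2 and leaves 3 for the following batch.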

// TODO(#5318): Check read-incompatible schema changes during stream start
IndexedFile lastFile = lastFileChange.get();
return ScalaUtils.toJavaOptional(
DeltaSource.buildOffsetFromIndexedFile(
Collaborator

DeltaSource.buildOffsetFromIndexedFile seems to always return an offset, even though its return type is Option[Offset].

Can we update the signature for DeltaSource.buildOffsetFromIndexedFile? I just want to minimize the interaction with null if it is not really necessary.
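
A rough illustration of the request, using hypothetical stand-in methods rather than the actual DeltaSource code (the names and the string-valued offset are invented for this sketch): when a builder can never produce an empty result, returning the value directly spares callers from unwrapping and from reasoning about an absent case that cannot occur.

```java
import java.util.Optional;

/** Illustrative only: hypothetical stand-ins, not the DeltaSource API. */
final class OffsetSignatureSketch {

    /** Before: the wrapper suggests "maybe absent", yet the result is never empty. */
    static Optional<String> buildOffsetWrapped(long version, long index) {
        return Optional.of(version + ":" + index);
    }

    /** After: the signature states what the method actually guarantees. */
    static String buildOffset(long version, long index) {
        return version + ":" + index;
    }

    public static void main(String[] args) {
        // Callers of the wrapped form must unwrap or supply a fallback that is never used.
        String viaWrapper = buildOffsetWrapped(5, 2).orElse(null);
        // Callers of the direct form use the value as-is.
        String direct = buildOffset(5, 2);
        System.out.println(viaWrapper.equals(direct));
    }
}
```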

Contributor Author

Done.

@zikangh zikangh force-pushed the stack/latestsnapshot2 branch 2 times, most recently from 797770c to 8298d06 Compare October 30, 2025 00:43
@zikangh zikangh force-pushed the stack/latestsnapshot2 branch from 8298d06 to df5e0f1 Compare October 31, 2025 00:09
@zikangh zikangh requested a review from huan233usc October 31, 2025 21:08
@zikangh zikangh force-pushed the stack/latestsnapshot2 branch 2 times, most recently from acc4fe8 to 8d106d8 Compare October 31, 2025 22:28
@zikangh zikangh force-pushed the stack/latestsnapshot2 branch from 8d106d8 to 029a596 Compare October 31, 2025 23:19
