Skip to content

Conversation

@zikangh
Copy link
Contributor

@zikangh zikangh commented Oct 17, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR is adds rate limiting to getFileChange() which reads delta metadata to determines what data to process (offset management).

How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

Does this PR introduce any user-facing changes?

No

@zikangh zikangh changed the title minor change Oct 17, 2025
@zikangh zikangh changed the title [WIP] getFileChangesWithRateLimit() Oct 20, 2025
@zikangh zikangh changed the title [WIP] Add rate limiting to getFileChanges() for the dsv2 streaming source Oct 20, 2025
@zikangh
Copy link
Contributor Author

zikangh commented Oct 20, 2025

Hi @huan233usc @gengliangwang @tdas @jerrypeng could you please review this PR?


/**
* Interface for files that can be admitted by admission control in Delta streaming sources.
* This abstraction allows both Scala and Java IndexedFile implementations to be used with
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Scala and Java -> v1 and v2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/**
* Returns the size of the file in bytes.
* For files without a file action or files with unknown size, returns 0.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just make this api can only be called when file action exists?

V1's current impl will just throw NPE if hasFileAction() is false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@zikangh zikangh requested a review from huan233usc October 23, 2025 01:33

@Override
public long getFileSize() {
return addFile.getSize();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: null check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches V1 behavior (throwing an NPE when addFile, removeFile, and cdc are all null).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix that? Are there any places that try to catch NPE? if not we should avoid it for better diagnostic purpose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Perhaps it's better to throw an IllegalStateException. Thanks!


override def toString(): String = s"DeltaSource[${deltaLog.dataPath}]"

trait DeltaSourceAdmissionBase { self: AdmissionLimits =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this refactoring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo this is an unnecessary interface that adds cognitive load. but if you'd like to keep refactoring to a minimum, I can also revert this refactor. no strong opinions on this one.

// the maxFilesPerTrigger conf.
if (limits.isDefined()) {
DeltaSource.AdmissionLimits admissionLimits = limits.get();
changes = changes.takeWhile(admissionLimits::admit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does takeWhile automatically close the iterator on exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, getFileChangesWithRateLimit() returns a CloseableIterator, the caller should be responsible for catching exceptions and closing the iterator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In DSv1, we wrap the iterator with "withClose" which will automatically close the iteration if there is an Throwable thrown:

https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala#L315

Can we do the same here?

deltaSource.getFileChangesWithRateLimit(
/*fromVersion=*/ 0L,
/* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot=*/ false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to test it with initialSnapshot :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants