Daft checkpoint design #5868
Replies: 5 comments 10 replies
- Looking forward to discussing our checkpoint implementation with the community! We’ve built v1 of the checkpoint and would love to contribute!
- Great work on the checkpoint proposal! This is a solid step forward for Daft's robustness. Excited to see this feature take shape! I have a few questions about this:
- Is there a mistake here? Should it be
- Why is it necessary to add some
- If needed later, I would be glad to participate in co-construction.
Context
A long-running job may fail and terminate for various reasons (resource limits, an unstable environment, code bugs, etc.). When a failure happens partway through, restarting usually means running the entire workflow from the beginning, so data that was already processed is re-executed. This redundant computation is a huge waste of resources and time.
We therefore propose a checkpoint design that enables "incremental processing": if a previous run terminates after processing and writing only part of the data, subsequent runs skip the already-processed data and complete only the missing part.
Design
Checkpointing in Daft enables incremental processing. Its core principle is to use a primary key (or composite primary key) to filter out rows that have already been processed, so that only new data is processed and appended to the target path.
This is achieved by injecting a filter predicate into the logical plan, immediately after the source node. When a write operation is initiated with a checkpoint_config, Daft first reads the primary keys from the existing data at the destination. This set of primary keys is loaded into memory and distributed across a pool of checkpoint actors. During execution, the injected filter (implemented as a UDF actor) consults these actors to efficiently discard rows whose primary keys already exist. The DataFrame.write_* APIs are extended to accept checkpoint_config as a parameter, which controls the behavior described above.
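To make the filtering step concrete, here is a minimal, self-contained Python sketch of the idea described above. It is not Daft's internal code: the class and function names are hypothetical, the actors are plain in-process objects rather than distributed actors, and it works on lists of keys rather than columnar batches.

```python
# Hypothetical sketch of primary-key filtering with a sharded "actor" pool.
# In the real design these would be distributed checkpoint actors; here they
# are ordinary objects for illustration only.

class CheckpointActor:
    """Holds one shard of the primary keys already written to the target."""

    def __init__(self) -> None:
        self.seen: set = set()

    def load(self, keys) -> None:
        self.seen.update(keys)

    def unseen_mask(self, keys) -> list[bool]:
        """True for keys that have NOT been processed yet."""
        return [k not in self.seen for k in keys]


def build_actor_pool(existing_keys, num_buckets: int) -> list[CheckpointActor]:
    """Shard the destination's existing primary keys across num_buckets actors."""
    actors = [CheckpointActor() for _ in range(num_buckets)]
    for key in existing_keys:
        actors[hash(key) % num_buckets].load([key])
    return actors


def checkpoint_filter(batch_keys, actors) -> list[bool]:
    """The injected filter: keep only rows whose key is absent from the target."""
    num_buckets = len(actors)
    return [
        actors[hash(key) % num_buckets].unseen_mask([key])[0]
        for key in batch_keys
    ]


# Example: keys 1 and 3 already exist at the destination, so only 2 and 4 pass.
pool = build_actor_pool(existing_keys=[1, 3], num_buckets=2)
assert checkpoint_filter([1, 2, 3, 4], pool) == [False, True, False, True]
```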
Planning
Milestone 1: Checkpointing for major and basic scenarios
Status: ✅ Completed
Tasks:
- checkpoint_config parameter: must be a dictionary containing the following keys (see the usage sketch after this list):
  - key_column: The name of the column(s) to use as the primary key / composite primary key.
  - num_buckets (optional): The number of checkpoint actors to create for sharding the primary-key set.
  - num_cpus (optional): The number of CPUs to allocate to each checkpoint actor.
  - batch_size (optional): The batch size of the checkpoint filter operation.
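A usage sketch based on the parameter list above; the paths, column name, and values are illustrative, and the exact accepted keys and write_* signature may differ from the shipped implementation.

```python
import daft

df = daft.read_parquet("s3://bucket/landing/*.parquet")  # illustrative source path

# Proposed checkpoint_config dictionary (keys as listed above).
checkpoint_config = {
    "key_column": ["order_id"],  # primary key; multiple columns form a composite key
    "num_buckets": 8,            # optional: number of checkpoint actors
    "num_cpus": 1,               # optional: CPUs allocated to each checkpoint actor
    "batch_size": 65536,         # optional: rows per checkpoint filter batch
}

# Rows whose primary keys already exist at the destination are filtered out;
# only new rows are written to the target path.
df.write_parquet("s3://bucket/output/", checkpoint_config=checkpoint_config)
```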
Limits in Milestone 1:
Milestone 2: Checkpoint Enhancement.
Status: ⌛️ In Progress
Tasks:
Limits in Milestone 2:
Milestone 3: Checkpoint available for all formats, like Flink CDC
Today: the actor-based filter delivers incremental processing without any external state.
Long term: we could consider a stateful checkpointing mode inspired by Flink CDC / Flink checkpoints.
Tasks:
Limits in Milestone 3:
Benchmark
We conducted a test: reading data from Parquet files and deduplicating all of the data. We compared two methods: anti-join-based deduplication, and the Milestone 1 actor-based checkpoint.
It is observed that this Milestone 1 checkpoint exhibits greater stability compared to anti-join-based deduplication, and can support larger-scale datasets without triggering OOM. This advantage stems from the fact that actor-based checkpointing eliminates the need for costly data shuffling operations.
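For reference, the anti-join baseline looked roughly like the sketch below. The paths and the key column are illustrative, and it assumes the installed Daft version supports how="anti" joins.

```python
import daft

# New data to be written, and the keys already present at the destination
# (paths and the "order_id" key column are illustrative).
new_df = daft.read_parquet("s3://bucket/landing/*.parquet")
existing_keys = daft.read_parquet("s3://bucket/output/*.parquet").select("order_id")

# The anti join keeps only rows whose key is absent from the existing data.
# Both sides must be shuffled/partitioned by the join key, which is the main
# source of memory pressure on large datasets.
deduped = new_df.join(existing_keys, on="order_id", how="anti")
deduped.write_parquet("s3://bucket/output/")
```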
Note about the above dedup benchmark: