Skip to content

Conversation

@Stefanietry
Copy link

Purpose

Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table

Linked issue

#6313

Tests

see: org.apache.paimon.spark.SparkCatalogWithHiveTest#testChainTable

API and Format

Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table

Documentation

Related to PIP: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table

@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch 4 times, most recently from 7efbc2f to 817cf2a Compare October 16, 2025 04:30
/** Utils for table. */
public class ChainTableUtils {

public static void checkChainTableOptions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this in SchemaValidation?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaValidation

Done

@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch from 817cf2a to 84ee5b3 Compare October 21, 2025 08:40
public static void validateChainTableOptions(
Map<String, String> tableOptions, @Nullable String primaryKeys, boolean partitionTbl) {
if (Boolean.parseBoolean(tableOptions.get(CoreOptions.CHAIN_TABLE_ENABLED.key()))) {
Options options = Options.fromMap(tableOptions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also check that the changelog-producer != ("lookup", "full-compaction")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public static void validateChainTableOptions(
Map<String, String> tableOptions, @Nullable String primaryKeys, boolean partitionTbl) {
if (Boolean.parseBoolean(tableOptions.get(CoreOptions.CHAIN_TABLE_ENABLED.key()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to options.get(CoreOptions.CHAIN_TABLE_ENABLED)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.withDescription("Specify chain table enable.");

public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
key("scan.fallback-snapshot-branch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it doesn't have the default value, we have to check this when the user enable the chain table

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config depends on the actual created branch and operates in sequence with the table creation, so it is designed to be aligned with scan.fallback-branch

this.dvFactory = dvFactory;
}

public KeyValueFileReaderFactory(KeyValueFileReaderFactory factory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we introduce a copy method to replace this construct?

Copy link
Author

@Stefanietry Stefanietry Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for inheritance scenarios for the member variable initialization, and the member is declared as final


@Nullable private final RecordLevelExpire recordLevelExpire;

private final boolean isChainTbl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to forceKeepDelete?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Aitozi
Copy link
Contributor

Aitozi commented Oct 22, 2025

Looks good to me +1. Please take another look cc @JingsongLi

@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch 5 times, most recently from e9c9096 to 90ac914 Compare October 22, 2025 13:15
@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch 2 times, most recently from b9000a9 to 49c13e0 Compare October 29, 2025 12:36
Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation.

return result.substring(0, Math.min(result.length(), maxLength));
}

public List<String> generateOrderPartValues(InternalRow in) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this? What is difference? You can add a test for this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hoped to obtain the partition values in the order of the partition fields, and then calculate the dependent partitions according to the unit step size. For example, taking the hourly level as an example, the unit step size of the partition is hours. If starting from 0:00 on 20250811 and ending at 0:00 on 20250812, 24 incremental partitions in hours can be obtained, for the specific usage, please refer to org.apache.paimon.partition.InternalRowPartitionUtils#getDeltaPartitions. Unit tests have been added, please refer to org.apache.paimon.utils.InternalRowPartitionComputerTest#testGenerateOrderPartValues

return newPathFromName(newFileName(prefix));
}

public Path newPath(String prefix, Path relativeBucketPath) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For chain table, one logical partition may correspond to multiple physical partitions. Therefore, when creating a new path, on the basis of the original api about newPath, both partitions and buckets need to be specified simultaneously. The current interface is not in use for now. It is only for compatibility with the function of creating new path

@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch from 49c13e0 to 91e9697 Compare October 31, 2025 09:14
Co-authored-by: zhoufa <zhou1172026225@gmail.com>
@Stefanietry Stefanietry force-pushed the support_chain_table_on_batch branch from 91e9697 to 3aa1526 Compare October 31, 2025 09:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants