Skip to content

Conversation

@kaori-seasons
Copy link

@kaori-seasons kaori-seasons commented Oct 11, 2025

Purpose

see: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table

1. Overview

Chain Table is a new feature in Paimon designed to solve the problem of periodically storing full data in data warehouses. It optimizes storage and computation performance by dividing data into delta branches and snapshot branches.

1.1 Motivation

In data warehouse systems, there is a typical scenario: periodically storing full data (e.g., daily or hourly). However, between consecutive time intervals, most of the data is redundant, with only a small amount of newly changed data. Traditional processing methods have the following issues:

  1. Full Computation: Merge operations involve full computation and shuffle, resulting in poor performance
  2. Full Storage: Storing full data every day, where newly changed data typically accounts for only a small percentage (e.g., 1%)

Chain Table optimizes through the following approaches:

  1. Incremental Computation: Offline ETL jobs only need to consume newly changed data for the day, without full merging
  2. Incremental Storage: Only store newly changed data each day, with periodic asynchronous compaction to build a global chain table

2. Design Solution

2.1 Configuration Options

Key Default Type Description
chain-table.enabled false Boolean Whether to enable chain table
scan.fallback-snapshot-branch (none) String Snapshot branch when fallback to chain read as partition does not exist in the main branch
scan.fallback-delta-branch (none) String Delta branch when fallback to chain as partition does not exist in the main branch

2.2 Solution

Add two new branches on top of the warehouse: delta and snapshot, which describe newly changed data and full data generated by chain compaction respectively.

2.2.1 Table Structure

  1. Snapshot Branch: Stores full data generated by chain compaction or bootstrap
  2. Delta Branch: Stores newly changed data

2.2.2 Write Strategy

Write data to the corresponding branch based on branch configuration, using partition 20250722 as an example:

  1. Incremental Write: Write to t$branch_delta
  2. Full Write (Chain Compaction): Write to t$branch_snapshot

2.2.3 Read Strategy

Full Batch Read

Adopt corresponding strategies based on whether the partition exists in the snapshot branch:

  1. When querying partition 20250722, if the partition exists in t$branch_snapshot, read directly from t$branch_snapshot
  2. When querying partition 20250726, if the partition does not exist in t$branch_snapshot, read the nearest full partition from t$branch_snapshot and all incremental partitions up to the current time from t$branch_delta
Incremental Batch Read

Read incremental partitions directly from t$branch_delta. For example, when querying partition 20250722, read directly from t$branch_delta

Stream Read

Read data directly from t$branch_delta

2.2.4 Chain Compaction

Merge the incremental data of the current cycle with the full data of the previous cycle to generate the full data for the day. For example, the full data for date=20250729 is generated by merging all incremental partitions from 20250723 to 20250729 in t$branch_delta and the full data of 20250722 in t$branch_snapshot.

3. Implementation Plan

3.1 Core Class Design

3.1.1 ChainFileStoreTable

Inherits from FallbackReadFileStoreTable, implementing chain table splitting and reading functionality.

public class ChainFileStoreTable extends FallbackReadFileStoreTable {
    private final FileStoreTable snapshotStoreTable;
    private final FileStoreTable deltaStoreTable;
    
    public ChainFileStoreTable(
            AbstractFileStoreTable snapshotStoreTable, 
            AbstractFileStoreTable deltaStoreTable) {
        super(snapshotStoreTable, deltaStoreTable);
        this.snapshotStoreTable = snapshotStoreTable;
        this.deltaStoreTable = deltaStoreTable;
    }
    
    // Implement chain table specific scan and read logic
    @Override
    public DataTableScan newScan() {
        return new ChainTableBatchScan(snapshotStoreTable.newScan(), deltaStoreTable.newScan());
    }
    
    @Override
    public InnerTableRead newRead() {
        return new ChainTableRead(snapshotStoreTable.newRead(), deltaStoreTable.newRead());
    }
}

3.1.2 ChainTableBatchScan

Implements batch scan logic for chain tables.

public class ChainTableBatchScan implements DataTableScan {
    private final DataTableScan snapshotScan;
    private final DataTableScan deltaScan;
    
    public ChainTableBatchScan(DataTableScan snapshotScan, DataTableScan deltaScan) {
        this.snapshotScan = snapshotScan;
        this.deltaScan = deltaScan;
    }
    
    @Override
    public TableScan.Plan plan() {
        // Implement chain table read strategy
        // 1. First read existing partitions from snapshot branch
        // 2. For non-existent partitions, get the nearest full data from snapshot branch
        // 3. Get incremental data for the corresponding time period from delta branch
        // 4. Merge data and return
        return new ChainTablePlan(snapshotScan.plan(), deltaScan.plan());
    }
}

3.1.3 ChainTableRead

Implements read logic for chain tables.

public class ChainTableRead implements InnerTableRead {
    private final InnerTableRead snapshotRead;
    private final InnerTableRead deltaRead;
    
    public ChainTableRead(InnerTableRead snapshotRead, InnerTableRead deltaRead) {
        this.snapshotRead = snapshotRead;
        this.deltaRead = deltaRead;
    }
    
    @Override
    public RecordReader<InternalRow> createReader(Split split) throws IOException {
        // Create corresponding reader based on split type
        if (split instanceof ChainTableSplit) {
            ChainTableSplit chainSplit = (ChainTableSplit) split;
            // Implement merged reading of chain table data
            return new ChainTableRecordReader(chainSplit, snapshotRead, deltaRead);
        }
        // Default case, call parent logic
        return snapshotRead.createReader(split);
    }
}

3.2 Configuration Implementation

Add chain table related configurations in CoreOptions class:

// Chain Table related configurations
public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
    key("chain-table.enabled")
        .booleanType()
        .defaultValue(false)
        .withDescription("Whether to enable chain table");

public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
    key("scan.fallback-snapshot-branch")
        .stringType()
        .noDefaultValue()
        .withDescription("Snapshot branch when fallback to chain read as partition does not exist in the main branch");

public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
    key("scan.fallback-delta-branch")
        .stringType()
        .noDefaultValue()
        .withDescription("Delta branch when fallback to chain as partition does not exist in the main branch");

3.3 Table Factory Modification

Modify FileStoreTableFactory to support chain table creation:

public class FileStoreTableFactory {
    public static FileStoreTable create(
            FileIO fileIO,
            Path tablePath,
            TableSchema tableSchema,
            Options dynamicOptions,
            CatalogEnvironment catalogEnvironment) {
        
        // Check if chain table is enabled
        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
        if (coreOptions.chainTableEnabled()) {
            // Create chain table instance
            return createChainTable(fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
        }
        
        // Original logic
        FileStoreTable table =
                createWithoutFallbackBranch(
                        fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);

        Options options = new Options(table.options());
        String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
            // ... existing fallback branch logic
        }

        return table;
    }
    
    private static FileStoreTable createChainTable(
            FileIO fileIO,
            Path tablePath,
            TableSchema tableSchema,
            Options dynamicOptions,
            CatalogEnvironment catalogEnvironment) {
        
        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
        String snapshotBranch = coreOptions.scanFallbackSnapshotBranch();
        String deltaBranch = coreOptions.scanFallbackDeltaBranch();
        
        // Create snapshot branch table
        TableSchema snapshotSchema = tableSchema.copy(
            Collections.singletonMap(CoreOptions.BRANCH.key(), snapshotBranch));
        FileStoreTable snapshotTable = createWithoutFallbackBranch(
                fileIO, tablePath, snapshotSchema, dynamicOptions, catalogEnvironment);
        
        // Create delta branch table
        TableSchema deltaSchema = tableSchema.copy(
            Collections.singletonMap(CoreOptions.BRANCH.key(), deltaBranch));
        FileStoreTable deltaTable = createWithoutFallbackBranch(
                fileIO, tablePath, deltaSchema, dynamicOptions, catalogEnvironment);
        
        // Create chain table instance
        return new ChainFileStoreTable(
            (AbstractFileStoreTable) snapshotTable, 
            (AbstractFileStoreTable) deltaTable);
    }
}

4. Usage Examples

4.1 Create Table

CREATE TABLE default.t (
  t1 string COMMENT 't1',
  t2 string COMMENT 't2',
  t3 string COMMENT 't3'
) PARTITIONED BY (date string COMMENT 'date')
TBLPROPERTIES (
  'primary_key' = 'date,t1',
  'bucket' = '2',
  'bucket-key' = 't1',
  'partition.timestamp-pattern' = '$date',
  'partition.timestamp-formatter' = 'yyyyMMdd',
  'chain-table.enabled' = 'true',
  'scan.fallback-snapshot-branch' = 'snapshot',
  'scan.fallback-delta-branch' = 'delta'
);

4.2 Create Branches

CALL sys.create_branch('default.t', 'snapshot');
CALL sys.create_branch('default.t', 'delta');

4.3 Write Data

-- Full write
INSERT INTO/overwrite `default`.`t$branch_snapshot`
SELECT ...

-- Incremental write
INSERT INTO/overwrite `default`.`t$branch_delta`
SELECT ...

4.4 Read Data

-- Full query
SELECT * FROM default.t WHERE date = '${date}'

-- Incremental query
SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date}'

-- Hybrid query
SELECT * FROM default.t WHERE date = '${date}'
UNION ALL
SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date-1}'

Tests

API and Format

Documentation

@kaori-seasons kaori-seasons changed the title 【ISSUE#6313】 Support chain table Oct 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants