
commit.log.error.reprocessing.enabled=true silently skips all mutations from error commit logs once newer segments have been processed #1647

@arturalbov

Description


Bug report

This is a follow-up issue to #1610. It turned out that the fix for #1610 solved only half of the problem.

What Debezium connector do you use and what version?

Cassandra Connector 3.4.1.Final and latest origin/main


What is the connector configuration?

connector.name=dbz_test_connector
commit.log.relocation.dir=/opt/debezium/relocation
commit.log.real.time.processing.enabled=true
commit.log.error.reprocessing.enabled=true
http.port=8000
cassandra.config=/etc/cassandra/cassandra.yaml
cassandra.hosts=host.docker.internal
cassandra.port=9042
cassandra.driver.config.file=/opt/debezium/conf/cassandra-driver.conf
kafka.producer.bootstrap.servers=kafka:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
kafka.producer.delivery.timeout.ms=120000
kafka.producer.enable.idempotence=false
topic.prefix=dbz_test
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.backing.store.dir=/opt/debezium/offsets
snapshot.consistency=ONE
snapshot.mode=NEVER

What is the captured database version and mode of deployment?

4.1.9 Cassandra. Local deployment using docker image cassandra:4.1.9


What behavior do you expect?

When commit.log.error.reprocessing.enabled=true and error commit log files are moved back to the CDC directory (via CommitLogTransfer.getErrorCommitLogFiles()), the connector should reprocess all mutations from those files that were never successfully emitted to Kafka.


What behavior do you see?

All mutations in the reprocessed error file are silently skipped with the log message:

Mutation at CommitLog-7-<segment>.log:<position> for table <keyspace>.<table> already processed, skipping...

Root cause: FileOffsetWriter tracks a single per-table offset representing the globally latest processed position. CassandraXCommitLogReadHandlerImpl.handleMutation() compares each mutation's position against this recorded offset using OffsetPosition.compareTo(), which sorts by commit log segment ID extracted from the filename first, then by byte position. Once any newer segment advances the per-table offset past the error file's segment ID, every mutation in that error file satisfies currentOffset <= recordedOffset and is considered already processed — even mutations that were never emitted to Kafka.

Scenario that triggers it:

  1. CommitLog-7-100.log fails mid-processing → moved to error/
  2. Processing continues normally; CommitLog-7-200.log finishes → per-table offset is now CommitLog-7-200.log:<position>
  3. CommitLog-7-100.log is moved back to CDC dir for reprocessing
  4. Every mutation in CommitLog-7-100.log is skipped — data is permanently lost with no warning
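The skip in step 4 follows directly from the segment-first ordering. Here is a minimal, hypothetical sketch of the comparison described above, using the segment IDs from the scenario; the names (OffsetPosition, alreadyProcessed) mirror the connector's classes but this is illustrative, not the actual implementation:

```java
public class OffsetSkipSketch {

    // Models a commit log position: segment id (extracted from the
    // filename) compares first, byte position within the segment second.
    record OffsetPosition(long segmentId, int position)
            implements Comparable<OffsetPosition> {
        @Override
        public int compareTo(OffsetPosition other) {
            int bySegment = Long.compare(segmentId, other.segmentId);
            return bySegment != 0 ? bySegment
                                  : Integer.compare(position, other.position);
        }
    }

    // Mirrors the isOffsetProcessed-style check: any position at or before
    // the recorded per-table offset counts as already processed.
    static boolean alreadyProcessed(OffsetPosition current, OffsetPosition recorded) {
        return current.compareTo(recorded) <= 0;
    }

    public static void main(String[] args) {
        // After the failure, the per-table offset advanced to segment 200.
        OffsetPosition recorded = new OffsetPosition(200, 4096);

        // A never-emitted mutation from the error file CommitLog-7-100.log
        // is skipped purely because its segment id is older:
        OffsetPosition fromErrorFile = new OffsetPosition(100, 512);
        System.out.println(alreadyProcessed(fromErrorFile, recorded)); // true
    }
}
```

Note that the byte position (512 here) never influences the outcome once the segment IDs differ, which is why every mutation in the error file is skipped regardless of whether it was emitted.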

In practice this means errorCommitLogReprocessEnabled is effective only in the narrow window where no newer commit log has been processed for the affected table since the failure. In any realistic production environment, the feature does not work.


Do you see the same behaviour using the latest released Debezium version?

Yes


Do you have the connector logs, ideally from start till finish?

No


How to reproduce the issue using our tutorial deployment?

  1. Configure the connector with commit.log.error.reprocessing.enabled=true and a custom CommitLogTransfer implementation that moves files between error/ and CDC dir
  2. Reproduce all steps from Event loss when Kafka producer fails (e.g. delivery.timeout.ms exceeded) #1610
  3. Restart the connector (this should move the files back from error/ to the CDC dir)
  4. Error files are reprocessed, but the changes never reach Kafka

Fix Proposal

Proposed fix:

  • Connector context — add a reprocessingCommitLogs: Set<String>
  • CommitLogTransfer — change getErrorCommitLogFiles() to return a Collection<String> of moved filenames
  • CommitLogProcessor + CommitLogIdxProcessor — populate the set from the getErrorCommitLogFiles() return value
  • Cassandra3/4/5CommitLogReadHandlerImpl — bypass the isOffsetProcessed check when the file is in the set
  • QueueProcessor — remove the file from the set on successful archive

Basically, isOffsetProcessed would check if the file is present in the reprocessingCommitLogs set and, if it is, allow mutations to flow through.
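The lifecycle of the set could look roughly like the following. This is a hypothetical sketch under the assumptions of the proposal above: the set name reprocessingCommitLogs comes from the proposal, while the method names and wiring are illustrative only:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ReprocessingSketch {

    // Populated from the filenames returned by getErrorCommitLogFiles()
    // when error files are moved back into the CDC directory.
    private final Set<String> reprocessingCommitLogs = ConcurrentHashMap.newKeySet();

    // Called by the processors when an error file re-enters the CDC dir.
    void markForReprocessing(String commitLogFileName) {
        reprocessingCommitLogs.add(commitLogFileName);
    }

    // Called by QueueProcessor once the file is successfully archived.
    void markArchived(String commitLogFileName) {
        reprocessingCommitLogs.remove(commitLogFileName);
    }

    // The read handler's check: files under reprocessing bypass the
    // per-table offset comparison so their mutations flow through.
    boolean isOffsetProcessed(String commitLogFileName, boolean offsetSaysProcessed) {
        if (reprocessingCommitLogs.contains(commitLogFileName)) {
            return false;
        }
        return offsetSaysProcessed;
    }
}
```

A concurrent set is used here because the processors and QueueProcessor may touch it from different threads; a plain HashSet would suffice if all access is single-threaded.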

I will gladly add those changes if you agree with them.
