Description
Bug report
This is a follow-up issue to #1610. It turned out that the fix for #1610 solved only half of the problem.
What Debezium connector do you use and what version?
Cassandra Connector 3.4.1.Final and latest origin/main
What is the connector configuration?
connector.name=dbz_test_connector
commit.log.relocation.dir=/opt/debezium/relocation
commit.log.real.time.processing.enabled=true
commit.log.error.reprocessing.enabled=true
http.port=8000
cassandra.config=/etc/cassandra/cassandra.yaml
cassandra.hosts=host.docker.internal
cassandra.port=9042
cassandra.driver.config.file=/opt/debezium/conf/cassandra-driver.conf
kafka.producer.bootstrap.servers=kafka:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
kafka.producer.delivery.timeout.ms=120000
kafka.producer.enable.idempotence=false
topic.prefix=dbz_test
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.backing.store.dir=/opt/debezium/offsets
snapshot.consistency=ONE
snapshot.mode=NEVER
What is the captured database version and mode of deployment?
Cassandra 4.1.9. Local deployment using the docker image cassandra:4.1.9
What behavior do you expect?
When commit.log.error.reprocessing.enabled=true and error commit log files are moved back to the CDC directory (via CommitLogTransfer.getErrorCommitLogFiles()), the connector should reprocess all mutations from those files that were never successfully emitted to Kafka.
What behavior do you see?
All mutations in the reprocessed error file are silently skipped with the log message:
Mutation at CommitLog-7-<segment>.log:<position> for table <keyspace>.<table> already processed, skipping...
Root cause: FileOffsetWriter tracks a single per-table offset representing the globally latest processed position. CassandraXCommitLogReadHandlerImpl.handleMutation() compares each mutation's position against this recorded offset using OffsetPosition.compareTo(), which sorts by commit log segment ID extracted from the filename first, then by byte position. Once any newer segment advances the per-table offset past the error file's segment ID, every mutation in that error file satisfies currentOffset <= recordedOffset and is considered already processed — even mutations that were never emitted to Kafka.
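The skip can be illustrated with a simplified, hypothetical model of the offset comparison (the record, parsing, and names below are illustrative only, not the actual Debezium OffsetPosition/FileOffsetWriter classes):

```java
// Simplified, hypothetical model of the per-table offset check.
// Illustrates why segment-id-first ordering marks every mutation in a
// reprocessed error file as "already processed".
public class OffsetSkipDemo {
    record OffsetPosition(long segmentId, int position) implements Comparable<OffsetPosition> {
        // CommitLog-7-<segmentId>.log -> segmentId (illustrative parsing)
        static OffsetPosition parse(String fileName, int position) {
            String id = fileName.substring(fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.'));
            return new OffsetPosition(Long.parseLong(id), position);
        }

        @Override
        public int compareTo(OffsetPosition o) {
            int c = Long.compare(segmentId, o.segmentId);              // segment id first
            return c != 0 ? c : Integer.compare(position, o.position); // then byte position
        }
    }

    public static void main(String[] args) {
        // Per-table offset already advanced by a newer segment:
        OffsetPosition recorded = OffsetPosition.parse("CommitLog-7-200.log", 500);
        // Mutation from the reprocessed error file, never emitted to Kafka:
        OffsetPosition fromErrorFile = OffsetPosition.parse("CommitLog-7-100.log", 42);
        // currentOffset <= recordedOffset -> treated as already processed
        boolean skipped = fromErrorFile.compareTo(recorded) <= 0;
        System.out.println(skipped); // prints "true"
    }
}
```

Because the comparison never inspects whether the mutation actually reached Kafka, any error file whose segment id is older than the recorded offset is skipped wholesale.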
Scenario that triggers it:
- CommitLog-7-100.log fails mid-processing → moved to error/
- Processing continues normally; CommitLog-7-200.log finishes → per-table offset is now CommitLog-7-200.log:<position>
- CommitLog-7-100.log is moved back to the CDC dir for reprocessing
- Every mutation in CommitLog-7-100.log is skipped — data is permanently lost with no warning
In practice this means errorCommitLogReprocessEnabled is only effective in a very narrow window where no newer commit logs have been processed for the affected table since the failure. In any realistic production environment the feature does not work.
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
No
How to reproduce the issue using our tutorial deployment?
- Configure the connector with commit.log.error.reprocessing.enabled=true and a custom CommitLogTransfer implementation that moves files between the error/ and CDC dirs
- Reproduce all steps from "Event loss when Kafka producer fails (e.g. delivery.timeout.ms exceeded)" #1610
- Restart the connector (this should move the files back from error/ to the CDC dir)
- Error files are processed, but the changes never reach Kafka
Fix Proposal
Proposed fix:
- Add a reprocessingCommitLogs: Set<String> to the connector context
- CommitLogTransfer — change getErrorCommitLogFiles() to return a Collection<String> of moved filenames
- CommitLogProcessor + CommitLogIdxProcessor — populate the set from the getErrorCommitLogFiles() return value
- Cassandra3/4/5CommitLogReadHandlerImpl — bypass the isOffsetProcessed check when the file is in the set
- QueueProcessor — remove the file from the set on successful archive
Basically, isOffsetProcessed would check if the file is present in the reprocessingCommitLogs set and, if it is, allow mutations to flow through.
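A minimal sketch of that bypass, assuming a shared set on the connector context (class and method names here are hypothetical, not the actual connector API):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed fix: mutations from commit log
// files currently under error reprocessing skip the per-table offset check.
public class ReprocessingGuard {
    // Populated from getErrorCommitLogFiles() when files move back to the CDC dir.
    private final Set<String> reprocessingCommitLogs = ConcurrentHashMap.newKeySet();

    public void markForReprocessing(String commitLogFileName) {
        reprocessingCommitLogs.add(commitLogFileName);
    }

    // Called by the queue processor once the file is successfully archived.
    public void markDone(String commitLogFileName) {
        reprocessingCommitLogs.remove(commitLogFileName);
    }

    // Wraps the existing offset comparison: files being reprocessed are
    // never treated as already processed, so their mutations flow through.
    public boolean isOffsetProcessed(String commitLogFileName, boolean offsetAlreadyProcessed) {
        if (reprocessingCommitLogs.contains(commitLogFileName)) {
            return false;
        }
        return offsetAlreadyProcessed;
    }
}
```

Using a concurrent set keeps the check cheap on the hot mutation path while allowing the transfer and queue-processor threads to add and remove entries safely.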
I will gladly add those changes if you agree with them.