Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
None
Description
- Whenever a new transactional producer is started, "KafkaProducer#initTransactions()" must be called to obtain a new ProducerId from the TransactionCoordinator (a Kafka broker component).
- The ProducerId is increased every time a new producer with the same TransactionalId is registered.
- Publishing a new ProducerId FENCES all prior ProducerIds and ABORTS all uncommitted transactions associated with them.
- KafkaCommitter uses a separate producer that hacks into Kafka internals and resumes the transaction using the epoch and ProducerId, without actually obtaining a new ProducerId for itself. This committer uses the ProducerId stored in the KafkaCommittable state to commit the transaction.
- If a new producer is started before the transaction is committed (this can happen in some failover scenarios), the ProducerId stored in the state is already FENCED and the commit fails with ProducerFencedException. Because we currently ignore this exception (we only log a warning), we effectively DROP all uncommitted data from the previous checkpoint.
Basically, any job failure that happens between successfully taking a checkpoint and committing its transactions will trigger this behavior.
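The failure sequence described above can be replayed with a small toy model. This is only a sketch under stated assumptions: ToyCoordinator, FencedException, FencingSketch and the TransactionalId "sink-0" are hypothetical names invented for illustration. None of them are Kafka or Flink classes, and the model reduces fencing to a single counter per TransactionalId, following the description in this ticket rather than the broker's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the broker-side TransactionCoordinator. NOT Kafka's real API. */
class ToyCoordinator {
    /** Hypothetical stand-in for ProducerFencedException. */
    static class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    // Latest id handed out per TransactionalId; any older id counts as fenced.
    private final Map<String, Long> latest = new HashMap<>();
    // Records buffered in the open (uncommitted) transaction per TransactionalId.
    private final Map<String, Integer> pendingRecords = new HashMap<>();
    private int committedRecords = 0;

    /** Models initTransactions(): aborts the open transaction and issues a newer id. */
    long initTransactions(String transactionalId) {
        pendingRecords.remove(transactionalId);               // abort uncommitted data
        return latest.merge(transactionalId, 1L, Long::sum);  // 1 on first call, then +1
    }

    /** Adds records to the open transaction of a producer that is not fenced. */
    void send(String transactionalId, long id, int records) {
        checkNotFenced(transactionalId, id);
        pendingRecords.merge(transactionalId, records, Integer::sum);
    }

    /** Commits the open transaction, or throws if the given id has been fenced. */
    void commit(String transactionalId, long id) {
        checkNotFenced(transactionalId, id);
        Integer records = pendingRecords.remove(transactionalId);
        committedRecords += (records == null ? 0 : records);
    }

    int committedRecords() { return committedRecords; }

    private void checkNotFenced(String transactionalId, long id) {
        Long current = latest.get(transactionalId);
        if (current == null || current != id) {
            throw new FencedException("id " + id + " is fenced for " + transactionalId);
        }
    }
}

class FencingSketch {
    /** Replays the scenario and returns how many records ended up committed. */
    static int replayFailover(boolean restartBeforeCommit) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String txnId = "sink-0"; // hypothetical TransactionalId

        long checkpointedId = coordinator.initTransactions(txnId);
        coordinator.send(txnId, checkpointedId, 3); // data written before the checkpoint
        // The checkpoint succeeds here and checkpointedId is stored in state.

        if (restartBeforeCommit) {
            coordinator.initTransactions(txnId); // new producer started after a failover
        }
        try {
            coordinator.commit(txnId, checkpointedId); // committer reuses the stored id
        } catch (ToyCoordinator.FencedException e) {
            // Mirrors the behavior reported here: only a warning, the data is lost.
            System.out.println("WARN: " + e.getMessage());
        }
        return coordinator.committedRecords();
    }

    public static void main(String[] args) {
        System.out.println(replayFailover(false));
        System.out.println(replayFailover(true));
    }
}
```

In this model, replayFailover(false) commits all 3 records, while replayFailover(true) logs the warning and commits 0, which corresponds to the silent data loss described in this ticket.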
Issue Links
- is related to
  - FLINK-23814 Test FLIP-143 KafkaSink (Closed)
  - FLINK-23839 Unclear severity of Kafka transaction recommit warning in logs (Closed)
- relates to
  - FLINK-23854 KafkaSink error when restart from the checkpoint with a lower parallelism by exactly-once guarantee (Resolved)
- links to