Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
1.17.1
-
K8s, Flink 1.17.1
-
Patch
Description
My Flink job graph: kafka source -> process -> kafka sink.
I used savepoint to upgrade Flink 1.14.5 to 1.17.1, and the program ran normally.A month later, I restarted the Flink job using a savepoint, and the job was started normally.Unfortunately, the Flink job failed every time when it did a checkpoint.For example the following scenario:
- The program uses Kafka sink
- Suspend flink job with savepoint A, and Flink Version is 1.14.x
- Recover the job with savepoint A, and Flink Version is 1.17.1
- Wait for time longer than transactional.id.expiration.ms + transaction.remove.expired.transaction.cleanup.interval.ms
- Suspend flink job with savepoint B, and Flink Version is 1.17.1
- Recover the job with savepoint B, and Flink Version is 1.17.1
- Trigger checkpoint ,the Flink job will fail with the following error:
java.io.IOException: Could not perform checkpoint 1009710 for operator kafka-sink: Committer (2/2)#2. at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488) at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74) at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:245) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:215) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:177) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244) ... 22 more Caused by: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=47326303, epoch=0, transactionalId=kafka-sink-xxx-1-882965} at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77) at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126) at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:177) at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:161) at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:200) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94) ... 33 more Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Attachments
Issue Links
- links to