Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33437

Flink 1.17 sink commited legacy Committable state, but it was not removed from state backend

    XMLWordPrintableJSON

Details

    • Patch

    Description

      My Flink job graph: kafka source -> process -> kafka sink.

      I used savepoint to upgrade Flink 1.14.5 to 1.17.1, and the program ran normally.A month later, I restarted the Flink job using a savepoint, and the job was started normally.Unfortunately, the Flink job failed every time when it did a checkpoint.For example the following scenario:
       

      1. The program uses Kafka sink
      2. Suspend flink job with savepoint A, and Flink Version is 1.14.x
      3. Recover the job with savepoint A, and Flink Version is 1.17.1
      4. Wait for time longer than transactional.id.expiration.ms + transaction.remove.expired.transaction.cleanup.interval.ms
      5. Suspend flink job with savepoint B, and Flink Version is 1.17.1
      6. Recover the job with savepoint B, and Flink Version is 1.17.1
      7. Trigger checkpoint ,the Flink job will fail with the following error:
        java.io.IOException: Could not perform checkpoint 1009710 for operator kafka-sink: Committer (2/2)#2.
            at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
            at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
            at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
            at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
            at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
            at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
            at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
            at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
            at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
            at java.lang.Thread.run(Thread.java:750)
        Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
            at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
            at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
            at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
            at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:245)
            at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:215)
            at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:177)
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
            at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
            ... 22 more
        Caused by: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=47326303, epoch=0, transactionalId=kafka-sink-xxx-1-882965}
            at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
            at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
            at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
            at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:177)
            at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:161)
            at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:200)
            at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
            ... 33 more
        Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
         

      Attachments

        Issue Links

          Activity

            People

              dyccode Yuchi Duan
              dyccode Yuchi Duan
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: