Flink / FLINK-25919 Sink V2 improvements and followups / FLINK-30068

Allow users to configure what to do with errors while committing transactions during recovery in KafkaSink


Details

    • Type: Sub-task
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.16.0, 1.17.0, 1.15.2
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

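      Currently, KafkaSink appears to fail the job on any error while committing transactions. As reported by a user, this makes it impossible for jobs to recover from older savepoints: on restore, committing a transaction recorded in the savepoint fails with InvalidPidMappingException (see the log below), and the job fails instead of continuing.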

      2022-11-16 10:01:07.168 [flink-akka.actor.default-dispatcher-13] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer -> Save to Kafka daily ETH: Committer) (4/5) (6d4d91ab8657bba830695b9a011f7db6) switched from INITIALIZING to RUNNING.
      2022-11-16 10:01:37.222 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 65436 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1668592897201 for job 00000000000000000000000000000000.
      2022-11-16 10:01:39.082 [flink-akka.actor.default-dispatcher-13] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Balances aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5) (cfaca46e7f4dc89629cdcaed5b48c059) switched from RUNNING to FAILED on 10.42.145.181:33297-efc328 @ eth-top-holders-v2-flink-taskmanager-0.eth-top-holders-v2-flink-taskmanager.flink.svc.cluster.local (dataPort=43125).
      java.io.IOException: Could not perform checkpoint 65436 for operator Balances aggreagation ETH -> (Filter -> Map -> Save to Kafka realtime ETH: Writer -> Save to Kafka realtime ETH: Committer, Filter -> Map -> Save to Kafka daily ETH: Writer -> Save to Kafka daily ETH: Committer) (1/5)#0.
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
      	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
      	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
      	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
      	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
      	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
      	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
      	at java.base/java.lang.Thread.run(Unknown Source)
      Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
      	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:234)
      	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:204)
      	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
      	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
      	... 22 common frames omitted
      Caused by: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=6640191, epoch=0, transactionalId=eth_top_holders_daily_v11-0-65435}
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
      	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
      	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
      	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
      	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:199)
      	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
      	... 35 common frames omitted
      Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
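      One possible shape for the requested option, sketched in Java under stated assumptions: a per-sink policy that decides, for each transaction restored from a checkpoint or savepoint, whether a commit error fails the job (the current behaviour) or is recorded and skipped. All names here (RecoveryCommitPolicy, OnError, commitRecovered, skippedTransactions) are hypothetical and are not part of the Flink Kafka connector. The nested InvalidPidMappingException is a local stand-in for org.apache.kafka.common.errors.InvalidPidMappingException, matched by simple class name so the sketch compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of a user-configurable policy for errors raised while
 * committing transactions restored during recovery. Not a Flink API.
 */
public class RecoveryCommitPolicy {

    /** What to do when committing a recovered transaction fails. */
    public enum OnError { FAIL, SKIP }

    /** Local stand-in for Kafka's InvalidPidMappingException (also a RuntimeException). */
    public static class InvalidPidMappingException extends RuntimeException {
        public InvalidPidMappingException(String message) {
            super(message);
        }
    }

    private final OnError onError;
    private final Predicate<Throwable> skippable;
    private final List<String> skipped = new ArrayList<>();

    public RecoveryCommitPolicy(OnError onError, Predicate<Throwable> skippable) {
        this.onError = onError;
        this.skippable = skippable;
    }

    /** True if any exception in the cause chain matches the user's predicate. */
    private boolean matches(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (skippable.test(t)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the commit. With FAIL every error is rethrown; with SKIP, errors whose
     * cause chain matches the predicate are recorded and swallowed, others rethrown.
     */
    public void commitRecovered(String transactionalId, Runnable commit) {
        try {
            commit.run();
        } catch (RuntimeException e) {
            if (onError == OnError.SKIP && matches(e)) {
                skipped.add(transactionalId);
                return;
            }
            throw e;
        }
    }

    public List<String> skippedTransactions() {
        return skipped;
    }

    public static void main(String[] args) {
        // Match by simple class name so a real Kafka exception would match too.
        Predicate<Throwable> pidMappingLost =
                t -> t.getClass().getSimpleName().equals("InvalidPidMappingException");

        // Mimics the trace above: the Kafka error arrives wrapped in an IllegalStateException.
        Runnable failingCommit = () -> {
            throw new IllegalStateException("Failed to commit",
                    new InvalidPidMappingException("producer id not assigned"));
        };

        RecoveryCommitPolicy lenient = new RecoveryCommitPolicy(OnError.SKIP, pidMappingLost);
        lenient.commitRecovered("eth_top_holders_daily_v11-0-65435", failingCommit);
        System.out.println("skipped: " + lenient.skippedTransactions());

        RecoveryCommitPolicy strict = new RecoveryCommitPolicy(OnError.FAIL, pidMappingLost);
        try {
            strict.commitRecovered("eth_top_holders_daily_v11-0-65435", failingCommit);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

      Walking the whole cause chain matters because, as in the trace above, the Kafka error surfaces wrapped in an IllegalStateException thrown from KafkaCommitter. Choosing SKIP presumably means accepting that records written in the skipped transaction may never become visible to read_committed consumers, which is why the sketch keeps FAIL as an explicit choice rather than silently changing the default.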
      


            People

              Assignee: Unassigned
              Reporter: Piotr Nowojski (pnowojski)
              Votes: 2
              Watchers: 6
