FLINK-22328

Failed to send data to Kafka: Producer attempted an operation with an old epoch




      The Flink job fails occasionally. Here is the stack trace:

      org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:640)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:157)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:81)
          at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
          at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
      Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

      The job uses FlinkKafkaProducer with EXACTLY_ONCE semantics and is deployed on YARN.

      In the debug output, I found that the transactionalId is "Source: Custom Source -> (Process -> Sink: errorMessageToKafka, Sink: etlMultiTopicSink) -03f86923ea4164263684d81917202071-0".

      The Kafka server.log contains this exception:

      ERROR [ReplicaManager broker=1004] Error processing append on partition ods_source-2 (kafka.server.ReplicaManager)
      org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 158 (request epoch), 159 (server epoch)
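
      The epoch comparison reported in the broker log above can be illustrated with a small simulation. This is only a sketch of the general fencing idea, not Kafka's actual implementation; the class `EpochFence` and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of epoch-based producer fencing.
// Not Kafka code: it only mirrors the request/server epoch comparison.
final class EpochFence {
    // Latest epoch the "server" knows for each transactionalId.
    private final Map<String, Integer> serverEpochs = new HashMap<>();

    // Models a producer (re)initializing with a transactionalId:
    // the first call yields epoch 0, every later call bumps it by one.
    int initProducer(String transactionalId) {
        return serverEpochs.merge(transactionalId, 0, (old, unused) -> old + 1);
    }

    // A request is fenced when it carries an epoch older than the server's.
    boolean isFenced(String transactionalId, int requestEpoch) {
        Integer serverEpoch = serverEpochs.get(transactionalId);
        return serverEpoch != null && requestEpoch < serverEpoch;
    }
}
```

      In this model, a request with epoch 158 against a server epoch of 159 is fenced, which matches the "158 (request epoch), 159 (server epoch)" pair in the log. The sketch does not explain what bumped the epoch to 159.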

      Here are the log entries in which Kafka increases the epoch for this transactionalId "Source: Custom Source -> (Process -> Sink: errorMessageToKafka, Sink: etlMultiTopicSink) -03f86923ea4164263684d81917202071-0":

      INFO [TransactionCoordinator id=1003] Initialized transactionalId Source: Custom Source -> (Process -> Sink: errorMessageToKafka, Sink: etlMultiTopicSink) -03f86923ea4164263684d81917202071-0 with producerId 21036 and producer epoch 158 on partition _transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)
      INFO [TransactionCoordinator id=1003] Initialized transactionalId Source: Custom Source -> (Process -> Sink: errorMessageToKafka, Sink: etlMultiTopicSink) -03f86923ea4164263684d81917202071-0 with producerId 21036 and producer epoch 160 on partition _transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)

      No log entry records Kafka setting the producer epoch to 159.
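
      A check like the one described here (looking for epochs that never appear in the coordinator log) could be automated roughly as follows. This is a minimal sketch that assumes the log lines contain the phrases "Initialized transactionalId" and "producer epoch N" as in the excerpts above; `EpochGapFinder` is a hypothetical helper, not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds producer epochs that are skipped in coordinator log lines.
final class EpochGapFinder {
    private static final Pattern EPOCH = Pattern.compile("producer epoch (\\d+)");

    // Extracts the epoch from every "Initialized transactionalId" line, in log order.
    static List<Integer> loggedEpochs(List<String> logLines) {
        List<Integer> epochs = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = EPOCH.matcher(line);
            if (line.contains("Initialized transactionalId") && m.find()) {
                epochs.add(Integer.parseInt(m.group(1)));
            }
        }
        return epochs;
    }

    // Returns every epoch strictly between two consecutive logged epochs.
    static List<Integer> missingEpochs(List<Integer> epochs) {
        List<Integer> missing = new ArrayList<>();
        for (int i = 1; i < epochs.size(); i++) {
            for (int e = epochs.get(i - 1) + 1; e < epochs.get(i); e++) {
                missing.add(e);
            }
        }
        return missing;
    }
}
```

      Fed the two TransactionCoordinator lines quoted above, `loggedEpochs` yields 158 and 160, and `missingEpochs` reports 159 as the unlogged epoch.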





            Unassigned
            Feng Sun 孙峰



