FLINK-11654

Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs


Details

    Description

      We run multiple jobs on one cluster that write heavily to the same Kafka topic from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we run into many ProducerFencedExceptions and all jobs go into a restart cycle.

      Example exception from the Kafka log:

       

      [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
      org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
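
      The epoch mismatch in this log (483 requested, 484 on the server) can be illustrated with a toy model. The sketch below is not Kafka code: ToyCoordinator and its methods are hypothetical names, and the epoch numbering is simplified. It only mirrors the documented behaviour that registering a producer under an existing transactional.id bumps the epoch and fences any producer still using the older one.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochFencingSketch {

    // Toy stand-in for the broker-side bookkeeping; hypothetical, not a Kafka API.
    static class ToyCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        // Models producer initialization: every registration under the same
        // transactional.id bumps the epoch and returns the new value.
        int register(String transactionalId) {
            return epochs.merge(transactionalId, 1, Integer::sum);
        }

        // Models an append request: accepted only if the request carries
        // the current epoch for that transactional.id.
        boolean accepts(String transactionalId, int requestEpoch) {
            return epochs.getOrDefault(transactionalId, 0) == requestEpoch;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String sharedId = "Sink: kafka-op-0"; // made-up ID used by both jobs

        int epochJobA = coordinator.register(sharedId);
        int epochJobB = coordinator.register(sharedId); // second job reuses the ID

        System.out.println("job A accepted: " + coordinator.accepts(sharedId, epochJobA));
        System.out.println("job B accepted: " + coordinator.accepts(sharedId, epochJobB));
    }
}
```

      In this model the first job is rejected as soon as the second one registers. If the rejected job then restarts and registers again, it would fence the second job in turn, which is one plausible reading of the restart cycle described above.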
      

      The reason is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator: the prefix is built from the task name and the operator's unique ID only, so the generated IDs are guaranteed to be unique only within a single job. They can clash between different jobs (and clusters).

       

       

      --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
      +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
      @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
                      nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
                              NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
                      transactionalIdsGenerator = new TransactionalIdsGenerator(
      +                       // the prefix probably should include job id and maybe cluster id
                              getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
                              getRuntimeContext().getIndexOfThisSubtask(),
                              getRuntimeContext().getNumberOfParallelSubtasks(),
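
      The clash and the suggested direction can be sketched in isolation. The code below is a simplified model, not Flink's TransactionalIdsGenerator: idsFor, currentPrefix and jobScopedPrefix are hypothetical helpers, the ID layout (prefix, dash, counter) is an assumption, and the task name, operator ID and job IDs are made up. It also assumes that both jobs end up with the same task name and operator ID, as the report implies.

```java
import java.util.HashSet;
import java.util.Set;

public class TransactionalIdClashSketch {

    // Simplified stand-in for the pool of IDs one subtask would own.
    static Set<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new HashSet<>();
        long first = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (first + i));
        }
        return ids;
    }

    // Prefix as built in the diff above: task name plus operator unique ID.
    static String currentPrefix(String taskName, String operatorUniqueId) {
        return taskName + "-" + operatorUniqueId;
    }

    // Hypothetical variant that also includes a job ID, as the diff comment suggests.
    static String jobScopedPrefix(String jobId, String taskName, String operatorUniqueId) {
        return jobId + "-" + taskName + "-" + operatorUniqueId;
    }

    // True if the two ID sets share at least one element.
    static boolean overlaps(Set<String> a, Set<String> b) {
        Set<String> common = new HashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        String taskName = "Sink: kafka"; // identically named sink in both jobs
        String operatorId = "op-uid";    // assumed identical across both jobs

        Set<String> jobA = idsFor(currentPrefix(taskName, operatorId), 0, 5);
        Set<String> jobB = idsFor(currentPrefix(taskName, operatorId), 0, 5);
        System.out.println("current prefix clashes: " + overlaps(jobA, jobB));

        Set<String> scopedA = idsFor(jobScopedPrefix("job-a", taskName, operatorId), 0, 5);
        Set<String> scopedB = idsFor(jobScopedPrefix("job-b", taskName, operatorId), 0, 5);
        System.out.println("job-scoped prefix clashes: " + overlaps(scopedA, scopedB));
    }
}
```

      Under these assumptions the two jobs produce identical ID sets with the current prefix and disjoint sets once a job ID is part of it. Whether a job ID alone is sufficient, or a cluster ID is needed as well, is left open in the report.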

       

       

            People

              Assignee: Unassigned
              Reporter: Jürgen Kreileder (jkreileder)