FLINK-23509

FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE during transaction recovery (fails)


Details

    Description

When recovering Kafka transactions from a snapshot, FlinkKafkaInternalProducer overrides the static final ProducerIdAndEpoch#NONE in

FlinkKafkaInternalProducer#resumeTransaction

and consequently TransactionManager initializes transactions as new transactions instead of recovered ones, in

TransactionManager#initializeTransactions
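
To illustrate the mechanism: the snippet below is a rough Scala paraphrase of what the reflective code in FlinkKafkaInternalProducer#resumeTransaction effectively does (the actual Flink code is Java; getField/setField are illustrative helpers standing in for the producer's own private reflection utilities, and the field names follow the kafka-clients sources):

import java.lang.reflect.Field

object ResumeTransactionSketch {
  // Illustrative reflection helpers (the Flink class has its own private equivalents).
  def getField(obj: AnyRef, name: String): AnyRef = {
    val f: Field = obj.getClass.getDeclaredField(name)
    f.setAccessible(true)
    f.get(obj)
  }

  def setField(obj: AnyRef, name: String, value: Any): Unit = {
    val f: Field = obj.getClass.getDeclaredField(name)
    f.setAccessible(true)
    f.set(obj, value.asInstanceOf[AnyRef])
  }

  // What resumeTransaction boils down to for the producer id / epoch:
  def resumeTransactionSketch(kafkaProducer: AnyRef, producerId: Long, epoch: Short): Unit = {
    val transactionManager = getField(kafkaProducer, "transactionManager")
    transactionManager.synchronized {
      // Right after the producer is constructed, the TransactionManager still holds the shared
      // static final ProducerIdAndEpoch.NONE, so these writes change the sentinel itself.
      val producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch")
      setField(producerIdAndEpoch, "producerId", producerId) // NONE.producerId becomes e.g. 1545118
      setField(producerIdAndEpoch, "epoch", epoch)           // NONE.epoch becomes e.g. 17
    }
  }
}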

      TransactionManager log (edited for readability):

Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to true since transactional.id is specified.
      Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
      Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
      Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since idempotence is enabled.
      Sink: trxRollupKafkaSink (1/1)#3 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to INITIALIZING
      Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in order to acquire a producer ID
      Sink: trxRollupKafkaSink (1/1)#3 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for sending
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1) dequeued for sending
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Discovered transaction coordinator ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for sending
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47, producerId=1, producerEpoch=-1) for request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
      [kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state INITIALIZING to error state FATAL_ERROR
      Sink: trxRollupKafkaSink (1/1)#3 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis = 0 ms.
      org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
      at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
      at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
      at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
      at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
      at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
      at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
      at java.lang.Thread.run(Thread.java:748)

Notice that "Invoking InitProducerId for the first time in order to acquire a producer ID" indicates a request for a brand-new transaction, i.e. producerId/epoch of (-1, -1), yet the request enqueued right below it reads "Enqueuing transactional request InitProducerIdRequestData(transactionalId='Sink: ...', ..., producerId=1545118, producerEpoch=17)" because ProducerIdAndEpoch#NONE has been changed; the broker then rejects that request with errorCode=47 (INVALID_PRODUCER_EPOCH), which is the KafkaException shown above.

       

TransactionManager#initializeTransactions variables (see the attached screenshot 2021-07-26_16-47-48.png):

Notice that producerId and epoch, which should both still be -1 at this point, already carry the overridden values.
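
Why does the TransactionManager then log "Invoking InitProducerId for the first time ..." and still send the stale values? Because initializeTransactions() hands the ProducerIdAndEpoch.NONE sentinel to the internal overload, which (in the KIP-360 code path) distinguishes a fresh initialization from an epoch bump by object identity, while the request itself is built from the sentinel's field values. A minimal, self-contained Scala demonstration with a stand-in class (not the actual Kafka code; the real fields are written via reflection, plain vars keep the demo short):

object NoneSentinelDemo extends App {
  // Stand-in for org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch
  final class ProducerIdAndEpoch(var producerId: Long, var epoch: Short)
  val NONE = new ProducerIdAndEpoch(-1L, (-1).toShort)

  // The TransactionManager holds NONE right after construction; the resume path writes the
  // recovered values into that very instance.
  val held: ProducerIdAndEpoch = NONE
  held.producerId = 1545118L
  held.epoch = 17.toShort

  // initializeTransactions() passes NONE to the internal overload, which checks the reference ...
  println(s"treated as first initialization: ${held eq NONE}")                           // true
  // ... but builds the InitProducerIdRequest from the sentinel's (now overridden) values.
  println(s"request carries producerId=${NONE.producerId}, producerEpoch=${NONE.epoch}") // 1545118, 17
}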

      Stack trace of TransactionManager#initializeTransactions:

      initializeTransactions:314, TransactionManager (org.apache.kafka.clients.producer.internals)
      initializeTransactions:310, TransactionManager (org.apache.kafka.clients.producer.internals)
      initTransactions:591, KafkaProducer (org.apache.kafka.clients.producer)
      initTransactions:88, FlinkKafkaInternalProducer (org.apache.flink.streaming.connectors.kafka.internals)
      recoverAndAbort:1060, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      recoverAndAbort:99, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      initializeState:371, TwoPhaseCommitSinkFunction (org.apache.flink.streaming.api.functions.sink)
      initializeState:1195, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      tryRestoreFunction:189, StreamingFunctionUtils (org.apache.flink.streaming.util.functions)
      restoreFunctionState:171, StreamingFunctionUtils (org.apache.flink.streaming.util.functions)
      initializeState:96, AbstractUdfStreamOperator (org.apache.flink.streaming.api.operators)
      initializeOperatorState:118, StreamOperatorStateHandler (org.apache.flink.streaming.api.operators)
      initializeState:290, AbstractStreamOperator (org.apache.flink.streaming.api.operators)
      initializeStateAndOpenOperators:436, OperatorChain (org.apache.flink.streaming.runtime.tasks)
      restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
      call:-1, 412600778 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
      call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
      restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
      doRun:756, Task (org.apache.flink.runtime.taskmanager)
      run:563, Task (org.apache.flink.runtime.taskmanager)
      run:748, Thread (java.lang)

       

      Stack trace of FlinkKafkaInternalProducer#resumeTransaction when the values are overridden:

      resumeTransaction:204, FlinkKafkaInternalProducer (org.apache.flink.streaming.connectors.kafka.internals)
      resumeTransaction:196, KafkaProducerJobSink$$anon$1$$anon$2 (ch.viseca.flink.connectors.kafka.sinks)
      recoverAndCommit:1029, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      recoverAndCommit:99, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      recoverAndCommitInternal:414, TwoPhaseCommitSinkFunction (org.apache.flink.streaming.api.functions.sink)
      initializeState:364, TwoPhaseCommitSinkFunction (org.apache.flink.streaming.api.functions.sink)
      initializeState:1195, FlinkKafkaProducer (org.apache.flink.streaming.connectors.kafka)
      tryRestoreFunction:189, StreamingFunctionUtils (org.apache.flink.streaming.util.functions)
      restoreFunctionState:171, StreamingFunctionUtils (org.apache.flink.streaming.util.functions)
      initializeState:96, AbstractUdfStreamOperator (org.apache.flink.streaming.api.operators)
      initializeOperatorState:118, StreamOperatorStateHandler (org.apache.flink.streaming.api.operators)
      initializeState:290, AbstractStreamOperator (org.apache.flink.streaming.api.operators)
      initializeStateAndOpenOperators:436, OperatorChain (org.apache.flink.streaming.runtime.tasks)
      restoreGates:574, StreamTask (org.apache.flink.streaming.runtime.tasks)
      call:-1, 412600778 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$745)
      call:55, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
      restore:554, StreamTask (org.apache.flink.streaming.runtime.tasks)
      doRun:756, Task (org.apache.flink.runtime.taskmanager)
      run:563, Task (org.apache.flink.runtime.taskmanager)
      run:748, Thread (java.lang)

       

      Background:

      • we recently upgraded from Flink 1.8.0 to 1.13.0
      • FlinkKafkaInternalProducer#resumeTransaction has not changed between these versions, however
      • in Flink 1.8.0 we never observed any resumable transaction as part of a checkpoint
• we could not determine which change actually triggers the failure now, however:
• it would probably be much safer to assign a fresh ProducerIdAndEpoch to the TransactionManager directly, instead of writing new values into whatever ProducerIdAndEpoch instance it currently holds, and thus avoid overriding ProducerIdAndEpoch#NONE (see the sketch right after this list)
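
A minimal sketch of that safer variant (an assumption about a possible fix, not the actual Flink patch): construct a fresh ProducerIdAndEpoch reflectively, assuming the public (long, short) constructor and the class/field names of the kafka-clients version seen in the stack traces above, and install it on the TransactionManager so that ProducerIdAndEpoch#NONE is never written to:

def installFreshProducerIdAndEpoch(transactionManager: AnyRef, producerId: Long, epoch: Short): Unit = {
  // Build a fresh instance instead of mutating whichever instance the TransactionManager holds.
  val clazz = Class.forName("org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch")
  val ctor  = clazz.getDeclaredConstructor(classOf[Long], classOf[Short]) // (long, short)
  ctor.setAccessible(true)
  val fresh = ctor.newInstance(java.lang.Long.valueOf(producerId), java.lang.Short.valueOf(epoch)).asInstanceOf[AnyRef]
  // Swap the (non-final) producerIdAndEpoch reference held by the TransactionManager.
  val field = transactionManager.getClass.getDeclaredField("producerIdAndEpoch")
  field.setAccessible(true)
  field.set(transactionManager, fresh)
}

Such a helper could be called in place of the per-field writes in resumeTransaction, or from the synchronized block of the workaround below.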

Sample workaround (Scala):

val sink = new FlinkKafkaProducer[T](defaultTopic, schema, getProperties, getSemantic, getProducerPoolSize) {
  override protected def createProducer(): FlinkKafkaInternalProducer[Array[Byte], Array[Byte]] = {
    // construct the internal producer the same way the base createProducer() does
    // (constructor arguments may differ per Flink version)
    new FlinkKafkaInternalProducer[Array[Byte], Array[Byte]](producerConfig) {
      override def resumeTransaction(producerId: Long, epoch: Short): Unit = {
        val transactionManager = FlinkKafkaInternalProducer.getField(kafkaProducer, "transactionManager")
        transactionManager.synchronized {
          // install a fresh ProducerIdAndEpoch here (e.g. as in the sketch above)
          // so that the shared ProducerIdAndEpoch.NONE is never mutated
        }
        super.resumeTransaction(producerId, epoch)
      }
    }
  }
}

       

      Attachments

        1. 2021-07-26_16-47-48.png
          15 kB
          Matthias Schwalbe

People

Assignee: Fabian Paul (fpaul)
Reporter: Matthias Schwalbe
Votes: 0
Watchers: 6
