Flink / FLINK-16419

Avoid recommitting transactions that are known to have been committed successfully to Kafka upon recovery


Details

    Description

      When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions stored in the snapshot, even if those transactions were already committed successfully (i.e., the call to kafkaProducer.commitTransaction() via notifyCheckpointComplete() returned OK). This can make recovery fail when recovering from a very old snapshot, because the transactional IDs in that snapshot may have expired and been removed from Kafka. For example, consider the following scenario:

      1. Start a Flink job with a FlinkKafkaProducer sink in exactly-once mode.
      2. Suspend the Flink job with savepoint A.
      3. Wait longer than transactional.id.expiration.ms + transaction.remove.expired.transaction.cleanup.interval.ms (both Kafka broker settings).
      4. Recover the job from savepoint A.
      5. The recovery will fail with the following error:
      2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
      2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata                            - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
      2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer              - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task                    - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
      org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
              at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
              at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
              at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
              at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
              at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
              at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
              at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
              at java.lang.Thread.run(Thread.java:748)
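      To make the failure mode concrete, here is a minimal, hypothetical model of the scenario above. FakeTxnCoordinator and NaiveRecovery are illustrative names, not Kafka or Flink classes; the coordinator simply forgets a transactional ID after it has been idle longer than expirationMs, which is assumed to stand in for transactional.id.expiration.ms plus the cleanup interval. The recovery step recommits every pending transaction from the snapshot, as described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's broker-side transaction coordinator.
// It is NOT Kafka's implementation: it only remembers when each transactional
// ID was last active and forgets IDs that stay idle longer than expirationMs.
class FakeTxnCoordinator {
    // Assumption: models transactional.id.expiration.ms plus the cleanup interval.
    private final long expirationMs;
    private final Map<String, Long> lastActiveMs = new HashMap<>();

    FakeTxnCoordinator(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    // Registers a transactional ID, roughly what initTransactions() does on a real producer.
    void register(String transactionalId, long nowMs) {
        lastActiveMs.put(transactionalId, nowMs);
    }

    // Commits a transaction; fails once the ID has been idle too long and was removed.
    void commit(String transactionalId, long nowMs) {
        Long last = lastActiveMs.get(transactionalId);
        if (last == null || nowMs - last > expirationMs) {
            lastActiveMs.remove(transactionalId); // expired IDs are cleaned up
            throw new IllegalStateException(
                "producer id not currently assigned to transactional id " + transactionalId);
        }
        lastActiveMs.put(transactionalId, nowMs); // a successful commit refreshes activity
    }
}

// Hypothetical model of the current recovery behaviour: every pre-committed
// transaction recorded in the snapshot is committed again, with no record of
// whether notifyCheckpointComplete() had already committed it.
class NaiveRecovery {
    static void recover(List<String> pendingInSnapshot, FakeTxnCoordinator coordinator, long nowMs) {
        for (String transactionalId : pendingInSnapshot) {
            coordinator.commit(transactionalId, nowMs); // throws for expired IDs
        }
    }
}
```

      Recovering while the ID is still known to the coordinator succeeds; recovering after the expiration window fails even though the first commit had already gone through, which mirrors the error in step 5.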
      

      For now, the workaround is to call producer.ignoreFailuresAfterTransactionTimeout(). This is risky, however, because it may also hide genuine transaction-timeout errors.
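      Why the workaround is risky can be illustrated with a small, simplified model. It assumes that, with the flag set, a failed recommit during recovery is only logged (not rethrown) when the transaction has been open longer than the configured transaction timeout. RecommitFailurePolicy is a hypothetical name and this is not Flink's source; the 3,600,000 ms timeout in the checks is an arbitrary example value.

```java
// Hypothetical, simplified model of the workaround's effect; it is not Flink's source.
// Assumption: when the flag is set, a failed recommit is only logged (not rethrown)
// if the transaction has been open longer than the configured transaction timeout.
class RecommitFailurePolicy {
    private final boolean ignoreFailuresAfterTransactionTimeout;
    private final long transactionTimeoutMs;

    RecommitFailurePolicy(boolean ignoreFailuresAfterTransactionTimeout, long transactionTimeoutMs) {
        this.ignoreFailuresAfterTransactionTimeout = ignoreFailuresAfterTransactionTimeout;
        this.transactionTimeoutMs = transactionTimeoutMs;
    }

    // Returns true if a recommit failure would be swallowed, false if recovery fails.
    // Only elapsed time is consulted; the cause of the failure is never examined, so an
    // already-committed transaction whose ID expired and a transaction the broker aborted
    // on timeout (i.e. lost data) are treated the same way.
    boolean swallowsFailure(long transactionStartMs, long nowMs) {
        long elapsedMs = nowMs - transactionStartMs;
        return ignoreFailuresAfterTransactionTimeout && elapsedMs > transactionTimeoutMs;
    }
}
```

      Because the decision looks only at elapsed time, the flag cannot tell "already committed, ID expired" apart from "never committed, data lost"; recording which transactions actually committed removes that ambiguity.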

      After discussing with becket_qin, pnowojski, and aljoscha, one possible approach is to have the JobManager, once it has successfully notified all operators of a snapshot's completion (via notifyCheckpointComplete), record that success, e.g., by writing the successfully committed transactional IDs somewhere in the snapshot. Those transactions then need not be recommitted upon recovery.
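      A minimal sketch of the proposed idea, under stated assumptions: the snapshot is modeled as a list of pending transactional IDs plus a set of IDs recorded as committed, and recovery skips the latter. SinkSnapshot, withCommitted, and SkippingRecovery are hypothetical names; how and where the JobManager would actually persist the committed IDs is abstracted away, and the Kafka commit is modeled as a plain Consumer<String>.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical snapshot contents for the proposal: the pre-committed transactions plus
// the transactional IDs whose commit is already known to have succeeded.
final class SinkSnapshot {
    final List<String> pendingTransactionalIds;
    final Set<String> committedTransactionalIds;

    SinkSnapshot(List<String> pending, Set<String> committed) {
        this.pendingTransactionalIds = List.copyOf(pending);
        this.committedTransactionalIds = Set.copyOf(committed);
    }

    // Hypothetical hook, run after notifyCheckpointComplete() succeeded on all operators:
    // returns a copy of this snapshot that also records the given IDs as committed.
    SinkSnapshot withCommitted(List<String> transactionalIds) {
        Set<String> committed = new HashSet<>(committedTransactionalIds);
        committed.addAll(transactionalIds);
        return new SinkSnapshot(pendingTransactionalIds, committed);
    }
}

final class SkippingRecovery {
    // Recommits only pending transactions not recorded as committed and returns their IDs.
    static List<String> recover(SinkSnapshot snapshot, Consumer<String> commit) {
        List<String> recommitted = new ArrayList<>();
        for (String transactionalId : snapshot.pendingTransactionalIds) {
            if (snapshot.committedTransactionalIds.contains(transactionalId)) {
                continue; // known to be committed: skip, so an expired ID is never touched
            }
            commit.accept(transactionalId);
            recommitted.add(transactionalId);
        }
        return recommitted;
    }
}
```

      The committed set is scoped to a single snapshot rather than kept as a global list. FlinkKafkaProducer recycles transactional IDs from a pool, so a global set could wrongly skip a later, genuinely uncommitted transaction that reuses the same ID.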

      Attachments

        1. tm0-transaction.log (37 kB, Yordan Pavlov)


            People

              Assignee: Unassigned
              Reporter: Jun Qin (qinjunjerry)
              Votes: 4
              Watchers: 23
