FLINK-23896

The new KafkaSink drops data if job fails between checkpoint and transaction commit.


Details

    Description

      • Any time a new transactional producer is started, "KafkaProducer#initTransactions()" needs to be called in order to obtain a new ProducerId from the TransactionCoordinator (a Kafka broker component).
        • The ProducerId is increased every time a new producer with the same TransactionalId is registered.
        • Publication of a new ProducerId FENCES all prior ProducerIds and ABORTS all uncommitted transactions associated with them.
      • KafkaCommitter uses a separate producer that hacks into Kafka internals and resumes the transaction using the epoch and ProducerId, without actually registering a new ProducerId for itself. The committer uses the ProducerId stored in the KafkaCommittable state to commit the transaction.
      • If a new producer is started before committing the transaction (this can happen in some failover scenarios), the ProducerId stored in the state is already FENCED and the commit fails with a ProducerFencedException. Because we currently ignore this exception (we just log a warning), we effectively DROP all uncommitted data from the previous checkpoint (see the fencing sketch after this list).
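
      To make the fencing mechanics concrete, here is a minimal sketch using the plain Kafka producer API. This is not Flink code; the broker address, topic, and TransactionalId are placeholders. Starting a second producer with the same TransactionalId and calling initTransactions() fences the first one, so its pending commit fails:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class FencingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "demo-txn-id"); // same TransactionalId for both

        // Producer #1: opens a transaction but does not commit yet
        // (stands in for the transaction of the last completed checkpoint).
        KafkaProducer<String, String> first = new KafkaProducer<>(props);
        first.initTransactions(); // registers and obtains a ProducerId/epoch
        first.beginTransaction();
        first.send(new ProducerRecord<>("demo-topic", "k", "uncommitted record"));
        first.flush();

        // Producer #2 with the SAME TransactionalId: initTransactions() publishes
        // a new ProducerId, which FENCES producer #1 and ABORTS its open transaction.
        KafkaProducer<String, String> second = new KafkaProducer<>(props);
        second.initTransactions();

        try {
            first.commitTransaction(); // producer #1 is fenced, so this throws
        } catch (ProducerFencedException e) {
            // The record sent above was aborted by the broker and is lost.
            System.out.println("Commit fenced, data dropped: " + e);
        } finally {
            first.close(); // close() is the only valid call on a fenced producer
            second.close();
        }
    }
}
{code}

      In the KafkaSink case, producer #1 corresponds to the transaction opened before the checkpoint, and producer #2 to the producer created during failover recovery.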

      Basically, any job failure that happens between successfully taking a checkpoint and committing the transactions will trigger this behavior.
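
      The resulting failure mode on the committer side can be sketched roughly as follows. The names are hypothetical, the reflection-based transaction resumption is elided, and this is NOT the actual KafkaCommitter source:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical condensation of the committer-side failure mode described above.
final class CommitterFailureSketch {
    private static final Logger LOG =
            LoggerFactory.getLogger(CommitterFailureSketch.class);

    static void commit(KafkaProducer<?, ?> resumedProducer, String transactionalId) {
        try {
            // The producer was resumed with the ProducerId/epoch stored in the
            // KafkaCommittable state; if a newer ProducerId was published in
            // the meantime, the broker has already aborted this transaction.
            resumedProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Current behavior per the description: only a warning is logged,
            // so the data of the last checkpoint is silently dropped.
            LOG.warn("Transaction {} was fenced; commit skipped.", transactionalId, e);
        }
    }
}
{code}

      One direction for a fix would be to escalate the ProducerFencedException into a job failure instead of swallowing it, so that the exactly-once violation at least becomes visible.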


            People

              Arvid Heise (arvid)
              David Morávek (dmvk)
              Votes: 0
              Watchers: 5
