[FLINK-8409] Race condition in KafkaConsumerThread leads to potential NPE

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.4.0, 1.3.2, 1.5.0
    • Fix Version/s: 1.5.0, 1.4.1, 1.3.4
    • Component/s: Kafka Connector
    • Labels: None

      Description

      The following lines in KafkaConsumerThread::setOffsetsToCommit(...) suggest a race condition with the asynchronous callback that fires when offsets are committed to Kafka:

      // record the work to be committed by the main consumer thread and make sure the consumer notices that
      if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
          log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
              "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
              "This does not compromise Flink's checkpoint integrity.");
      }
      this.offsetCommitCallback = commitCallback;
      

      The main loop of the consumer thread checks nextOffsetsToCommit for pending offsets. If any are present, it issues an asynchronous offset commit. The NPE occurs when that commit completes before the assignment this.offsetCommitCallback = commitCallback; has been reached, so the completion handler finds the callback field still unset.
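      The interleaving can be replayed deterministically in a single thread. The sketch below is not the Flink code: RaceReplay, the Callback interface, and the Map<String, Long> offset type are hypothetical stand-ins, and the consumer loop's steps are inlined at the point where the race window opens.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class RaceReplay {
    /** Hypothetical stand-in for the commit callback type. */
    interface Callback {
        void onSuccess();
    }

    // Offsets and callback live in two separate fields, as in the snippet above.
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    private volatile Callback offsetCommitCallback;

    /** Replays the problematic ordering; returns true if the NPE was observed. */
    boolean replay() {
        // Step 1 (checkpointing thread): publish the offsets.
        nextOffsetsToCommit.getAndSet(Collections.singletonMap("topic-0", 42L));

        // Step 2 (consumer thread): picks up the offsets; the commit completes at once.
        Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
        boolean sawNpe = false;
        if (toCommit != null) {
            try {
                offsetCommitCallback.onSuccess(); // field has not been assigned yet
            } catch (NullPointerException e) {
                sawNpe = true;
            }
        }

        // Step 3 (checkpointing thread): only now assigns the callback.
        offsetCommitCallback = () -> { };
        return sawNpe;
    }

    public static void main(String[] args) {
        System.out.println("NPE observed: " + new RaceReplay().replay());
    }
}
```

      In a real run the window between steps 1 and 3 is very short, which is why the failure would only show up occasionally.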

      A possible fix is to set the next offsets to commit and the callback instance together in a single atomic operation.
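      One way to read "a single atomic operation" is to bundle the offsets and the callback into one immutable object and publish it through a single AtomicReference. A minimal sketch under that assumption follows; AtomicCommitHandoff, CommitRequest, CommitCallback, and the Map<String, Long> offset type are hypothetical simplifications, not the types used in the Flink connector or in the committed fix.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicCommitHandoff {
    /** Hypothetical stand-in for the commit callback type. */
    interface CommitCallback {
        void onSuccess();
    }

    /** Immutable pair: the offsets and their callback always travel together. */
    static final class CommitRequest {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        CommitRequest(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = Objects.requireNonNull(offsets);
            this.callback = Objects.requireNonNull(callback);
        }
    }

    private final AtomicReference<CommitRequest> nextCommit = new AtomicReference<>();

    /** Checkpointing thread: returns true if a not-yet-processed request was replaced. */
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new CommitRequest(offsets, callback)) != null;
    }

    /** Consumer thread: takes the pending request, or returns null if there is none. */
    CommitRequest pollCommitRequest() {
        return nextCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        AtomicCommitHandoff handoff = new AtomicCommitHandoff();
        handoff.setOffsetsToCommit(
                Collections.singletonMap("topic-0", 42L),
                () -> System.out.println("commit acknowledged"));

        CommitRequest request = handoff.pollCommitRequest();
        if (request != null) {
            // The callback is visible whenever the offsets are, so it cannot be null here.
            request.callback.onSuccess();
        }
    }
}
```

      Because the consumer thread obtains both values from one getAndSet call, there is no point at which it can observe new offsets without their callback.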

              People

              • Assignee: Tzu-Li (Gordon) Tai (tzulitai)
              • Reporter: Tzu-Li (Gordon) Tai (tzulitai)
              • Votes: 0
              • Watchers: 4
