  1. Kafka
  2. KAFKA-4632

Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.0, 0.10.0.1, 0.10.1.0
    • Fix Version/s: 0.10.0.1, 0.10.1.0
    • Component/s: connect
    • Labels: None

    Description

      WorkerSinkTask's closePartitions method does not handle the WakeupException that commitSync can throw:

      org.apache.kafka.common.errors.WakeupException
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:245)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:264)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:435)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:147)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      

      I believe closePartitions should catch the exception and ignore it, since that is what the poll method does when isStopping() is true:

              } catch (WakeupException we) {
                  log.trace("{} consumer woken up", id);
      
                  if (isStopping())
                      return;
      
                  if (shouldPause()) {
                      pauseAll();
                  } else if (!pausedForRedelivery) {
                      resumeAll();
                  }
              }
      

      But I am unsure, and would love some insight into this.
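
      To make the proposal concrete, here is a minimal, self-contained sketch of the idea: swallow the wakeup in closePartitions only when the task is stopping, and rethrow it otherwise. Everything in it is hypothetical. ClosePartitionsSketch, SinkTaskSketch, and the Runnable standing in for consumer.commitSync(...) are illustration names, and the nested WakeupException is a stand-in for org.apache.kafka.common.errors.WakeupException so the example compiles without Kafka on the classpath. It is not the real WorkerSinkTask code and not necessarily how this issue was resolved.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ClosePartitionsSketch {

    // Stand-in for org.apache.kafka.common.errors.WakeupException (assumption:
    // only its unchecked nature matters for this sketch).
    static class WakeupException extends RuntimeException {}

    // Hypothetical, heavily simplified model of a sink task.
    static class SinkTaskSketch {
        private final AtomicBoolean stopping = new AtomicBoolean(false);
        private final Runnable commitSync; // stands in for consumer.commitSync(offsets)

        SinkTaskSketch(Runnable commitSync) {
            this.commitSync = commitSync;
        }

        void stop() {
            stopping.set(true);
        }

        boolean isStopping() {
            return stopping.get();
        }

        // Returns true if the commit completed, false if a wakeup was
        // swallowed because the task is shutting down.
        boolean closePartitions() {
            try {
                commitSync.run();
                return true;
            } catch (WakeupException we) {
                if (isStopping()) {
                    // Same treatment the poll path gives a wakeup while stopping.
                    return false;
                }
                // Not stopping: let the caller see the wakeup.
                throw we;
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a commit that is interrupted by consumer.wakeup().
        SinkTaskSketch task = new SinkTaskSketch(() -> {
            throw new WakeupException();
        });
        task.stop();
        boolean committed = task.closePartitions();
        System.out.println("committed=" + committed);
    }
}
```

      One consequence worth noting: with this approach the final offset commit is skipped when the wakeup arrives during shutdown, so whether ignoring the exception is acceptable, or the commit should be retried first, is exactly the open question above.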

      People

        Assignee: Unassigned
        Reporter: Scott Reynolds (ScottReynolds)