  1. Kafka
  2. KAFKA-4632

Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.0, 0.10.0.1, 0.10.1.0
    • Fix Version/s: 0.10.0.1, 0.10.1.0
    • Component/s: KafkaConnect
    • Labels:
      None

      Description

      WorkerSinkTask's closePartitions method does not handle the WakeupException that commitSync can throw, so the exception propagates up through execute, as the stack trace below shows.

      org.apache.kafka.common.errors.WakeupException
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
      at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
      at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
      at java.util.concurrent.FutureTask.run (FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
      at java.lang.Thread.run (Thread.java:745)
      

      I believe closePartitions should catch the exception and ignore it, since that is what the poll method does when isStopping() is true:

              } catch (WakeupException we) {
                  log.trace("{} consumer woken up", id);
      
                  if (isStopping())
                      return;
      
                  if (shouldPause()) {
                      pauseAll();
                  } else if (!pausedForRedelivery) {
                      resumeAll();
                  }
              }
      

      But I am unsure, and would love some insight into this.
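
      The proposal above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Kafka Connect code: ClosePartitionsSketch, SinkTaskSketch, OffsetCommitter, and the events list are hypothetical names, and the nested WakeupException is a local stand-in for org.apache.kafka.common.errors.WakeupException so the example compiles without the Kafka client on the classpath. Whether an interrupted commit should also be retried is left open here.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosePartitionsSketch {

    /** Local stand-in for org.apache.kafka.common.errors.WakeupException (assumption for this sketch). */
    static class WakeupException extends RuntimeException {}

    /** Hypothetical minimal view of the consumer: only the blocking commit call. */
    interface OffsetCommitter {
        void commitSync();
    }

    /** Hypothetical, heavily reduced model of a sink task; not the real WorkerSinkTask. */
    static class SinkTaskSketch {
        private final OffsetCommitter consumer;
        private volatile boolean stopping;
        final List<String> events = new ArrayList<>();

        SinkTaskSketch(OffsetCommitter consumer) {
            this.consumer = consumer;
        }

        void stop() {
            stopping = true;
        }

        boolean isStopping() {
            return stopping;
        }

        /** Commits offsets on close, tolerating a wakeup only while the task is stopping. */
        void closePartitions() {
            try {
                consumer.commitSync();
                events.add("committed");
            } catch (WakeupException we) {
                // A wakeup outside of shutdown is unexpected here, so propagate it.
                if (!isStopping())
                    throw we;
                // During shutdown, mirror what poll does: swallow the wakeup and continue closing.
                events.add("wakeup ignored");
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a commit that is interrupted by a wakeup while the task is stopping.
        SinkTaskSketch task = new SinkTaskSketch(() -> { throw new WakeupException(); });
        task.stop();
        task.closePartitions();
        System.out.println(task.events);
    }
}
```

      One caveat with this approach: swallowing the exception means the commit that was interrupted may not have completed, so a real fix would need to decide whether to retry the commit before continuing shutdown.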


            People

            • Assignee:
              Unassigned
            • Reporter:
              Scott Reynolds