Details
Description
WorkerSinkTask's closePartitions method isn't handling WakeupException that can be thrown from commitSync.
org.apache.kafka.common.errors.WakeupException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104) at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245) at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147) at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) at java.util.concurrent.FutureTask.run (FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745)
I believe it should catch it and ignore it as that is what the poll method does when isStopping is true
} catch (WakeupException we) { log.trace("{} consumer woken up", id); if (isStopping()) return; if (shouldPause()) { pauseAll(); } else if (!pausedForRedelivery) { resumeAll(); } }
But unsure, love some insight into this.