  1. Kafka
  2. KAFKA-14311

Connect Worker clean shutdown does not cleanly stop connectors/tasks

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.3.1
    • Fix Version/s: 3.5.0
    • Component/s: connect
    • Labels: None

    Description

      When the DistributedHerder::stop() method is called, it triggers an asynchronous shutdown of the background herder thread and then continues with a synchronous shutdown of some other resources, including the startAndStopExecutor.

      This executor is responsible for cleanly stopping connectors and tasks, which happens in the DistributedHerder::halt() method on the herder thread. There is a race condition between the halt() method submitting these connector/task stop jobs from that asynchronously running thread and the stop() method terminating the executor. If the executor is terminated first, the stop jobs are rejected and this exception appears:

      [2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
      java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
          at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
          at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
          at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
          at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
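
      The race can be illustrated outside of Connect with a plain ExecutorService. The sketch below is not the DistributedHerder code and not necessarily how the issue was fixed: the class name ShutdownRaceSketch, the stopJobs() helper, and both method names are hypothetical. It only shows that invokeAll() on an already terminated executor throws RejectedExecutionException, as in the trace above, and that one possible ordering (wait for the submitting thread, then shut the executor down) lets the stop jobs run.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; none of these names exist in Kafka Connect.
final class ShutdownRaceSketch {

    // Stand-in for the connector/task stop jobs that halt() would submit.
    static List<Callable<Void>> stopJobs() {
        Callable<Void> job = () -> null;
        return List.of(job);
    }

    // Losing side of the race: the executor is shut down before the jobs are submitted.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.shutdown();
        try {
            executor.invokeAll(stopJobs());
            return false; // not expected: the executor accepted the jobs
        } catch (RejectedExecutionException e) {
            return true;  // same exception type as in the reported stack trace
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // One possible ordering (an assumption): join the submitting thread first,
    // and only then shut the executor down.
    static boolean joinBeforeShutdownRunsJobs() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        boolean[] completed = {false};
        Thread herder = new Thread(() -> {
            try {
                executor.invokeAll(stopJobs()); // blocks until all jobs finish
                completed[0] = true;
            } catch (InterruptedException | RejectedExecutionException e) {
                // leave completed[0] as false
            }
        });
        herder.start();
        try {
            herder.join();       // wait for the "herder" thread to finish halting
            executor.shutdown(); // safe now: nothing else will submit work
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return completed[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
        System.out.println("completed with join first: " + joinBeforeShutdownRunsJobs());
    }
}
```

      Running main should print true for both lines. The first method forces the losing interleaving deterministically rather than relying on thread timing.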

            People

              Assignee: Sagar Rao (sagarrao)
              Reporter: Greg Harris (gharris1727)