Kafka / KAFKA-5896

Kafka Connect task threads never interrupted

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: KafkaConnect
    • Labels: None

    Description

      Problem

      Kafka Connect tasks associated with connectors run in their own threads. When a task is stopped or restarted, a flag (stopping) is set to indicate that the task should stop processing records. However, if the task's thread is blocked (waiting for a lock or performing I/O), it never gets back to checking the flag, so the task may never stop.
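      To illustrate, here is a minimal Java sketch (not Kafka code) of a task loop that only checks a volatile stopping flag between iterations. The class names are hypothetical, and the CountDownLatch that is never released stands in for a lock or I/O call with no timeout. Setting the flag does not wake the blocked thread, so it is still alive after the wait.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a connector task (not Kafka code).
class FlagOnlyTask implements Runnable {
    volatile boolean stopping = false;                                  // the graceful stop signal
    final CountDownLatch started = new CountDownLatch(1);               // lets the demo wait until the loop is entered
    private final CountDownLatch neverReleased = new CountDownLatch(1); // simulates a lock or I/O with no timeout

    @Override
    public void run() {
        while (!stopping) {              // the flag is only checked here, between iterations
            started.countDown();
            try {
                neverReleased.await();   // blocks forever; nothing re-checks the flag
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;                  // only an interrupt gets the thread out of await()
            }
        }
    }
}

class FlagOnlyDemo {
    // Returns true if the task thread is still alive after the flag is set
    // and the caller has waited waitMillis for it to exit.
    static boolean stillAliveAfterFlag(long waitMillis) {
        FlagOnlyTask task = new FlagOnlyTask();
        Thread thread = new Thread(task, "flag-only-task");
        thread.setDaemon(true);          // a stuck thread must not keep the JVM running
        thread.start();
        try {
            task.started.await();        // the thread has passed the flag check and is about to block
            task.stopping = true;        // request a graceful stop
            thread.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean alive = thread.isAlive();
        thread.interrupt();              // clean up the demo thread
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("alive after setting stopping: " + stillAliveAfterFlag(200));
    }
}
```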

      I've created a connector specifically to demonstrate this issue, along with more detailed instructions for reproducing it: https://github.com/smarter-travel-media/hang-connector

      I believe this is an issue because a single badly behaved connector (any connector that does I/O without timeouts) can leave the Kafka Connect worker in a state where the only remedy is restarting the JVM.
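      On the connector side, the hang is avoidable when every blocking call has a timeout, because the loop then gets back to the flag check. The sketch below is a minimal illustration under an assumption: the external system is modeled by a BlockingQueue, and BoundedWaitTask / BoundedWaitDemo are hypothetical names. A real connector would rely on its client library's read and connect timeouts. This only helps connectors written this way, which is why the worker's behavior with misbehaving connectors still matters.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical task whose blocking call is bounded by a timeout, so the
// stopping flag is re-checked at least once per pollMillis.
class BoundedWaitTask implements Runnable {
    volatile boolean stopping = false;
    // Stands in for an external system (assumption for illustration only).
    final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private final long pollMillis;

    BoundedWaitTask(long pollMillis) {
        this.pollMillis = pollMillis;
    }

    @Override
    public void run() {
        while (!stopping) {
            try {
                // Returns null after pollMillis if nothing arrived, instead of blocking forever.
                String record = source.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (record != null) {
                    // a real task would convert and hand off the record here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class BoundedWaitDemo {
    // Returns true if setting the flag alone stops the task within joinMillis.
    static boolean stopsOnFlag(long pollMillis, long joinMillis) {
        BoundedWaitTask task = new BoundedWaitTask(pollMillis);
        Thread thread = new Thread(task, "bounded-wait-task");
        thread.setDaemon(true);
        thread.start();
        task.stopping = true;            // graceful stop request only; no interrupt
        try {
            thread.join(joinMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```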

      I suspect, though I couldn't reproduce it, that this is the cause of the problem described on Stack Overflow: https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work

      Expected Result

      I would expect the Worker to eventually interrupt the thread the task is running in. This is what I've seen other libraries do when a thread needs to be forcibly stopped.
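      A minimal sketch of that expected behavior, assuming one plain Thread per task: request a graceful stop, wait with a timeout, then interrupt. The helper GracefulThenInterrupt.stop and the StuckTask class are hypothetical, not Kafka's API. One caveat: an interrupt only wakes a thread from interruptible operations such as Object.wait, Thread.sleep, BlockingQueue or CountDownLatch waits, ReentrantLock.lockInterruptibly and InterruptibleChannel I/O; a thread waiting to enter a synchronized block, or blocked reading a classic java.net.Socket stream, is generally not woken by it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical task that blocks in an untimed wait and ignores the flag while blocked.
class StuckTask implements Runnable {
    final AtomicBoolean stopping = new AtomicBoolean(false);
    final CountDownLatch started = new CountDownLatch(1);
    private final Object lock = new Object();

    @Override
    public void run() {
        while (!stopping.get()) {
            started.countDown();
            synchronized (lock) {
                try {
                    lock.wait();         // never notified: an untimed blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;              // exit promptly once interrupted
                }
            }
        }
    }
}

class GracefulThenInterrupt {
    // Hypothetical helper: signal a graceful stop, wait up to gracefulMillis,
    // then interrupt if the thread is still alive. Returns true if it exited.
    static boolean stop(Thread thread, Runnable requestStop, long gracefulMillis, long afterInterruptMillis) {
        requestStop.run();                       // 1. ask nicely (e.g. set the stopping flag)
        try {
            thread.join(gracefulMillis);         // 2. give the task time to exit on its own
            if (thread.isAlive()) {
                thread.interrupt();              // 3. force it out of interruptible blocking calls
                thread.join(afterInterruptMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    static boolean demo() {
        StuckTask task = new StuckTask();
        Thread thread = new Thread(task, "stuck-task");
        thread.setDaemon(true);
        thread.start();
        try {
            task.started.await();                // the task has entered its blocking loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return stop(thread, () -> task.stopping.set(true), 200, 1000);
    }
}
```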

      Actual Result

      In actuality, the Worker sets the stopping flag and lets the thread run indefinitely. It waits for the task to stop with a timeout, but once the timeout expires it simply sets a cancelled flag. Because the old thread is never stopped, every restart adds another thread running the task, so a task can end up with multiple instances, each in its own thread, when there is supposed to be only one.
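      The accumulation can be modeled in a few lines. This is a simplified model under stated assumptions, not the Worker's actual code: each hypothetical "restart" sets the old instance's stopping flag, waits with a timeout, sets a cancelled flag if the thread is still alive, and starts a fresh instance without interrupting the old one. HangingTask and RestartWithoutInterrupt are illustrative names. After two restarts, three threads are running the same task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical task instance that hangs in an untimed blocking call.
class HangingTask implements Runnable {
    volatile boolean stopping = false;
    volatile boolean cancelled = false;          // set by the model below; the thread never reads it
    final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch never = new CountDownLatch(1);

    @Override
    public void run() {
        while (!stopping) {
            started.countDown();
            try {
                never.await();                   // simulates I/O without a timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class RestartWithoutInterrupt {
    // Restarts one logical task `restarts` times without interrupting old
    // instances; returns how many of its threads are still alive.
    static int liveThreadsAfterRestarts(int restarts, long stopTimeoutMillis) {
        List<Thread> threads = new ArrayList<>();
        HangingTask current = start(threads);
        for (int i = 0; i < restarts; i++) {
            current.stopping = true;                     // graceful stop request
            Thread old = threads.get(threads.size() - 1);
            try {
                old.join(stopTimeoutMillis);             // bounded wait for the old instance
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (old.isAlive()) {
                current.cancelled = true;                // flag only: the thread keeps running
            }
            current = start(threads);                    // a new thread for the same task
        }
        int alive = 0;
        for (Thread t : threads) {
            if (t.isAlive()) alive++;
        }
        for (Thread t : threads) t.interrupt();          // clean up the demo threads
        return alive;
    }

    private static HangingTask start(List<Thread> threads) {
        HangingTask task = new HangingTask();
        Thread t = new Thread(task, "task-0-instance-" + threads.size());
        t.setDaemon(true);
        threads.add(t);
        t.start();
        try {
            task.started.await();                        // wait until the instance is blocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task;
    }
}
```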

      Steps to Reproduce

      The problem can be replicated by using the connector available here: https://github.com/smarter-travel-media/hang-connector

      Apologies for how involved the steps are.

      I've created a patch that forcibly interrupts task threads that fail to shut down gracefully: https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
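      For comparison, a minimal sketch (not the linked patch) of the same forced stop, under the assumption that task instances are submitted to an ExecutorService rather than started as raw threads. Future.cancel(true) interrupts the thread currently running the task, so the caller does not need to hold a Thread reference. FutureCancelDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureCancelDemo {
    // Returns true if cancel(true) gets a blocked task to exit within one second.
    static boolean cancelInterruptsBlockedTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        Future<?> future = executor.submit(() -> {
            started.countDown();
            try {
                new CountDownLatch(1).await();      // never released: an untimed blocking call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                exited.countDown();                 // record that the task body has finished
            }
        });
        try {
            started.await();
            future.cancel(true);                    // mayInterruptIfRunning = true interrupts the worker thread
            return exited.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```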

      I've confirmed that this fixes the issue. I can add unit tests and submit a PR if people agree that this is a bug and that interrupting threads is the right fix.

      Thanks!

              People

              • Assignee: Nick Pillitteri (56quarters)
              • Reporter: Nick Pillitteri (56quarters)
              • Votes: 0
              • Watchers: 5