KAFKA-17162

DefaultTaskManagerTest may leak AwaitingRunnable thread

Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.9.0
    • Fix Version/s: None
    • Component/s: streams, unit tests
    • Labels: None

    Description

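      `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` fails when the following patch is applied. The patch adds sleeps to `DefaultTaskManager#returnWithTasksLocked` and to `AwaitingRunnable#run` in the test, and asserts that `awaitingThread` is no longer alive after `awaitingRunnable.shutdown()`: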

      ```
      diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
      index 5d2db3c279..b87a82b85b 100644
      --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
      +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
      @@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
           }
       
           private <T> T returnWithTasksLocked(final Supplier<T> action) {
      +        try {
      +            Thread.sleep(1000);
      +        } catch (final Exception e) {
      +        }
               tasksLock.lock();
               try {
                   return action.get();
      diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
      index 98065eae7d..0d8dde7156 100644
      --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
      +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
      @@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
               @Override
               public void run() {
                   while (!shutdownRequested.get()) {
      +                try {
      +                    Thread.sleep(1000);
      +                } catch (final Exception e) {
      +                }
                       try {
                           taskManager.awaitProcessableTasks();
                       } catch (final InterruptedException ignored) {
      @@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
               assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
       
               awaitingRunnable.shutdown();
      +        Thread.sleep(5000);
      +        assertFalse(awaitingThread.isAlive());
           }
       
           @Test
      ```
      

      `awaitingThread` never terminates because it is still waiting for a signal on the condition, as its thread dump shows:

      "Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s tid=0x00000001250d8600 nid=26371 waiting on condition  [0x0000000173d4e000]
         java.lang.Thread.State: WAITING (parking)
              at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
              - parking to wait for  <0x00000007dcd49b88> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
              at java.util.concurrent.locks.LockSupport.park(java.base@21.0.2/LockSupport.java:371)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.2/AbstractQueuedSynchronizer.java:519)
              at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.2/ForkJoinPool.java:3780)
              at java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.2/ForkJoinPool.java:3725)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.2/AbstractQueuedSynchronizer.java:1707)
              at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
              at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown Source)
              at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
              at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
              at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
              at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
              at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)
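
The thread dump is consistent with a lost signal: the thread checks the shutdown flag, the signal is sent before the thread reaches `await()`, and the thread then parks with nobody left to wake it. The following is a minimal, self-contained sketch of that interleaving. It is not Kafka code. It assumes that `shutdown()` sets a flag and signals the condition once, which this report does not show, and every name in it (`LostSignalSketch`, `awaitLoop`, `wakeUp`, the two latches) is hypothetical. Latches take the place of the `Thread.sleep` calls in the patch so that the ordering is forced rather than merely likely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in, not Kafka code: forces a lost signal deterministically.
class LostSignalSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    // Latches force the interleaving that the sleeps in the patch make likely.
    private final CountDownLatch passedFlagCheck = new CountDownLatch(1);
    private final CountDownLatch shutdownDone = new CountDownLatch(1);

    // Assumed shape of the awaiting loop: check the flag, then wait on the condition.
    private void awaitLoop() {
        while (!shutdownRequested.get()) {
            passedFlagCheck.countDown();
            try {
                shutdownDone.await();   // pause between the flag check and the await
                lock.lock();
                try {
                    wakeUp.await();     // the signal was already sent, so this parks
                } finally {
                    lock.unlock();
                }
            } catch (final InterruptedException ignored) {
                // fall through and re-check the flag
            }
        }
    }

    // Assumed shape of shutdown(): set the flag and signal once.
    private void shutdown() {
        shutdownRequested.set(true);
        lock.lock();
        try {
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns {aliveAfterShutdown, aliveAfterInterrupt}.
    static boolean[] run() throws InterruptedException {
        final LostSignalSketch sketch = new LostSignalSketch();
        final Thread awaitingThread = new Thread(sketch::awaitLoop);
        awaitingThread.start();

        sketch.passedFlagCheck.await();   // the thread has seen shutdownRequested == false
        sketch.shutdown();                // flag set and signal sent while nobody is waiting
        sketch.shutdownDone.countDown();  // only now may the thread call await()
        awaitingThread.join(500);
        final boolean aliveAfterShutdown = awaitingThread.isAlive();

        // Cleanup: an interrupt wakes the parked thread, which then sees the flag and exits.
        awaitingThread.interrupt();
        awaitingThread.join(5000);
        final boolean aliveAfterInterrupt = awaitingThread.isAlive();
        return new boolean[] {aliveAfterShutdown, aliveAfterInterrupt};
    }

    public static void main(final String[] args) throws InterruptedException {
        final boolean[] result = run();
        System.out.println("alive after shutdown:  " + result[0]);
        System.out.println("alive after interrupt: " + result[1]);
    }
}
```

In this sketch the thread is still alive after `shutdown()` and terminates only once it is interrupted. Interrupting and joining the thread during cleanup is one possible way for a test to avoid leaving it behind. It is shown here purely as an illustration and is not necessarily the fix adopted for this issue.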
      

            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Ao Li (aoli-al)