Details
- Bug
- Status: Patch Available
- Minor
- Resolution: Unresolved
- 3.9.0
- None
- None
Description
`DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` fails when the following patch, which only adds delays and a liveness assertion, is applied:
```diff
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 5d2db3c279..b87a82b85b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
     }
     private <T> T returnWithTasksLocked(final Supplier<T> action) {
+        try {
+            Thread.sleep(1000);
+        } catch (final Exception e) {
+        }
         tasksLock.lock();
         try {
             return action.get();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 98065eae7d..0d8dde7156 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
         @Override
         public void run() {
             while (!shutdownRequested.get()) {
+                try {
+                    Thread.sleep(1000);
+                } catch (final Exception e) {
+                }
                 try {
                     taskManager.awaitProcessableTasks();
                 } catch (final InterruptedException ignored) {
@@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
         assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
         awaitingRunnable.shutdown();
+        Thread.sleep(5000);
+        assertFalse(awaitingThread.isAlive());
     }
     @Test
```
`awaitingThread` never terminates because it is still waiting for the signal, as its thread dump shows:

"Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s tid=0x00000001250d8600 nid=26371 waiting on condition [0x0000000173d4e000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
    - parking to wait for <0x00000007dcd49b88> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(java.base@21.0.2/LockSupport.java:371)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.2/AbstractQueuedSynchronizer.java:519)
    at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.2/ForkJoinPool.java:3780)
    at java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.2/ForkJoinPool.java:3725)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.2/AbstractQueuedSynchronizer.java:1707)
    at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
    at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown Source)
    at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
    at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
    at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
    at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
    at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)
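A thread parked in `ConditionObject.await` after shutdown, as reported in the description, is consistent with a lost wakeup: the signal is delivered before the waiter reaches `await`, and nothing records that it happened. The following is a minimal, generic sketch of that pattern and of one common remedy (recording the shutdown under the same lock and re-checking it before waiting). It is not the Kafka code and makes no claim about the actual fix; `LostSignalSketch` and all of its method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not taken from DefaultTaskManager.
public class LostSignalSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean shutdownRequested = false; // guarded by lock

    /** Signals waiters without recording any state: a later waiter cannot see it. */
    void signalOnly() {
        lock.lock();
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Records the shutdown under the lock, then signals waiters. */
    void shutdown() {
        lock.lock();
        try {
            shutdownRequested = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits without checking state first. Returns false if the timeout elapsed. */
    boolean awaitUnguarded(final long timeoutMs) {
        lock.lock();
        try {
            return condition.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        } finally {
            lock.unlock();
        }
    }

    /** Re-checks the flag under the lock before and after every wait. */
    boolean awaitGuarded(final long timeoutMs) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (!shutdownRequested) {
                if (nanos <= 0L) {
                    return false; // timed out without observing shutdown
                }
                nanos = condition.awaitNanos(nanos);
            }
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final LostSignalSketch lost = new LostSignalSketch();
        lost.signalOnly(); // the signal fires before anyone waits
        System.out.println("unguarded woken: " + lost.awaitUnguarded(200));

        final LostSignalSketch safe = new LostSignalSketch();
        safe.shutdown(); // the flag is set before anyone waits
        System.out.println("guarded woken: " + safe.awaitGuarded(200));
    }
}
```

The timeouts exist only so the demonstration terminates; with an untimed `await`, the unguarded waiter would stay parked indefinitely, which matches the `WAITING (parking)` state in the reported stack trace.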