Description
Sometimes when running the `kafka-console-consumer` script inside a pipe and trying to stop it with a `SIGINT` (`ctrl+c`), the process will not stop.
ubuntu@xxx:~$ kafka-console-consumer --zookeeper localhost --topic topic --from-beginning | grep "pattern"
record1
...
recordN
^CUnable to write to standard out, closing consumer.
^C
# process is still running
When looking at the various threads running on the JVM, I noticed that one user thread is waiting on a latch, which prevents the JVM from shutting down:
...
"Thread-6" prio=10 tid=0x00007f258c016000 nid=0x289f waiting on condition [0x00007f259aee3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000000d6640c80> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
        at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101)
...
"main" prio=10 tid=0x00007f25c400e000 nid=0x2878 waiting for monitor entry [0x00007f25cbefc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00000000d6974308> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Runtime.exit(Runtime.java:109)
        at java.lang.System.exit(System.java:962)
        at kafka.tools.ConsoleConsumer$.checkErr(ConsoleConsumer.scala:149)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:136)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
I believe the standard output linked to the defunct `grep` process gets closed, which triggers a `System.exit(1)`. That call prevents the latch from being counted down, so the shutdown hook waits on the latch forever while the main thread stays blocked inside `System.exit`:
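def checkErr(formatter: MessageFormatter) {
  if (System.out.checkError()) {
    System.err.println("Unable to write to standard out, closing consumer.")
    System.exit(1) // blocks if a JVM shutdown is already in progress
  }
}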
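The thread dump above can be modelled in a few lines of plain Java. This is only a sketch of the suspected lock ordering, not the Kafka code: `SHUTDOWN_LOCK` is a hypothetical stand-in for the JVM-internal lock on `java.lang.Shutdown`, the two threads merely play the roles of "Thread-6" and "main", and the thread that drives the real shutdown sequence is folded into the simulated hook for brevity.

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownHang {
    // Hypothetical stand-in for the JVM's internal lock on java.lang.Shutdown.
    private static final Object SHUTDOWN_LOCK = new Object();

    /** Returns true if the simulated hook is still parked after waitMs milliseconds. */
    public static boolean hookStillWaiting(long waitMs) throws InterruptedException {
        CountDownLatch consumerDone = new CountDownLatch(1); // only "main" counts this down
        CountDownLatch lockTaken = new CountDownLatch(1);

        // Plays "Thread-6": holds the shutdown lock while waiting for the consumer to finish.
        Thread hook = new Thread(() -> {
            synchronized (SHUTDOWN_LOCK) {
                lockTaken.countDown();
                try {
                    consumerDone.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // released by the cleanup below
                }
            }
        });

        // Plays "main": the exit call needs the shutdown lock first,
        // so the count-down after it is not reached while the hook waits.
        Thread main = new Thread(() -> {
            synchronized (SHUTDOWN_LOCK) {
                // models System.exit(1) entering Shutdown.exit
            }
            consumerDone.countDown();
        });

        hook.setDaemon(true);
        main.setDaemon(true);
        hook.start();
        lockTaken.await(); // make sure the hook owns the lock before "main" asks for it
        main.start();

        hook.join(waitMs);
        boolean stuck = hook.isAlive();

        hook.interrupt(); // cleanup: break the cycle so both threads can finish
        hook.join();
        main.join();
        return stuck;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("hook still waiting: " + hookStillWaiting(200));
    }
}
```

Without the final `interrupt()`, neither thread could make progress: each waits for something only the other can provide, which matches the WAITING and BLOCKED states in the dump.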
This only happens when `System.out` is checked for errors after a message has been consumed and before the consumer gets closed. It is a race condition, and it is most likely to occur when messages are consumed at high throughput.
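One way to make a shutdown hook robust against this kind of race is to bound its wait, so that a latch that is never counted down can delay the shutdown but not block it. The following is a minimal sketch under that assumption, not a patch for `ConsoleConsumer`; the class name, `awaitConsumer`, and the 500 ms timeout are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedHookWait {
    /**
     * Waits for the consumer loop to finish, but gives up after timeoutMs.
     * Returns true if the latch was counted down, false if the wait timed out
     * or the waiting thread was interrupted.
     */
    public static boolean awaitConsumer(CountDownLatch consumerDone, long timeoutMs) {
        try {
            return consumerDone.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch consumerDone = new CountDownLatch(1); // hypothetical latch

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (!awaitConsumer(consumerDone, 500)) {
                System.err.println("Consumer did not finish in time, continuing shutdown.");
            }
        }));

        // main returns without counting the latch down, standing in for a
        // code path that skips the normal count-down; the hook still completes.
    }
}
```

The trade-off is that a slow but healthy consumer may be cut off when the timeout expires, so the timeout would need to be chosen with the consumer's normal shutdown time in mind.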