KAFKA-6710

Streams integration tests hang during shutdown



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 2.0.0
    • Component/s: core, streams
    • Labels: None


      Builds have been timing out a lot recently, and many of the logs show streams integration tests that were started but never completed. While running tests locally, I saw a failure during shutdown of TableTableJoinIntegrationTest. The test was stuck waiting for a broker to shut down while a KafkaScheduler thread was attempting to delete logs. KAFKA-6624 (commit 1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a Thread.sleep inside the scheduled delete task to wait until it is time to delete each log segment. The failing streams test had 62 logs to delete, and since MockTime doesn't get updated during the test, the task would have waited 62 minutes to complete, blocking shutdown of the broker for that long. This affects any streams integration test that runs for more than 30 seconds, which is when the first delayed delete task is scheduled to run.
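
      The 62-minute figure can be reproduced with a small model of the delete loop. This is only a sketch, not the LogManager code: the class and method names are hypothetical, and it assumes every log was scheduled at the frozen MockTime instant and that the delay is the default one minute, so each iteration asks Thread.sleep for the full delay because the mock clock never advances.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the scheduled delete task; not the actual Kafka code.
class BlockedDeleteSketch {
    // Assumption: the mock clock is frozen at this value for the whole test.
    static final long FROZEN_NOW_MS = 0L;

    /** Total wall-clock time a Thread.sleep-based loop would request, in ms. */
    static long totalRealSleepMs(int logCount, long deleteDelayMs) {
        // Assumption: every log was scheduled for deletion at the frozen instant.
        Deque<Long> scheduledAtMs = new ArrayDeque<>();
        for (int i = 0; i < logCount; i++) {
            scheduledAtMs.add(FROZEN_NOW_MS);
        }
        long total = 0;
        while (!scheduledAtMs.isEmpty()) {
            long remaining = scheduledAtMs.poll() + deleteDelayMs - FROZEN_NOW_MS;
            if (remaining > 0) {
                // The real task would call Thread.sleep(remaining) here; the mock
                // clock does not move, so the next log waits the full delay again.
                total += remaining;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long ms = totalRealSleepMs(62, 60_000L);
        System.out.println(ms / 60_000L + " minutes of real sleep requested");
    }
}
```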

      Changing Thread.sleep to time.sleep fixes this test issue, but it would be good to know why we sleep on a scheduler thread at all. With the default log.segment.delete.delay.ms of one minute, this potentially blocks a scheduler thread for up to a minute when there are logs to be deleted. Couldn't we just break out of the loop if it is not yet time to delete the first log segment in the list? The log would then get deleted the next time the broker checks. junrao lindong ?
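
      To make the two options concrete, here is a rough sketch of both. The Time interface, MockTime class and method names below are hypothetical stand-ins written for this illustration, not Kafka's classes; the sketch assumes a mock clock whose sleep() advances virtual time without blocking, and a queue of deletion-schedule timestamps ordered oldest first.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the two approaches; not the actual LogManager code.
class DeleteLoopSketch {
    interface Time {
        long milliseconds();
        void sleep(long ms);
    }

    // Assumed mock clock: sleep() moves virtual time forward and returns at once.
    static final class MockTime implements Time {
        private long nowMs = 0L;
        public long milliseconds() { return nowMs; }
        public void sleep(long ms) { nowMs += ms; }
    }

    /** Option 1: keep the wait, but go through the injectable clock. */
    static int deleteWithTimeSleep(Deque<Long> scheduledAtMs, long delayMs, Time time) {
        int deleted = 0;
        while (!scheduledAtMs.isEmpty()) {
            long remaining = scheduledAtMs.peek() + delayMs - time.milliseconds();
            if (remaining > 0) {
                time.sleep(remaining); // a mock clock does not block here
            }
            scheduledAtMs.poll();      // stands in for deleting the log
            deleted++;
        }
        return deleted;
    }

    /** Option 2: never sleep; stop at the first log that is not yet due. */
    static int deleteDueOnly(Deque<Long> scheduledAtMs, long delayMs, Time time) {
        int deleted = 0;
        while (!scheduledAtMs.isEmpty()) {
            if (scheduledAtMs.peek() + delayMs > time.milliseconds()) {
                break;                 // leave the rest for the next scheduled run
            }
            scheduledAtMs.poll();
            deleted++;
        }
        return deleted;
    }

    public static void main(String[] args) {
        MockTime time = new MockTime();
        Deque<Long> logs = new ArrayDeque<>();
        for (int i = 0; i < 62; i++) logs.add(0L);
        System.out.println("due now: " + deleteDueOnly(logs, 60_000L, time));
        System.out.println("after time.sleep: " + deleteWithTimeSleep(logs, 60_000L, time));
    }
}
```

      With option 2 the scheduler thread is never held, at the cost of a log being deleted up to one check interval later than its nominal delay.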


      Stack trace from failing test:

      "kafka-scheduler-8" daemon prio=5 tid=0x00007fe58dc16800 nid=0x9603 waiting on condition [0x0000700003f25000]
         java.lang.Thread.State: TIMED_WAITING (sleeping)
              at java.lang.Thread.sleep(Native Method)
              at kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)
              at kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)
              at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
              at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)

      "Test worker" prio=5 tid=0x00007fe58db72000 nid=0x5203 waiting on condition [0x0000700001cbd000]
         java.lang.Thread.State: TIMED_WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000780fb8918> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
              at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
              at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
              at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)
              at kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)
              at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
              at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)
              at org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)
              at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)
              at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)
              at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
              at org.junit.rules.RunRules.evaluate(RunRules.java:20)


              rsivaram Rajini Sivaram
              rsivaram Rajini Sivaram
              Jun Rao Jun Rao

