Description
CheckpointWriter.stop() is prone to a race condition where the writer thread becomes blocked by the stop caller when trying to access the fs:
"pool-31-thread-1" #156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302) - waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter) at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) "pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465) at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291) - locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter) at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159) - locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator) at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115) - locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler) at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679) - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644) - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext) at org.apache.spark.streaming.MapWithStateSuite.org$apache$spark$streaming$MapWithStateSuite$$getOperationOutput(MapWithStateSuite.scala:570) at org.apache.spark.streaming.MapWithStateSuite.org$apache$spark$streaming$MapWithStateSuite$$testOperation(MapWithStateSuite.scala:539) at org.apache.spark.streaming.MapWithStateSuite$$anonfun$18.apply$mcV$sp(MapWithStateSuite.scala:407) at org.apache.spark.streaming.MapWithStateSuite$$anonfun$18.apply(MapWithStateSuite.scala:359) at org.apache.spark.streaming.MapWithStateSuite$$anonfun$18.apply(MapWithStateSuite.scala:359) [...]
This leads to test flakiness (SPARK-13693) and significantly slows down tests (it makes MapWithStateSuite 10 times slower). We should fix this by refactoring the code to remove unnecessary synchronization.
Attachments
Issue Links
- relates to
-
SPARK-13693 Flaky test: o.a.s.streaming.MapWithStateSuite
- Resolved
- links to