Description
I've noticed this test failing consistently (25 times in a row) on a two-core machine, but not on an eight-core machine.
If we increase the spark.rpc.numRetries value used in the test from 1 to 2 (3 being the default in Spark), the test reliably passes. We can also gain reliability by setting the master to anything other than plain local.
Is there a reason spark.rpc.numRetries is set to 1?
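For reference, the two workarounds described above could be expressed roughly as follows. This is only a sketch of the configuration, not the suite's actual setup code: the app name is a made-up placeholder, and it assumes spark-core is on the classpath.

```scala
import org.apache.spark.SparkConf

// Sketch only: "StateStoreSuite-sketch" is a hypothetical app name.
val conf = new SparkConf()
  // Workaround 2: an explicit thread count instead of plain "local".
  .setMaster("local[2]")
  .setAppName("StateStoreSuite-sketch")
  // Workaround 1: raise the RPC retry setting from 1 to 2.
  .set("spark.rpc.numRetries", "2")
```

Either change on its own was enough to make the test pass in the runs reported here.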
This failure is also mentioned in the following thread, so it has been flaky for a while: http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-2-0-0-RC5-td18367.html
If we run without the "quietly" wrapper so that we get debug info:
16:26:15.213 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error sending message [message = VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb,0,0),driver)] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.SparkException: Could not find StateStoreCoordinator.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
    at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    ... 19 more
16:26:15.217 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error managing StateStore[id = (op=0, part=0), dir = /home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb/0/0], stopping management thread
- maintenance *** FAILED ***
  The code passed to eventually never returned normally. Attempted 636 times over 10.009220005000001 seconds. Last failure message: StateStoreSuite.this.fileExists(provider, 1L, false) was true earliest file not deleted. (StateStoreSuite.scala:396)
Run completed in 19 seconds, 181 milliseconds.
Total number of tests run: 9
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
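The "in 1 attempts" wording in the warning suggests that the retry setting acts as a cap on the total number of attempts, so a value of 1 leaves no room for a second try. The sketch below illustrates that reading in plain Scala. It is not Spark's askWithRetry: the withAttempts helper and the simulated ask function are hypothetical, and whether the real "Could not find StateStoreCoordinator" failure is transient is an assumption made only for illustration.

```scala
import scala.util.{Failure, Try}

object RetrySketch {
  // Hypothetical helper, not Spark's askWithRetry: runs `op` at most
  // `maxAttempts` times and returns the first success or the last failure.
  def withAttempts[T](maxAttempts: Int)(op: Int => T): Try[T] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 0
    var result: Try[T] = Failure(new IllegalStateException("not attempted"))
    while (attempt < maxAttempts && result.isFailure) {
      attempt += 1
      result = Try(op(attempt))
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Simulated endpoint that fails on the first call only (an assumption).
    def ask(attempt: Int): String =
      if (attempt < 2) throw new RuntimeException("Could not find StateStoreCoordinator.")
      else "active"

    println(withAttempts(1)(ask).isSuccess) // false: the single attempt fails
    println(withAttempts(2)(ask).isSuccess) // true: the second attempt succeeds
  }
}
```

Under that assumption, a cap of 1 turns any single failed lookup into a final error, which would be consistent with the test passing once the value is raised to 2.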
With local[2] (presumably what .setMaster("local") resolves to on a two-core box?) the test reliably passes, and the same is true for local[1] and local[4]. For example:
StateStoreSuite:
16:17:34.379 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- get, put, remove, commit, and all data iterator
- updates iterator with all combos of updates and removes
- cancel
- getStore with unexpected versions
- snapshotting
- cleaning
- corrupted file handling
- StateStore.get
In the maint test
In eventually timeout 10
Done assert 1
Now doing assert 2
Done assert 2
16:17:38.207 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error sending message [message = VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c,0,0),driver)] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
    at org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.SparkException: Could not find StateStoreCoordinator.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
    at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    ... 19 more
16:17:38.209 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error managing StateStore[id = (op=0, part=0), dir = /home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c/0/0], stopping management thread
- maintenance
Run completed in 9 seconds, 317 milliseconds.
Total number of tests run: 9
Suites: completed 2, aborted 0
Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
All tests passed.