Spark / SPARK-17624

Flaky test? StateStoreSuite maintenance


Details

    • Type: Test
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.1, 2.1.0
    • Fix Version/s: 2.0.2, 2.1.0
    • Component/s: Tests
    • Labels: None

    Description

      I've noticed this test failing consistently (25 times in a row) on a two-core machine, but not on an eight-core machine.

      If we increase the spark.rpc.numRetries value used in the test from 1 to 2 (3 is the Spark default), the test reliably passes. Setting the master to anything other than plain "local" also makes it reliable.
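A simplified model helps show why the retry count matters here. This is a sketch, not Spark's `RpcEndpointRef.askWithRetry`: it assumes only what the logs below show (each failed ask is logged as "Error sending message [...] in N attempts") and that the "Could not find StateStoreCoordinator" failure is transient. `RetryModel`, `FlakyEndpoint` and `askWithRetry` are hypothetical names, and the wait between attempts is omitted. With a limit of 1, a single failed lookup is final; with 2, the second attempt gets a chance to succeed.

```scala
object RetryModel {
  // Hypothetical endpoint that fails its first `failuresBeforeReady` asks,
  // standing in for a coordinator that is not reachable yet.
  final class FlakyEndpoint(failuresBeforeReady: Int) {
    private var calls = 0
    def callCount: Int = calls
    def ask(msg: String): String = {
      calls += 1
      if (calls <= failuresBeforeReady)
        throw new IllegalStateException("Could not find StateStoreCoordinator.")
      s"ack($msg)"
    }
  }

  // Makes at most `maxRetries` attempts (the report notes Spark's default is 3)
  // and returns the first success, or the last failure once attempts run out.
  def askWithRetry(endpoint: FlakyEndpoint, msg: String, maxRetries: Int): Either[Exception, String] = {
    require(maxRetries >= 1, "at least one attempt is required")
    var attempts = 0
    var result: Either[Exception, String] = Left(new IllegalStateException("no attempt made"))
    while (attempts < maxRetries && result.isLeft) {
      attempts += 1
      result = try Right(endpoint.ask(msg)) catch {
        case e: Exception =>
          println(s"Error sending message [message = $msg] in $attempts attempts")
          Left(e)
      }
    }
    result
  }
}
```

For example, `RetryModel.askWithRetry(new RetryModel.FlakyEndpoint(1), "VerifyIfInstanceActive", maxRetries = 1)` returns a `Left`, while the same call with `maxRetries = 2` returns `Right("ack(VerifyIfInstanceActive)")`.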

      Is there a reason spark.rpc.numRetries is set to 1?

      This failure was also reported in the Apache Spark 2.0.0 RC5 vote thread, so it has been flaky for a while: http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-2-0-0-RC5-td18367.html

      Running without the "quietly" wrapper, so that debug output is shown, gives:

      16:26:15.213 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error sending message [message = VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb,0,0),driver)] in 1 attempts
      org.apache.spark.SparkException: Exception thrown in awaitResult
              at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
              at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
              at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
              at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
              at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
              at scala.Option.map(Option.scala:146)
              at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
              at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.lang.Thread.run(Thread.java:785)
      Caused by: org.apache.spark.SparkException: Could not find StateStoreCoordinator.
              at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
              at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
              at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
              at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
              ... 19 more
      16:26:15.217 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error managing StateStore[id = (op=0, part=0), dir = /home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb/0/0], stopping management thread
      - maintenance *** FAILED ***
        The code passed to eventually never returned normally. Attempted 636 times over 10.009220005000001 seconds. Last failure message: StateStoreSuite.this.fileExists(provider, 1L, false) was true earliest file not deleted. (StateStoreSuite.scala:396)
      Run completed in 19 seconds, 181 milliseconds.
      Total number of tests run: 9
      Suites: completed 2, aborted 0
      Tests: succeeded 8, failed 1, canceled 0, ignored 0, pending 0
      *** 1 TEST FAILED ***
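The failing log can plausibly be read as a chain: the maintenance thread's VerifyIfInstanceActive call fails once, the thread logs "stopping management thread", so the cleanup that would remove the earliest (version 1) file never runs and `eventually` times out on "earliest file not deleted". In the passing run, the same warning appears only after "Done assert 2". The following is a simplified, single-threaded model of that chain, not Spark's `StateStore` code; `MaintenanceModel`, `Store`, `cleanup` and the `retain` count are hypothetical, and real maintenance runs on a scheduled background thread.

```scala
object MaintenanceModel {
  // Hypothetical store that only tracks which file versions exist on "disk".
  final class Store(var files: Set[Long]) {
    // Stand-in for cleanup: keep only the newest `retain` versions.
    def cleanup(retain: Int): Unit = {
      files = files.toSeq.sorted.takeRight(retain).toSet
    }
  }

  // Runs up to `ticks` maintenance rounds. Each round first calls `verify`
  // (standing in for VerifyIfInstanceActive); if it throws, the loop stops for
  // good, like "stopping management thread" in the log. Returns completed rounds.
  def run(store: Store, ticks: Int, retain: Int, verify: Int => Unit): Int = {
    var completed = 0
    var tick = 0
    var running = true
    while (running && tick < ticks) {
      try {
        verify(tick)
        store.cleanup(retain)
        completed += 1
      } catch {
        case e: Exception =>
          println(s"Error managing store: ${e.getMessage}, stopping management thread")
          running = false
      }
      tick += 1
    }
    completed
  }
}
```

If `verify` throws on the very first round, no cleanup ever happens and version 1 stays on disk however long a caller polls, which matches the shape of the timeout above.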
      

      With local[2] (presumably what .setMaster("local") resolves to on a two-core box?) the test reliably passes, as it also does with local[1] and local[4]. Example output:

      StateStoreSuite:
      16:17:34.379 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      - get, put, remove, commit, and all data iterator
      - updates iterator with all combos of updates and removes
      - cancel
      - getStore with unexpected versions
      - snapshotting
      - cleaning
      - corrupted file handling
      - StateStore.get
      In the maint test
      In eventually timeout 10
      Done assert 1
      Now doing assert 2
      Done assert 2
      16:17:38.207 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error sending message [message = VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c,0,0),driver)] in 1 attempts
      org.apache.spark.SparkException: Exception thrown in awaitResult
              at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
              at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
              at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
              at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
              at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
              at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
              at scala.Option.map(Option.scala:146)
              at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
              at org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
              at org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.lang.Thread.run(Thread.java:785)
      Caused by: org.apache.spark.SparkException: Could not find StateStoreCoordinator.
              at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
              at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
              at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
              at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
              at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
              ... 19 more
      16:17:38.209 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error managing StateStore[id = (op=0, part=0), dir = /home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c/0/0], stopping management thread
      - maintenance
      Run completed in 9 seconds, 317 milliseconds.
      Total number of tests run: 9
      Suites: completed 2, aborted 0
      Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
      All tests passed.
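On the question of what "local" resolves to: per Spark's documented master URLs, `local` runs a single worker thread regardless of how many cores the machine has, `local[K]` runs K threads, and `local[*]` uses one thread per logical core. So `local` on a two-core box requests one thread, like `local[1]`, rather than two. The helper below is a hypothetical sketch of that mapping (`MasterThreads.localThreads` is not a Spark API) for comparing the masters tried here; it deliberately returns `None` for `local[K,F]`, `local-cluster[...]` and cluster URLs.

```scala
object MasterThreads {
  // Matches "local[K]" or "local[*]"; other forms such as "local[K,F]" do not match.
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  // Worker threads requested by a local master URL, following the documented
  // meanings: "local" = 1 thread, "local[K]" = K threads, "local[*]" = one per
  // logical core. Returns None for any other master URL.
  def localThreads(master: String, cores: Int = Runtime.getRuntime.availableProcessors()): Option[Int] =
    master match {
      case "local"     => Some(1)
      case LocalN("*") => Some(cores)
      case LocalN(n)   => Some(n.toInt)
      case _           => None
    }
}
```

For instance, `MasterThreads.localThreads("local", cores = 2)` is `Some(1)`, whereas `localThreads("local[*]", cores = 2)` is `Some(2)`.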
      

          People

            Assignee: Tathagata Das (tdas)
            Reporter: Adam Roberts (aroberts)
            Votes: 0
            Watchers: 4