Spark / SPARK-26960

Reduce flakiness of Spark ML Listener test suite by waiting for listener bus to clear

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: ML, Tests
    • Labels: None

    Description

      SPARK-23674 added SparkListeners for some spark.ml events, as well as a test suite. I've observed flakiness in the test suite (though I unfortunately only have local logs for failures, not permalinks). Looking at logs, here's my assessment, which I confirmed with zsxwing.

      Test failure I saw:

      [info] - pipeline read/write events *** FAILED *** (10 seconds, 253 milliseconds)
      [info]   The code passed to eventually never returned normally. Attempted 20 times over 10.012222409 seconds. Last failure message: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage). (MLEventsSuite.scala:190)
      [info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
      [info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
      [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
      [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
      [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
      [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
      [info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
      [info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
      [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
      [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
      [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
      [info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
      [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
      [info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      [info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
      [info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
      [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      [info]   at scala.collection.immutable.List.foreach(List.scala:392)
      [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      [info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      [info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      [info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      [info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
      [info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      [info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
      [info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
      [info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
      [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
      [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
      [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
      [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
      [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
      [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      [info]   at java.lang.Thread.run(Thread.java:748)
      [info]   Cause: org.scalatest.exceptions.TestFailedException: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage)
      [info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
      [info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
      [info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
      [info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:202)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:191)
      [info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply$mcV$sp(MLEventsSuite.scala:191)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
      [info]   at org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
      [info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
      [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
      [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
      [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
      [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
      [info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
      [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
      [info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
      [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
      [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
      [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
      [info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
      [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
      [info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      [info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
      [info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
      [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      [info]   at scala.collection.immutable.List.foreach(List.scala:392)
      [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      [info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      [info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      [info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      [info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
      [info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      [info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
      [info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
      [info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
      [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
      [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
      [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
      [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
      [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
      [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      [info]   at java.lang.Thread.run(Thread.java:748)
      

      The issue is in the "read/write events" tests, which work as follows:

      • write
      • wait until we see at least 1 write-related SparkListenerEvent
      • read
      • wait until we see at least 1 read-related SparkListenerEvent

      The problem is that the last step does NOT allow any write-related SparkListenerEvents, but the listener bus delivers events asynchronously, so some write-related events may be delayed enough that they are only seen during this last step. Ideally, we should add logic before "read" that waits until all pending listener events have been delivered. Other SparkListener tests do this with `sc.listenerBus.waitUntilEmpty(TIMEOUT)`, so we should use the same call here.
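
      To illustrate the race and the proposed wait, here is a minimal, self-contained sketch. It does not use Spark: `ToyListenerBus`, `runScenario`, and the 50 ms delivery delay are hypothetical stand-ins invented for this example, and the event names are plain strings used only as labels. Only the idea of calling `waitUntilEmpty` before "read" comes from the description above.

      ```scala
      import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
      import java.util.concurrent.atomic.AtomicInteger
      import scala.collection.mutable.ArrayBuffer

      object ListenerBusSketch {

        // Hypothetical stand-in for an asynchronous listener bus; NOT Spark's real class.
        final class ToyListenerBus(deliveryDelayMs: Long) {
          private val queue = new LinkedBlockingQueue[String]()
          private val pending = new AtomicInteger(0)        // posted but not yet delivered
          private val received = ArrayBuffer.empty[String]  // what the "listener" has seen

          // Single daemon thread that delivers events in FIFO order, with a delay.
          private val worker = new Thread(new Runnable {
            override def run(): Unit = {
              while (true) {
                val event = queue.take()
                Thread.sleep(deliveryDelayMs) // simulate slow, asynchronous delivery
                received.synchronized { received += event }
                pending.decrementAndGet()
              }
            }
          })
          worker.setDaemon(true)
          worker.start()

          def post(event: String): Unit = {
            pending.incrementAndGet()
            queue.put(event)
          }

          // Block until every posted event has been delivered, or fail after timeoutMs.
          def waitUntilEmpty(timeoutMs: Long): Unit = {
            val deadline = System.currentTimeMillis() + timeoutMs
            while (pending.get() > 0) {
              if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException(s"Bus not empty after $timeoutMs ms")
              }
              Thread.sleep(5)
            }
          }

          def seenCount: Int = received.synchronized { received.size }

          // Return everything seen so far and clear the buffer.
          def drain(): List[String] = received.synchronized {
            val out = received.toList
            received.clear()
            out
          }
        }

        // Mirrors the four test steps; returns the events observed in the "read" phase.
        def runScenario(waitBeforeRead: Boolean): List[String] = {
          val bus = new ToyListenerBus(deliveryDelayMs = 50L)
          bus.post("SaveInstanceStart")                 // write
          bus.post("SaveInstanceEnd")
          while (bus.seenCount == 0) Thread.sleep(5)    // wait for at least 1 write event
          bus.drain()
          if (waitBeforeRead) {                         // the proposed extra step
            bus.waitUntilEmpty(10000L)
            bus.drain()
          }
          bus.post("LoadInstanceStart")                 // read
          bus.post("LoadInstanceEnd")
          bus.waitUntilEmpty(10000L)
          bus.drain()                                   // events seen during the read phase
        }
      }
      ```

      With `waitBeforeRead = true`, `runScenario` returns only the two read-related labels. With `waitBeforeRead = false`, a late `SaveInstanceEnd` can show up in the read phase, depending on timing, which is the kind of stale event reported in the failure above.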

      Relevant test code: https://github.com/apache/spark/blob/91caf0bfce4706a264fcfe222fa500354ce69ff1/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala#L247

      I'll send a patch for this.

            People

              Assignee: Joseph K. Bradley (josephkb)
              Reporter: Joseph K. Bradley (josephkb)