MAHOUT-1809: Failing tests in Flink-bindings: dals and dspca

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.11.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels:
      None

      Description

      dspca and dals are failing in the Flink distributed decomposition suite, with a numerical error and an OOM error respectively:

      dspca Failure and stack trace:

      54.69239412917543 was not less than 1.0E-5
      ScalaTestFailureLocation: org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4 at (FailingTestsSuite.scala:230)
      org.scalatest.exceptions.TestFailedException: 54.69239412917543 was not less than 1.0E-5
      	at org.scalatest.MatchersHelper$.newTestFailedException(MatchersHelper.scala:160)
      	at org.scalatest.Matchers$ShouldMethodHelper$.shouldMatcher(Matchers.scala:6231)
      	at org.scalatest.Matchers$AnyShouldWrapper.should(Matchers.scala:6265)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply$mcV$sp(FailingTestsSuite.scala:230)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply(FailingTestsSuite.scala:186)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply(FailingTestsSuite.scala:186)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      	at org.scalatest.Transformer.apply(Transformer.scala:22)
      	at org.scalatest.Transformer.apply(Transformer.scala:20)
      	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
      	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
      	at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
      	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FailingTestsSuite.scala:48)
      	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite.runTest(FailingTestsSuite.scala:48)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      	at scala.collection.immutable.List.foreach(List.scala:318)
      	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
      	at org.scalatest.Suite$class.run(Suite.scala:1424)
      	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(FailingTestsSuite.scala:48)
      	at org.scalatest.BeforeAndAfterAllConfigMap$class.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248)
      	at org.scalatest.BeforeAndAfterAllConfigMap$class.run(BeforeAndAfterAllConfigMap.scala:247)
      	at org.apache.mahout.flinkbindings.FailingTestsSuite.run(FailingTestsSuite.scala:48)
      	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
      	at scala.collection.immutable.List.foreach(List.scala:318)
      	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
      	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.run(Runner.scala:883)
      	at org.scalatest.tools.Runner.run(Runner.scala)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
      

      dals Failure:

      An exception or error caused a run to abort: Java heap space 
      java.lang.OutOfMemoryError: Java heap space
      	at java.util.Arrays.copyOf(Arrays.java:2271)
      	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
      	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
      	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
      	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1893)
      	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874)
      	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
      	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
      	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
      	at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
      	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:893)
      	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
      	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
      	at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
      	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
      


          Activity

          Andrew Palumbo added a comment -

          The DSPCA failure in this issue relates to MAHOUT-1810. The reason for the test failure is that some calculations are happening before the checkpoint is reached.
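
          As background for the note above, here is a minimal, purely illustrative sketch of explicit checkpointing in the Samsara (math-scala) DSL. It is not taken from this ticket, and it assumes an implicit DistributedContext (such as the one the binding test suites provide) is already in scope:

              // Standard Samsara imports.
              import org.apache.mahout.math._
              import org.apache.mahout.math.scalabindings._
              import org.apache.mahout.math.scalabindings.RLikeOps._
              import org.apache.mahout.math.drm._
              import org.apache.mahout.math.drm.RLikeDrmOps._

              // DRM pipelines in Samsara are built and optimized lazily; checkpoint() pins
              // the plan computed so far, so subsequent actions reuse that materialized
              // result instead of re-running the upstream computation.
              val inCore = dense((1.0, 2.0), (3.0, 4.0))
              val drmA = drmParallelize(m = inCore, numPartitions = 2).checkpoint()

              // Both actions below are computed from the same pinned drmA.
              val ata    = (drmA.t %*% drmA).collect
              val inCopy = drmA.collect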

          Andrew Palumbo added a comment - edited
          test("dspca") {
          
              val rnd = RandomUtils.getRandom
          
              // Number of points
              val m = 500
              // Length of actual spectrum
              val spectrumLen = 40
          
              val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
              printf("spectrum:%s\n", spectrum)
          
              val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
                ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
          
              // PCA Rotation matrix -- should also be orthonormal.
              val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
          
              val input = (u %*%: diagv(spectrum)) %*% tr.t
              val drmInput = drmParallelize(m = input, numPartitions = 2)
          
              // Calculate just first 10 principal factors and reduce dimensionality.
              // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
              // ensure to zero stochastic error and assert only functional correctness of the method's pca-
              // specific additions.
              val k = 10
          
              // Calculate just first 10 principal factors and reduce dimensionality.
              var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
              // Un-normalized pca data:
              drmPCA = drmPCA %*% diagv(s)
          
              val pca = drmPCA.checkpoint(CacheHint.NONE).collect
          
              // Of course, once we calculated the pca, the spectrum is going to be different since our originally
              // generated input was not centered. So here, we'd just brute-solve pca to verify
              val xi = input.colMeans()
              for (r <- 0 until input.nrow) input(r, ::) -= xi
              var (pcaControl, _, sControl) = svd(m = input)
              pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
          
              printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
              printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
          
              (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
          
            }
          
          

          This is the base test, which is currently passing on Spark and H2O and failing on Flink.

          By checkpointing drmInput immediately upon parallelization, the test will pass:

           
              val drmInput = drmParallelize(m = input, numPartitions = 2).checkpoint()
          
          Andrew Palumbo added a comment -

          dspca is fixed by MAHOUT-1810, though it needs more memory in Maven. Opening a new issue for dals, which, while numerically stable, fails with an OOM error on a matrix of the current base size.
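
          As an illustration of the Maven side of this fix (hedged: the exact plugin wiring and property names in Mahout's flink/pom.xml may differ from this sketch), raising the heap for the forked ScalaTest JVM typically looks like the following scalatest-maven-plugin configuration:

              <!-- Illustrative sketch only: raise the heap of the forked ScalaTest JVM.
                   Mahout's flink/pom.xml may wire this through a different plugin or
                   property than shown here. -->
              <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <configuration>
                  <argLine>-Xmx4g</argLine>
                </configuration>
              </plugin>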

          ASF GitHub Bot added a comment -

          GitHub user andrewpalumbo opened a pull request:

          https://github.com/apache/mahout/pull/200

          MAHOUT-1809: Bump JVM memory up to 4g for flink scalatests

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/andrewpalumbo/mahout flink-binding

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/mahout/pull/200.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #200


          commit 212452721ec593ae0d2837f8d311b952a3daaafa
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-22T04:08:19Z

          MAHOUT-1809: Bump JVM memory up to 4g for flink scalatests


          ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/200#issuecomment-199630367

          This will allow `dspca` to pass in the Maven tests.

          ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/200#issuecomment-199630459

          Though 4g might be too much; we may want to lower it before releasing.

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/mahout/pull/200

          Dmitriy Lyubimov added a comment -

          bulk-closing resolved issues

          Hudson added a comment -

          FAILURE: Integrated in Mahout-Quality #3324 (See https://builds.apache.org/job/Mahout-Quality/3324/)
          MAHOUT-1809: Bump JVM memory up to 4g for flink scalatests closes (apalumbo: rev e06fb119dc3a719651fd48f0d2aa3ee8adee9a1b)

          • flink/pom.xml

            People

            • Assignee: Andrew Palumbo
            • Reporter: Andrew Palumbo
            • Votes: 0
            • Watchers: 4
