MAHOUT-1810
Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.11.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels:
      None

      Description

      The A + B, identically partitioned test in the Flink RLikeDrmOpsSuite fails. This failure likely indicates an issue with Flink checkpointing or the mapBlock operator:

        test("C = A + B, identically partitioned") {
          val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
          val A = drmParallelize(inCoreA, numPartitions = 2)
      
          // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
          val B = A.mapBlock() {
            case (keys, block) =>
              val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
              keys -> bBlock
          }
            // Prevent repeated computation non-determinism
            // flink problem is here... checkpoint is not doing what it should
            .checkpoint()
      
          val inCoreB = B.collect
      
          printf("A=\n%s\n", inCoreA)
          printf("B=\n%s\n", inCoreB)
      
          val C = A + B
          val inCoreC = C.collect
          printf("C=\n%s\n", inCoreC)
      
          // Actual
          val inCoreCControl = inCoreA + inCoreB
          (inCoreC - inCoreCControl).norm should be < 1E-10
        }
      

      The output clearly shows that the line:

              val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
      

      in the mapBlock closure is being calculated more than once.

      Output:

      A=
      {
       0 =>	{0:1.0,1:2.0,2:3.0}
       1 =>	{0:3.0,1:4.0,2:5.0}
       2 =>	{0:5.0,1:6.0,2:7.0}
      }
      B=
      {
       0 =>	{0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
       1 =>	{0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
       2 =>	{0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
      }
      
      C=
      {
       0 =>	{0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
       1 =>	{0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
       2 =>	{0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
      }
      


          Activity

          Andrew Palumbo added a comment - edited

          Flink checkpointing seems to be working fine on other computational paths (other than mapBlock).

           test("Checkpoint test") {
          
              val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
              val inCoreB = inCoreA.like := { (r, c, v) => util.Random.nextDouble()}
          
              val A = drmParallelize(inCoreA, numPartitions = 2)
              val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
          
              val C = (B %*% A.t)
              val D = (B %*% A.t)
          
          
              val inCoreC = C.collect
              val inCoreD = D.collect
          
              println(inCoreC)
              (inCoreC - inCoreD).norm should be < 1E-10
          
            }
          

          passes.

          Edit note: This does pass; however, it is not failing for me now when I remove the checkpoint. It may have something to do with the re-keying fix for MAHOUT-1815, or it may only have been failing (without the checkpoint) on another branch while re-working FlinkOpAtB.

          Bottom line: this test does not provide much information, and it passes with or without the checkpoint at:

              val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
          
          Dmitriy Lyubimov added a comment

          The assumption of identical partitioning depends on the engine. Maybe it doesn't hold in the case of Flink at all?

          In this case (checkpoint or not), the assumption is that collection.map(x => x) changes neither the allocation of data to splits nor its ordering inside every split (a.k.a. partition). If this holds, then input and output are identically partitioned.

          Therefore, if B = A.map(x => x ...), then A and B are identically partitioned, and A + B can be optimized as A.zip(B).map(_._1 + _._2). If A and B are not identically partitioned, then elementwise binary functions require a pre-join, which is much more expensive than a zip.

          This test simply provokes this optimization (in Spark), but if the engine doesn't support zips, or the assumption of identical partitioning does not hold, then the engine optimizer should rectify the situation by always executing join() after mapBlocks. Check back with me for more info on where to hack it if that is indeed the case.
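          For illustration, here is a minimal, engine-agnostic sketch of the zip-versus-join trade-off described above, using plain Scala collections in place of distributed datasets (the names and partition layout are illustrative only):

            // Two "partitions" of keyed rows; B is produced by a one-to-one map over A,
            // so both share the same split allocation and in-split ordering.
            val aPartitions = Seq(Seq(0 -> 1.0, 1 -> 3.0), Seq(2 -> 5.0))
            val bPartitions = aPartitions.map(_.map { case (k, v) => k -> v * 10 })

            // Identically partitioned: zip corresponding splits and add values positionally.
            val viaZip = aPartitions.zip(bPartitions).map { case (pa, pb) =>
              pa.zip(pb).map { case ((k, va), (_, vb)) => k -> (va + vb) }
            }

            // Not identically partitioned: must join by key, which implies a shuffle.
            val bByKey = bPartitions.flatten.toMap
            val viaJoin = aPartitions.map(_.map { case (k, va) => k -> (va + bByKey(k)) })

            assert(viaZip == viaJoin)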

          Show
          dlyubimov Dmitriy Lyubimov added a comment - Assumption of identical partitioning depends on the engine. Maybe it doesn't hold in case of flink at all? In this case (checkpoint or not) the assumption is that collection.map(x=>x) doesn't change neither data allocation to splits nor its ordering inside every split (aka partition). If this holds, then input and output are identically partitioned. Therefore, if B = A.map(x=> x...) then A and B are identically partitioned, and then A + B can be optimized as A.zip(B).map (_._1 + _._2). If A and B are not identically partitioned, then elementwise binary functions would require pre-join, which is much more expensive than zip. This test simply provokes this optimization (in spark), but if engine doesn't support zips or assumption of identical partitioning does not hold, then engine optimizer should rectify the situation by always executing join() after mapblocks. Check back with me for more info where to hack it if it is indeed the case..
          Dmitriy Lyubimov added a comment

          Another note: checkpoint() (at least in the case of Spark) would not prevent computation non-determinism in case a partition is lost and subsequently recomputed.
          It may have an effect on double executions if the engine indeed recomputes the input A again as part of A + B, so maybe yes, checkpoint is not doing what it is supposed to do for Flink, i.e., it does not create an optimization barrier here and/or does not cache the intermediate result by default.
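          For reference, the contract being relied on looks like this in the DSL. This is a minimal sketch in the style of the test code above, assuming the usual Mahout math-scala imports (drm, RLikeDrmOps, scalabindings) and a distributed context are in scope; the helper name is made up for illustration:

            // Hypothetical helper: pin a non-deterministic intermediate result.
            // checkpoint() is expected to act as an optimization barrier and, per its
            // CacheHint, keep the materialized blocks so that all downstream uses
            // (e.g. A + B) see the same data.
            def pinRandomLike(A: DrmLike[Int]): DrmLike[Int] =
              A.mapBlock() { case (keys, block) =>
                keys -> (block.like() := { (r, c, v) => util.Random.nextDouble() })
              }.checkpoint(CacheHint.MEMORY_AND_DISK) // explicit hint instead of the default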

          Andrew Palumbo added a comment

          Thanks, I will check back in with you on the mapBlock join fix when I'm more focused on this issue. I have MAHOUT-1815 more in my head right now, but I suspect that Flink partitioning has something to do with all of our major bugs.

          I don't believe that the Flink engine supports zips for DataSets. In the case of Flink's AewB, a join is always done via DataSet.coGroup:

              val rowsA = A.asRowWise.ds
              val rowsB = B.asRowWise.ds
              implicit val kTag = op.keyClassTag
          
              val res: DataSet[(K, Vector)] =
                rowsA
                  .coGroup(rowsB)
                  .where(0)
                  .equalTo(0) {
                  (left, right, out: Collector[(K, Vector)]) =>
                    (left.toIterable.headOption, right.toIterable.headOption) match {
                      case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b)))
                      case (None, Some(b)) => out.collect(b)
                      case (Some(a), None) => out.collect(a)
                      case (None, None) => throw new RuntimeException("At least one side of the co group " +
                        "must be non-empty.")
                    }
                }
          
          Andrew Palumbo added a comment

          I am not sure how the Flink engine handles the caching of intermediate results.

          Suneel Marthi added a comment - edited

          There's a DataSetUtils class which has methods for zipping DataSets.
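          For illustration, a minimal sketch of how that could be used (not Mahout code; it assumes Flink's Scala-side DataSetUtils wrapper, org.apache.flink.api.scala.utils, is available in the Flink version in use). Note that aligning two DataSets this way still goes through a join on the synthetic index, so it is an alignment trick rather than a true partition-preserving zip:

            import org.apache.flink.api.scala._
            import org.apache.flink.api.scala.utils._ // zipWithIndex / zipWithUniqueId

            object ZipDataSetsSketch {
              def main(args: Array[String]): Unit = {
                val env = ExecutionEnvironment.getExecutionEnvironment

                val a: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)
                val b: DataSet[Double] = env.fromElements(10.0, 20.0, 30.0)

                // Index both sides, join on the index, then add the values elementwise.
                val summed = a.zipWithIndex
                  .join(b.zipWithIndex)
                  .where(0).equalTo(0)
                  .map { pair => pair._1._2 + pair._2._2 }

                summed.print() // print() triggers execution of the plan
              }
            }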

          Andrew Palumbo added a comment - edited

          From the above test, though, we can see that in the case of B %*% A.t, where B is a checkpointed non-deterministic matrix, checkpointing is creating a barrier. I should have illustrated this better by using A + B and also providing a failure case. (I was also using it as a sanity test for FlinkOpAtB.) But it does in fact fail without the checkpoint of B.

          Edit Note: Same as above.

          It seems that checkpointing the mapBlock result is the culprit. Maybe this speaks to your point about intermediate caching.

          Andrew Palumbo added a comment

          Flink does cache intermediate results of a task by default:

          https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html

          However, caching a DRM to memory is not possible, and the call to CheckpointedFlinkDrm.cache(...) is only stubbed out.

          That said, this is the same situation in H2O, and this test passes without issue there.

          Andrew Palumbo added a comment

          Dmitriy Lyubimov, I am almost positive that the failure is due to checkpointing failing on mapBlock. I believe this is causing the test failures in dspca (MAHOUT-1809), i.e., the internal mapBlock call in dspca. Could you let me know your thoughts on the join fix when you get a chance?

          Dmitriy Lyubimov added a comment

          So if checkpoint doesn't cache (and getting rid of the non-determinism is its whole intent in this test), that is formal non-adherence to the contract of checkpoint and its caching capabilities (the CacheHint parameter).

          So you are saying there's no way to cope with this?

          I think, in the worst case, the solution should dump the intermediate checkpoint (for cache hints other than NONE) to DFS or an in-memory file system. Various people are telling me that DFS can now have a pretty sizeable local cache configured too, so persistence is not so bad (though not as good as keeping object trees in the same JVM, of course).

          Andrew Palumbo added a comment

          As far as I know, there is currently no caching in the same sense as in Spark. I've been told this is to come in future releases.

          I am just getting familiar with the Flink engine. I wonder if we can use the DistributedFileCache?

          https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/cache/DistributedCache.DistributedCacheEntry.html
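          For reference, a minimal sketch of Flink's distributed cache mechanism (not Mahout code; the path and the cached-file name are hypothetical placeholders):

            import org.apache.flink.api.common.functions.RichMapFunction
            import org.apache.flink.api.scala._
            import org.apache.flink.configuration.Configuration

            object DistributedCacheSketch {
              def main(args: Array[String]): Unit = {
                val env = ExecutionEnvironment.getExecutionEnvironment

                // Register a (hypothetical) previously materialized file; Flink ships a
                // copy of it to every task manager that runs the job.
                env.registerCachedFile("hdfs:///tmp/drm-checkpoint", "cachedDrm")

                val data = env.fromElements(1, 2, 3)

                val result = data.map(new RichMapFunction[Int, Int] {
                  override def open(parameters: Configuration): Unit = {
                    // Local java.io.File handle to the shipped copy; a real implementation
                    // would deserialize the cached DRM data from it here.
                    val cachedFile = getRuntimeContext.getDistributedCache.getFile("cachedDrm")
                    println(s"cached copy at: ${cachedFile.getAbsolutePath}")
                  }
                  override def map(value: Int): Int = value
                })

                result.print()
              }
            }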

          ASF GitHub Bot added a comment

          GitHub user andrewpalumbo opened a pull request:

          https://github.com/apache/mahout/pull/198

          MAHOUT-1810 persist data to file system and read back at each cache() call (FOR REVIEW)

          We still need to investigate Flink's `ExecutionEnvironment.registerCachedFile` to really cache things for access back at the end. However, tests do pass with this. (A sketch of the persist-and-reread approach follows this comment.)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1810

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/mahout/pull/198.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #198


          commit a9ca8d396eb3963a8e344da1e4e94024af4b437e
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-21T23:15:32Z

          Write data to file system and read back at each cache() call
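          As referenced above, here is a sketch of the persist-and-reread approach used for cache(), modeled on Flink ML's FlinkMLTools.persist (which the final commits reference); the object name and job name are illustrative:

            import scala.reflect.ClassTag

            import org.apache.flink.api.common.typeinfo.TypeInformation
            import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
            import org.apache.flink.api.scala._
            import org.apache.flink.core.fs.FileSystem.WriteMode
            import org.apache.flink.core.fs.Path

            object PersistSketch {

              /** Write `dataSet` to `path` with Flink's binary serializer format and read it
                * back, so every downstream use consumes the materialized copy instead of
                * re-evaluating the (possibly non-deterministic) upstream plan. */
              def persist[T: ClassTag: TypeInformation](dataSet: DataSet[T], path: String): DataSet[T] = {
                val env = dataSet.getExecutionEnvironment

                val outputFormat = new TypeSerializerOutputFormat[T]
                outputFormat.setOutputFilePath(new Path(path))
                outputFormat.setWriteMode(WriteMode.OVERWRITE)

                dataSet.output(outputFormat)
                env.execute("persist sketch") // materialize eagerly

                val inputFormat = new TypeSerializerInputFormat[T](dataSet.getType())
                inputFormat.setFilePath(new Path(path))
                env.createInput(inputFormat)
              }
            }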


          ASF GitHub Bot added a comment

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/198#issuecomment-199569347

          I suppose that I could just commit this and start another JIRA for the caching issue.

          ASF GitHub Bot added a comment

          Github user smarthi commented on the pull request:

          https://github.com/apache/mahout/pull/198#issuecomment-199583714

          Yeah, please commit this and start another Jira. This is a big win.

          Andrew Palumbo added a comment

          Fixed by temporarily "caching" to the local file system.

          ASF GitHub Bot added a comment

          Github user andrewpalumbo closed the pull request at:

          https://github.com/apache/mahout/pull/198

          Dmitriy Lyubimov added a comment

          bulk-closing resolved issues

          Hudson added a comment

          FAILURE: Integrated in Mahout-Quality #3324 (See https://builds.apache.org/job/Mahout-Quality/3324/)
          MAHOUT-1810: Failing test in flink-bindings: A + B Identically (apalumbo: rev f4f42ae4c4c7555659edcc43669fec82f9537219)

          • flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
          • flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
          • flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala

          MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm (apalumbo: rev 202b94f840286d4d0970f0427122697ba27fc1fb)

          • flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
          • flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
          • flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala

            People

            • Assignee:
              Andrew Palumbo
            • Reporter:
              Andrew Palumbo
            • Votes:
              0
            • Watchers:
              5
