The assumption of identical partitioning depends on the engine. Perhaps it does not hold for Flink at all?
In this case (checkpoint or not), the assumption is that collection.map(x => x) changes neither the allocation of data to splits nor the ordering inside every split (a.k.a. partition). If this holds, then input and output are identically partitioned.
Therefore, if B = A.map(x => ...), then A and B are identically partitioned, and A + B can be optimized as A.zip(B).map(_._1 + _._2). If A and B are not identically partitioned, then an elementwise binary function would require a pre-join, which is much more expensive than a zip.
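To make the reasoning concrete, here is a minimal sketch in plain Python (no Spark; partitions are just nested lists, and all names/values are made up for illustration). It shows that a per-partition map preserves partitioning, so the elementwise sum can be computed by a cheap partition-wise zip, and that the result agrees with the expensive join-by-key fallback:

```python
# A "dataset" as a list of partitions (splits) of (key, value) rows.
A = [[(0, 1.0), (1, 2.0)], [(2, 3.0), (3, 4.0)]]

# B = A.map(x => x * 10): a per-partition map preserves both the
# allocation of rows to splits and their order within each split,
# so A and B are identically partitioned.
B = [[(k, v * 10) for k, v in part] for part in A]

# Identically partitioned => elementwise A + B is a partition-wise zip,
# with no data movement between partitions.
sum_zip = [[(ka, va + vb) for (ka, va), (_, vb) in zip(pa, pb)]
           for pa, pb in zip(A, B)]

# Without that guarantee, we must fall back to a join by key;
# in a real engine this implies a shuffle, which is much more expensive.
b_by_key = {k: v for part in B for k, v in part}
sum_join = [[(k, va + b_by_key[k]) for k, va in part] for part in A]

assert sum_zip == sum_join
print(sum_zip)
```

The zip path only touches co-located partitions, which is exactly the optimization the identical-partitioning assumption unlocks; the join path is the safe fallback when that assumption cannot be trusted.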
This test simply provokes this optimization (in Spark). If an engine does not support zips, or the assumption of identical partitioning does not hold, then the engine's optimizer should rectify the situation by always executing a join() after mapBlocks. Check back with me for more info on where to hack it if that is indeed the case.