Description
Consider the following (rough) code that attempts to calculate all-pairs shortest paths via GraphX's Pregel API:
def allPairsShortestPaths: RDD[(VertexId, HashMap[VertexId, ParentDist])] = {
  val initialMsg = HashMap(-1L -> ParentDist(-1L, -1L))
  val pregelg = g.mapVertices((vid, vd) => (vd, HashMap[VertexId, ParentDist](vid -> ParentDist(vid, 0L)))).reverse

  def vprog(v: VertexId, value: (VD, HashMap[VertexId, ParentDist]), message: HashMap[VertexId, ParentDist]): (VD, HashMap[VertexId, ParentDist]) = {
    val updatedValues = mm2(value._2, message).filter(v => v._2.dist >= 0)
    (value._1, updatedValues)
  }

  def sendMsg(triplet: EdgeTriplet[(VD, HashMap[VertexId, ParentDist]), ED]): Iterator[(VertexId, HashMap[VertexId, ParentDist])] = {
    val dstVertexId = triplet.dstId
    val srcMap = triplet.srcAttr._2
    val dstMap = triplet.dstAttr._2 // guaranteed to have dstVertexId as a key
    val updatesToSend: HashMap[VertexId, ParentDist] = srcMap.filter {
      case (vid, srcPD) => dstMap.get(vid) match {
        case Some(dstPD) => dstPD.dist > srcPD.dist + 1 // if it exists, is it cheaper?
        case _ => true // not found - new update
      }
    }.map(u => u._1 -> ParentDist(triplet.srcId, u._2.dist + 1))

    if (updatesToSend.nonEmpty)
      Iterator[(VertexId, HashMap[VertexId, ParentDist])]((dstVertexId, updatesToSend))
    else
      Iterator.empty
  }

  def mergeMsg(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
    // when the following two lines are commented out, the program fails with
    // 16/12/17 19:53:50 INFO DAGScheduler: Job 24 failed: reduce at VertexRDDImpl.scala:88, took 0.244042 s
    // Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1099.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1099.0 (TID 129, localhost): scala.MatchError: (null,null) (of class scala.Tuple2)
    m1.foreach(_ => ())
    m2.foreach(_ => ())
    m1.merged(m2) {
      case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
    }
  }

  // mm2 is here just to provide a separate function for vprog. Ideally we'd just re-use mergeMsg.
  def mm2(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
    m1.merged(m2) {
      case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
      case n => throw new Exception("we've got a problem: " + n)
    }
  }

  val pregelRun = pregelg.pregel(initialMsg)(vprog, sendMsg, mergeMsg)
  val sps = pregelRun.vertices.map(v => v._1 -> v._2._2)
  sps
}
}
Note the comment in the mergeMsg function: when both messages are explicitly traversed (the m1.foreach and m2.foreach calls) before the .merged call, the code works. If these otherwise no-op statements are removed or commented out, the job fails with the scala.MatchError: (null,null) shown in that comment.
This fails consistently on a 50-node undirected cycle graph.
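For anyone trying to narrow this down, here is a rough, Spark-free sketch of the pieces the code above relies on but does not show. Everything in it is an assumption made for illustration: ParentDist is not defined in this report, so the case class below (with a min that keeps the smaller distance) is a guess at its shape; mergeViaFold is a hypothetical alternative that produces the same result without calling .merged; and undirectedCycleEdges assumes the undirected cycle is represented as two directed edges per pair. Whether the fold-based merge avoids the MatchError inside Pregel has not been verified here.

```scala
import scala.collection.immutable.HashMap

object MergeRepro {
  // Hypothetical stand-in: the report does not show how ParentDist is defined.
  final case class ParentDist(parent: Long, dist: Long) {
    // Keep the entry with the smaller distance; ties keep `this`.
    def min(that: ParentDist): ParentDist = if (that.dist < dist) that else this
  }

  type DistMap = HashMap[Long, ParentDist]

  // Same shape as mergeMsg in the report, minus the two foreach calls.
  def mergeViaMerged(m1: DistMap, m2: DistMap): DistMap =
    m1.merged(m2) { case ((k, v1), (_, v2)) => (k, v1.min(v2)) }

  // Hypothetical alternative: fold m2 into m1 without calling .merged.
  def mergeViaFold(m1: DistMap, m2: DistMap): DistMap =
    m2.foldLeft(m1) { case (acc, (k, v2)) =>
      acc.updated(k, acc.get(k).fold(v2)(_.min(v2)))
    }

  // Edge pairs for an undirected cycle on n vertices, assuming each
  // undirected edge is stored as two directed edges.
  def undirectedCycleEdges(n: Int): Seq[(Long, Long)] =
    (0 until n).flatMap { i =>
      val a = i.toLong
      val b = ((i + 1) % n).toLong
      Seq((a, b), (b, a))
    }
}
```

The edge pairs from undirectedCycleEdges(50) could be parallelized and passed to GraphX's Graph.fromEdgeTuples to build a test graph of the size mentioned above, and swapping mergeViaFold in for mergeMsg would be one way to check whether the failure is specific to .merged.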