Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-18916

Possible bug in Pregel / mergeMsg with hashmaps

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Problem
    • 2.0.2
    • None
    • GraphX
    • OSX / IntelliJ IDEA 2016.3 CE EAP, Scala 2.11.8, Spark 2.0.2

    Description

      Consider the following (rough) code that attempts to calculate all-pairs shortest paths via Pregel:

          /**
           * Attempts to compute all-pairs shortest paths over graph `g` with GraphX Pregel.
           *
           * Every vertex carries a map keyed by vertex id whose values are `ParentDist` records.
           * The map is seeded with the vertex itself at distance 0 and grows as messages arrive.
           * (`ParentDist` is declared elsewhere; its two fields are presumably (parent, dist) --
           * only `.dist` and `.min` are used here. TODO confirm against its definition.)
           *
           * @return one `(vertexId, map)` pair per vertex, with the original vertex attribute dropped
           */
          def allPairsShortestPaths: RDD[(VertexId, HashMap[VertexId, ParentDist])] = {
            // Initial Pregel message: a single sentinel entry with dist -1, which vprog's
            // `dist >= 0` filter removes again after merging.
            val initialMsg = HashMap(-1L -> ParentDist(-1L, -1L))
            // Pair each original vertex attribute with a map containing only the vertex itself
            // at distance 0, then reverse the direction of every edge.
            val pregelg = g.mapVertices((vid, vd) => (vd, HashMap[VertexId, ParentDist](vid -> ParentDist(vid, 0L)))).reverse
      
            // Vertex program: merges the incoming message into the vertex's map via mm2 and
            // keeps only entries with a non-negative dist; the original attribute is untouched.
            // NOTE(review): the lambda parameter `v` below shadows the vertex-id parameter `v`,
            // which is otherwise unused.
            def vprog(v: VertexId, value: (VD, HashMap[VertexId, ParentDist]), message: HashMap[VertexId, ParentDist]): (VD, HashMap[VertexId, ParentDist]) = {
              val updatedValues = mm2(value._2, message).filter(v => v._2.dist >= 0)
              (value._1, updatedValues)
            }
      
            // Per-edge message generator: selects the source-map entries that the destination
            // either lacks or holds at a dist greater than source dist + 1, rewrites them as
            // ParentDist(srcId, dist + 1), and sends them to the destination vertex.
            // Emits nothing when there is no such entry.
            def sendMsg(triplet: EdgeTriplet[(VD, HashMap[VertexId, ParentDist]), ED]): Iterator[(VertexId, HashMap[VertexId, ParentDist])] = {
              val dstVertexId = triplet.dstId
              val srcMap = triplet.srcAttr._2
              val dstMap = triplet.dstAttr._2  // guaranteed to have dstVertexId as a key
      
              val updatesToSend : HashMap[VertexId, ParentDist] = srcMap.filter {
                case (vid, srcPD) => dstMap.get(vid) match {
                  case Some(dstPD) => dstPD.dist > srcPD.dist + 1   // if it exists, is it cheaper?
                  case _ => true // not found - new update
                }
              }.map(u => u._1 -> ParentDist(triplet.srcId, u._2.dist +1))
      
      
              if (updatesToSend.nonEmpty)
                Iterator[(VertexId, HashMap[VertexId, ParentDist])]((dstVertexId, updatesToSend))
              else
                Iterator.empty
            }
      
            // Combines two messages bound for the same vertex: on a key collision the first
            // key is kept and the values are combined with `v1.min(v2)`.
            // The merge function has no catch-all case, so any argument that is not a pair of
            // pairs (such as the reported `(null,null)`) raises scala.MatchError.
            def mergeMsg(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
      
              // when the following two lines are commented out, the program fails with
              // 16/12/17 19:53:50 INFO DAGScheduler: Job 24 failed: reduce at VertexRDDImpl.scala:88, took 0.244042 s
              // Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1099.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1099.0 (TID 129, localhost): scala.MatchError: (null,null) (of class scala.Tuple2)
      
              // No-op traversals kept deliberately as the workaround described above.
              // NOTE(review): presumably iterating forces the map's internal key/value pairs to
              // be materialised before `merged` reads them -- confirm against the Scala
              // collection.immutable.HashMap source for the Scala version in use.
              m1.foreach(_ => ())
              m2.foreach(_ => ())
      
              m1.merged(m2) {
                case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
              }
            }
      
            // mm2 is here just to provide a separate function for vprog. Ideally we'd just re-use mergeMsg.
            // Same collision rule as mergeMsg, but without the foreach calls and with a
            // catch-all case that throws an Exception describing the unexpected argument.
            def mm2(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
              m1.merged(m2) {
                case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
                case n => throw new Exception("we've got a problem: " + n)
              }
            }
      
            // Run Pregel on the reversed graph, then project each vertex to (id, distance map).
            val pregelRun = pregelg.pregel(initialMsg)(vprog, sendMsg, mergeMsg)
            val sps = pregelRun.vertices.map(v => v._1 -> v._2._2)
            sps
          }
        }
      

      Note the comment in the mergeMsg function: when the messages are explicitly accessed prior to the .merged statement, the code works. If these side-effect statements are removed / commented out, the error message in the comments is generated.

      This fails consistently on a 50-node undirected cycle graph.

      Attachments

        Activity

          People

            Unassigned Unassigned
            bromberger Seth Bromberger
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: