SPARK-1955 (project: Spark)

VertexRDD can incorrectly assume index sharing


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.9.0, 0.9.1, 1.0.0
    • Fix Version/s: 1.2.2, 1.3.0
    • Component/s: GraphX
    • Labels: None

    Description

      Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join if both operands are VertexRDDs that share the same index (i.e., one operand is derived from the other). However, this condition is checked only by matching on the operand type: the fast join strategy is used whenever both operands are VertexRDDs, whether or not they actually share an index.

      This is clearly fine when both do in fact share the same index. It is also fine when the two VertexRDDs have the same partitioner but different indexes, because each VertexPartition will detect the index mismatch and fall back to the slow but correct local join strategy.
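The per-partition fallback described above can be pictured with a small, self-contained Scala model. `VIndex`, `VPart`, and `LocalJoin` are hypothetical names, not GraphX's internal classes. The model assumes every vertex in an index is present (real vertex partitions also track which vertices are present) and uses reference equality (`eq`) as its test for a shared index.

```scala
// Hypothetical model of a vertex partition; not GraphX's internal API.
// An index maps vertex IDs to array positions; values are stored by position.
final class VIndex(val ids: Array[Long]) {
  private val pos: Map[Long, Int] = ids.zipWithIndex.toMap
  def positionOf(id: Long): Option[Int] = pos.get(id)
}

final case class VPart[A](index: VIndex, values: Array[A])

object LocalJoin {
  // Inner join of two partitions that occupy the same partition slot.
  def innerJoin[A, B, C](l: VPart[A], r: VPart[B])(f: (Long, A, B) => C): Map[Long, C] =
    if (l.index eq r.index) {
      // Fast path: both partitions share one index object, so position i
      // holds the same vertex on both sides and the arrays can be walked together.
      l.index.ids.indices.map { i =>
        val id = l.index.ids(i)
        id -> f(id, l.values(i), r.values(i))
      }.toMap
    } else {
      // Slow but correct fallback: look up each of r's vertices in l's index by ID.
      r.index.ids.indices.flatMap { j =>
        val id = r.index.ids(j)
        l.index.positionOf(id).map(i => id -> f(id, l.values(i), r.values(j)))
      }.toMap
    }

  def main(args: Array[String]): Unit = {
    val shared = new VIndex(Array(0L, 1L))
    val a = VPart(shared, Array(1, 2))
    val derived = VPart(shared, Array(10, 20))                  // same index object as `a`
    val other = VPart(new VIndex(Array(1L, 7L)), Array(5, 9))   // independently built index
    println(innerJoin(a, derived)((id, x, y) => x + y))         // fast path
    println(innerJoin(a, other)((id, x, y) => x + y))           // fallback path
  }
}
```

In this model, `other` covers vertices 1 and 7, so the fallback keeps only vertex 1 (2 + 5 = 7). The fallback is only correct when both partitions hold the same set of vertex IDs for that slot, which is exactly what an equal partitioner guarantees.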

      However, when they have different numbers of partitions or different partition functions, the join can throw an exception or, worse, silently return incorrect results.

      For example:

      // Run in spark-shell, which predefines the SparkContext `sc`
      // and allows the vals below to be redefined.
      import org.apache.spark._
      import org.apache.spark.graphx._
      
      // Construct VertexRDDs with different numbers of partitions (1 vs. 8)
      val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
      val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
      // Try to join them. This appears to work, since nothing is evaluated until an action runs...
      val c = a.innerJoin(b) { (vid, x, y) => x + y }
      // ...but the action fails with java.lang.IllegalArgumentException:
      // Can't zip RDDs with unequal numbers of partitions
      c.collect
      
      // Construct VertexRDDs with different partition functions (hash vs. range)
      val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2))).partitionBy(new HashPartitioner(2)))
      val bVerts = sc.parallelize(List((1L, 5)))
      val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
      // Try to join them. We expect (1L, 7), since 2 + 5 = 7.
      val c = a.innerJoin(b) { (vid, x, y) => x + y }
      // Silent failure: the result is empty!
      c.collect
      

      VertexRDD should check equality of partitioners before using the fast zip join. If the partitioners are different, the two datasets should be automatically co-partitioned.
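The proposed check can be sketched with a self-contained Scala model that replays the report's two examples without Spark. `Part`, `HashPart`, `RangePart`, `PVerts`, and `CoPartitionJoin` are hypothetical names. `RangePart` uses fixed, hand-picked bounds rather than the sampled bounds of Spark's `RangePartitioner`, and the model co-partitions by rebuilding the right-hand side under the left-hand side's partitioner. This is one possible strategy, not necessarily the one adopted in the actual fix.

```scala
// Hypothetical model, not GraphX's API: a vertex set is a partitioner
// plus one Map per partition, so zipping works partition by partition.
sealed trait Part {
  def numPartitions: Int
  def partition(id: Long): Int
}

// Hash-style partitioning: non-negative id modulo numPartitions.
final case class HashPart(numPartitions: Int) extends Part {
  def partition(id: Long): Int =
    (((id % numPartitions) + numPartitions) % numPartitions).toInt
}

// Range-style partitioning with fixed bounds: partition i holds ids <= bounds(i),
// and ids above every bound go to the last partition.
final case class RangePart(bounds: Vector[Long]) extends Part {
  def numPartitions: Int = bounds.size + 1
  def partition(id: Long): Int = {
    val i = bounds.indexWhere(id <= _)
    if (i < 0) bounds.size else i
  }
}

final case class PVerts[A](part: Part, parts: Vector[Map[Long, A]])

object CoPartitionJoin {
  // Place each (id, value) pair in the partition chosen by `part`.
  def build[A](part: Part, data: Seq[(Long, A)]): PVerts[A] =
    PVerts(part, Vector.tabulate(part.numPartitions) { p =>
      data.filter { case (id, _) => part.partition(id) == p }.toMap
    })

  // Zip partition i of `a` with partition i of `b` and join each pair locally.
  private def zipJoin[A, B, C](a: PVerts[A], b: PVerts[B])(f: (Long, A, B) => C): Map[Long, C] = {
    require(a.parts.size == b.parts.size, "Can't zip unequal numbers of partitions")
    a.parts.zip(b.parts).flatMap { case (pa, pb) =>
      pb.collect { case (id, y) if pa.contains(id) => id -> f(id, pa(id), y) }
    }.toMap
  }

  // Behavior described in the report: zip whenever both sides are vertex sets.
  def innerJoinUnchecked[A, B, C](a: PVerts[A], b: PVerts[B])(f: (Long, A, B) => C): Map[Long, C] =
    zipJoin(a, b)(f)

  // Proposed behavior: zip only if the partitioners are equal; otherwise
  // rebuild `b` with `a`'s partitioner so matching ids land in the same slot.
  def innerJoin[A, B, C](a: PVerts[A], b: PVerts[B])(f: (Long, A, B) => C): Map[Long, C] =
    if (a.part == b.part) zipJoin(a, b)(f)
    else zipJoin(a, build(a.part, b.parts.flatten))(f)

  def main(args: Array[String]): Unit = {
    val add = (id: Long, x: Int, y: Int) => x + y
    // First example: 1 partition vs. 8 partitions.
    val a1 = build(HashPart(1), Seq(0L -> 1, 1L -> 2))
    val b1 = build(HashPart(8), Seq(0L -> 5))
    println(scala.util.Try(innerJoinUnchecked(a1, b1)(add))) // a Failure wrapping IllegalArgumentException
    println(innerJoin(a1, b1)(add))                          // Map(0 -> 6)
    // Second example: hash vs. range partitioning, 2 partitions each.
    val a2 = build(HashPart(2), Seq(0L -> 1, 1L -> 2))
    val b2 = build(RangePart(Vector(1L)), Seq(1L -> 5))
    println(innerJoinUnchecked(a2, b2)(add))                 // Map() (silently empty)
    println(innerJoin(a2, b2)(add))                          // Map(1 -> 7)
  }
}
```

In the model, the second example's silent failure arises because vertex 1 lands in partition 1 on the hash-partitioned side but in partition 0 on the range-partitioned side, so the zip never compares the two copies. Re-partitioning one side with the other's partitioner restores that alignment before zipping.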

People

    • Assignee: Brennon York (boyork)
    • Reporter: Ankur Dave (ankurd)
    • Votes: 0
    • Watchers: 3
