Spark / SPARK-23678

a more efficient partition strategy

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: GraphX
    • Labels: None

      Description

      Recently, I found a new partition strategy (called EdgePartitionTriangle), which combines the partition strategies EdgePartition2D and CanonicalRandomVertexCut. It has three advantages:
      1. A tighter bound on vertex replication: sqrt(2 * numParts), compared with 2 * sqrt(numParts) for EdgePartition2D.
      2. All edges between two vertices are colocated, regardless of direction.
      3. The same work balance as EdgePartition2D.
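A minimal sketch of why advantage 1 holds, assuming numParts is a triangle number k * (k + 1) / 2 and virtual indices run from 0 to k: once mirrored cells are folded, every edge of a vertex with virtual index a lands in one of the k cells that pair a with a different index, so the vertex is replicated at most k times, and k < sqrt(k * (k + 1)) = sqrt(2 * numParts). The object and method names are hypothetical; 2 * sqrt(numParts) is the bound GraphX documents for EdgePartition2D.

```scala
object ReplicationBound {
  // Fold a virtual 2D cell so that (i, j) and (j, i) share one cell.
  def fold(i: Int, j: Int): (Int, Int) = (math.max(i, j), math.min(i, j))

  // Folded cells that can hold an edge touching a vertex whose virtual index is a,
  // for virtual indices 0..k. Diagonal cells are left out because edges landing on
  // (a, a) are moved into row/column a anyway.
  def cellsForIndex(a: Int, k: Int): Set[(Int, Int)] =
    (0 to k).filter(_ != a).map(j => fold(a, j)).toSet

  def main(args: Array[String]): Unit =
    for (k <- Seq(3, 7, 15)) {
      val numParts = k * (k + 1) / 2 // assumes numParts is a triangle number
      val replicas = cellsForIndex(0, k).size
      println(f"numParts=$numParts%3d maxReplicas=$replicas%2d " +
        f"sqrt(2*numParts)=${math.sqrt(2.0 * numParts)}%6.2f " +
        f"2*sqrt(numParts)=${2 * math.sqrt(numParts.toDouble)}%6.2f")
    }
}
```

Running main prints the replication count next to both bounds for a few triangle numbers; the ratio between the two bounds is always sqrt(2).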

      See https://github.com/weiwee/edgePartitionTri/blob/master/EdgePartitionTriangle.ipynb

      The main idea is to first partition the edges virtually with EdgePartition2D, which yields the virtual partitions

      {(i, j) | i = 0, 1, ..., k, j = 0, 1, ..., k}
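A minimal sketch of the virtual step, assuming virtual indices 0..k and a simplified hash that reuses the mixing prime from the code below (multiply, take the absolute value, reduce modulo k + 1). This is not GraphX's exact EdgePartition2D arithmetic, and the object and method names are hypothetical.

```scala
object VirtualGrid {
  val mixingPrime: Long = 1125899906842597L

  // Simplified hash of a vertex id onto one of the k + 1 virtual indices 0..k.
  def virtualIndex(id: Long, k: Int): Int =
    (math.abs(id * mixingPrime) % (k + 1)).toInt

  // Virtual cell of an edge: row from the source, column from the destination.
  def virtualCell(src: Long, dst: Long, k: Int): (Int, Int) =
    (virtualIndex(src, k), virtualIndex(dst, k))
}
```

Because both endpoints go through the same hash, reversing an edge only swaps its row and column, which is what lets the folding step merge the two directions.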

      Then the virtual partitions are folded: (i, j) and (j, i) are merged into the single partition (max(i, j), min(i, j)), for example:

      (1,0) and (0,1) -> (1, 0)

      (2,1) and (1,2) -> (2, 1)

      ...

      (k, k-1) and (k-1, k) -> (k, k -1)

      Finally, the folded partitions {(1,0), (2,0), (2,1), (3,0), (3,1), (3,2), ..., (k, k-1)} are mapped to {0, 1, ..., k*(k+1)/2 - 1}, where (i, j) is mapped to i*(i-1)/2 + j.
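The numbering can be checked directly: for a folded cell (i, j) with i > j, the expression i * (i - 1) / 2 + j (the same one used on the last line of getPartition in the code below) visits the cells row by row and covers 0, 1, ..., k * (k + 1) / 2 - 1 exactly once. A minimal sketch with hypothetical names:

```scala
object TriangleIndex {
  // Linear id of a folded cell (i, j) with i > j >= 0.
  def index(i: Int, j: Int): Int = {
    require(i > j && j >= 0, s"expected i > j >= 0, got ($i, $j)")
    i * (i - 1) / 2 + j
  }

  // Ids of all folded cells for virtual indices 0..k, visited row by row.
  def allIndices(k: Int): Seq[Int] =
    for (i <- 1 to k; j <- 0 until i) yield index(i, j)
}
```

For example, index(3, 0) is 3, and index(k, k - 1) is k * (k + 1) / 2 - 1, the largest id.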

      The complete method must handle two more details:

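      1. When numParts is not a triangle number, the partitions are divided into two types: triangle partitions and the remaining partitions. The latter are assigned by a different strategy (in the code below, an edge goes to a remaining partition when either endpoint hashes outside the triangle region).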

      2. When edges are virtually located in a diagonal partition (a, a), they are relocated to one of the partitions

      {(a, 0), (a, 1), ..., (a, a-1), (a+1, a), ..., (k, a)}

      to achieve better work balance.
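Both details can be sketched in isolation. For detail 1, the triangle part uses the largest triangle number r * (r + 1) / 2 not exceeding numParts; the issue's code computes r with a floating-point square root, while this sketch uses an integer search. For detail 2, an edge on (a, a) moves to column (a + 1 + ind) mod (k + 1) for some ind in [0, k), which never equals a, and is then folded; the issue's code derives ind from both endpoints' hashes so that both directions of an edge agree, whereas here ind is a plain parameter. All names are hypothetical.

```scala
object TriangleDetails {
  // Detail 1: largest r with r * (r + 1) / 2 <= numParts, by integer search.
  def numRowTriParts(numParts: Int): Int = {
    require(numParts >= 1, "numParts must be positive")
    var r = 0
    while ((r + 1).toLong * (r + 2) / 2 <= numParts) r += 1
    r
  }

  // (number of triangle partitions, number of remaining partitions).
  def split(numParts: Int): (Int, Int) = {
    val r = numRowTriParts(numParts)
    val tri = r * (r + 1) / 2
    (tri, numParts - tri)
  }

  // Merge (i, j) and (j, i) into (max, min).
  def fold(i: Int, j: Int): (Int, Int) = (math.max(i, j), math.min(i, j))

  // Detail 2: move an edge from diagonal cell (a, a) into row/column a.
  // k is the largest virtual index and ind must lie in [0, k).
  def relocate(a: Int, k: Int, ind: Int): (Int, Int) = {
    require(0 <= a && a <= k && 0 <= ind && ind < k, "index out of range")
    val col = (a + 1 + ind) % (k + 1) // a + 1 + ind is in [a + 1, a + k], so col != a
    fold(a, col)
  }
}
```

For example, numParts = 8 splits into 6 triangle partitions (virtual indices 0..3) and 2 remaining partitions, and with k = 3 the diagonal cell (0, 0) is spread over (1, 0), (2, 0) and (3, 0).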

      Code:

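      import org.apache.spark.graphx.{PartitionID, PartitionStrategy, VertexId}

      object EdgePartitionTriangle extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          val mixingPrime: VertexId = 1125899906842597L
          // Largest r with r * (r + 1) / 2 <= numParts; virtual indices run from 0 to r.
          val numRowTriParts = ((math.sqrt(1 + 8.0 * numParts) - 1) / 2).toInt
          val numTriParts = numRowTriParts * (numRowTriParts + 1) / 2
          val segmentFactor = 100 // positive even number
          // Double arithmetic avoids Int overflow of 4 * numParts * numTriParts for large numParts.
          val numSegments = (segmentFactor * math.sqrt(4.0 * numParts * numTriParts)).toInt
          // Hash both endpoints onto [0, numSegments); the first 2 * segmentFactor * numTriParts
          // segments cover the triangle, the rest (if any) feed the remaining partitions.
          val segRow = (math.abs(src * mixingPrime) % numSegments).toInt
          val segCol = (math.abs(dst * mixingPrime) % numSegments).toInt
          var row = segRow / (segmentFactor * numRowTriParts)
          var col = segCol / (segmentFactor * numRowTriParts)
          if (math.max(segRow, segCol) >= 2 * segmentFactor * numTriParts) {
            // At least one endpoint falls outside the triangle: use partition numTriParts + col.
            row = numRowTriParts + 1
            col = math.min(segRow, segCol) % (numParts - numTriParts)
          } else if (row == col) {
            // Diagonal cell (a, a): move to another cell in row/column a (col != row).
            val ind = math.min(segRow % numRowTriParts, segCol % numRowTriParts)
            col = (math.min(2 * numRowTriParts - ind - 1, ind) + row + 1) % (numRowTriParts + 1)
          }
          // Fold (row, col) and (col, row) together and number the lower triangle row by row.
          if (row > col) row * (row - 1) / 2 + col else col * (col - 1) / 2 + row
        }
      }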
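To sanity-check the strategy without a Spark cluster, the sketch below copies the arithmetic of getPartition into a plain Scala function (VertexId and PartitionID replaced by Long and Int; the object and method names are hypothetical) and checks, over a small range of vertex ids, that every partition id lies in [0, numParts) and that both directions of an edge get the same partition. It does not measure work balance.

```scala
object EdgePartitionTriangleCheck {
  // Same arithmetic as EdgePartitionTriangle.getPartition, without Spark types.
  def partition(src: Long, dst: Long, numParts: Int): Int = {
    val mixingPrime = 1125899906842597L
    val numRowTriParts = ((math.sqrt(1 + 8.0 * numParts) - 1) / 2).toInt
    val numTriParts = numRowTriParts * (numRowTriParts + 1) / 2
    val segmentFactor = 100
    val numSegments = (segmentFactor * math.sqrt(4.0 * numParts * numTriParts)).toInt
    val segRow = (math.abs(src * mixingPrime) % numSegments).toInt
    val segCol = (math.abs(dst * mixingPrime) % numSegments).toInt
    var row = segRow / (segmentFactor * numRowTriParts)
    var col = segCol / (segmentFactor * numRowTriParts)
    if (math.max(segRow, segCol) >= 2 * segmentFactor * numTriParts) {
      row = numRowTriParts + 1
      col = math.min(segRow, segCol) % (numParts - numTriParts)
    } else if (row == col) {
      val ind = math.min(segRow % numRowTriParts, segCol % numRowTriParts)
      col = (math.min(2 * numRowTriParts - ind - 1, ind) + row + 1) % (numRowTriParts + 1)
    }
    if (row > col) row * (row - 1) / 2 + col else col * (col - 1) / 2 + row
  }

  // True if every edge among ids 0 until maxId gets a valid, direction-independent partition.
  def check(numParts: Int, maxId: Long): Boolean = {
    val ids = 0L until maxId
    ids.forall { s =>
      ids.forall { d =>
        val p = partition(s, d, numParts)
        p >= 0 && p < numParts && p == partition(d, s, numParts)
      }
    }
  }
}
```

In a Spark job, the strategy itself would be applied with graph.partitionBy(EdgePartitionTriangle, numParts).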

    People

    • Assignee: Unassigned
    • Reporter: weiwee wenbinwei
    • Votes: 0
    • Watchers: 2
