Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-40487

Make defaultJoin in BroadcastNestedLoopJoinExec run in parallel

    XML | Word | Printable | JSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.4.0
    • 3.4.0
    • SQL
    • None

    Description

      'Part 1' and 'Part 2' of defaultJoin (marked in the code below) could run in parallel:

        /**
         * The implementation for these joins:
         *
         *   LeftOuter with BuildLeft
         *   RightOuter with BuildRight
         *   FullOuter
         *
         * Every output row is assembled with the streamed side on the left and the broadcast
         * (build) side on the right; this method itself does not branch on the build side.
         *
         * The result is the union of two pieces:
         *   - Part 1: one row per broadcast row whose bit is not set in the bit set returned by
         *     `getMatchedBroadcastRowsBitSet` (presumably: broadcast rows that matched no
         *     streamed row -- confirm against that helper), padded with nulls on the streamed
         *     side.
         *   - Part 2: one row per (streamed row, broadcast row) pair that satisfies
         *     `boundCondition`, plus, for FullOuter only, one row per streamed row that matched
         *     nothing, padded with nulls on the broadcast side.
         *
         * @param relation broadcast array holding the build-side rows
         * @return an RDD that is the union of the Part 2 rows and the Part 1 rows
         */
        private def defaultJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
          val streamRdd = streamed.execute()
      
          // Part 1
          // Bit i of the returned bit set is read below as "broadcast row i was matched".
          // NOTE(review): the helper is not shown here; presumably it runs a job over streamRdd
          // right here, before Part 2 is even defined, so the two parts cannot overlap -- confirm.
          val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd, relation)
          // Built as a local Seq (not an RDD) by scanning the broadcast array in this method.
          val notMatchedBroadcastRows: Seq[InternalRow] = {
            // Null padding for the streamed (left) side, one slot per streamed output column.
            val nulls = new GenericInternalRow(streamed.output.size)
            val buf: CompactBuffer[InternalRow] = new CompactBuffer()
            val joinedRow = new JoinedRow
            // The left side is set once; only the right side changes per broadcast row.
            joinedRow.withLeft(nulls)
            var i = 0
            val buildRows = relation.value
            while (i < buildRows.length) {
              if (!matchedBroadcastRows.get(i)) {
                // The same JoinedRow instance is reused on every iteration, so each emitted
                // row is copied before being buffered.
                buf += joinedRow.withRight(buildRows(i)).copy()
              }
              i += 1
            }
            buf
          }
      
          // Part 2
          // Per-partition nested loop: every streamed row is tested against every broadcast row.
          val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
            val buildRows = relation.value
            // One JoinedRow per partition, reused for every (streamed, broadcast) pair.
            val joinedRow = new JoinedRow
            // Null padding for the broadcast (right) side, one slot per broadcast output column.
            val nulls = new GenericInternalRow(broadcast.output.size)
      
            streamedIter.flatMap { streamedRow =>
              var i = 0
              var foundMatch = false
              val matchedRows = new CompactBuffer[InternalRow]
      
              while (i < buildRows.length) {
                // joinedRow(...) is evaluated first; joinedRow.copy() below then snapshots the
                // pair that was just tested.
                if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
                  matchedRows += joinedRow.copy()
                  foundMatch = true
                }
                i += 1
              }
      
              // Only FullOuter keeps a streamed row that matched nothing; for the other join
              // types handled here such a row contributes no output from this part.
              if (!foundMatch && joinType == FullOuter) {
                matchedRows += joinedRow(streamedRow, nulls).copy()
              }
              matchedRows.iterator
            }
          }
      
          // Union
          // Combines the Part 2 RDD with the Part 1 rows, which are turned into an RDD here.
          sparkContext.union(
            matchedStreamRows,
            sparkContext.makeRDD(notMatchedBroadcastRows)
          )
        }

      Attachments

        Activity

          People

            Xingchao Xingchao, Zhang
            Xingchao Xingchao, Zhang
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: