Description
'Part 1' and 'Part 2' in defaultJoin below could run in parallel. Part 1 eagerly runs a Spark job over the streamed RDD to compute the bitset of matched broadcast rows and then builds the unmatched rows on the driver, while Part 2 is a lazy, independent transformation over the same streamed RDD; neither part reads the other's output, yet today Part 2's work cannot start until Part 1's job has finished. A sketch of one possible restructuring follows the code.
/**
 * The implementation for these joins:
 *
 *   LeftOuter with BuildLeft
 *   RightOuter with BuildRight
 *   FullOuter
 */
private def defaultJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
  val streamRdd = streamed.execute()

  // Part 1: eagerly runs a job over streamRdd to find which broadcast rows
  // matched, then builds the null-padded unmatched broadcast rows on the driver.
  val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd, relation)
  val notMatchedBroadcastRows: Seq[InternalRow] = {
    val nulls = new GenericInternalRow(streamed.output.size)
    val buf: CompactBuffer[InternalRow] = new CompactBuffer()
    val joinedRow = new JoinedRow
    joinedRow.withLeft(nulls)
    var i = 0
    val buildRows = relation.value
    while (i < buildRows.length) {
      if (!matchedBroadcastRows.get(i)) {
        buf += joinedRow.withRight(buildRows(i)).copy()
      }
      i += 1
    }
    buf
  }

  // Part 2: a lazy transformation that joins each streamed row against the
  // broadcast rows (null-padding unmatched streamed rows for FullOuter).
  val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val joinedRow = new JoinedRow
    val nulls = new GenericInternalRow(broadcast.output.size)

    streamedIter.flatMap { streamedRow =>
      var i = 0
      var foundMatch = false
      val matchedRows = new CompactBuffer[InternalRow]

      while (i < buildRows.length) {
        if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
          matchedRows += joinedRow.copy()
          foundMatch = true
        }
        i += 1
      }

      if (!foundMatch && joinType == FullOuter) {
        matchedRows += joinedRow(streamedRow, nulls).copy()
      }
      matchedRows.iterator
    }
  }

  // Union
  sparkContext.union(
    matchedStreamRows,
    sparkContext.makeRDD(notMatchedBroadcastRows)
  )
}
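One possible restructuring, sketched below under stated assumptions rather than as an actual fix: express Part 1 as lazy RDD transformations instead of an eager driver-side job, so that a single downstream action evaluates Part 1 and Part 2 together. The sketch reuses streamed, broadcast, relation, boundCondition, streamRdd, matchedStreamRows, and mapPartitionsInternal from the snippet above, and assumes Spark's org.apache.spark.util.collection.BitSet with its bit-wise OR operator |; only the Part 1 block and the final union change.

// Hypothetical sketch, not a merged fix: Part 1 becomes a lazy RDD, so the
// driver no longer blocks on a separate job before Part 2 can be scheduled.
val notMatchedBroadcastRows: RDD[InternalRow] = {
  // Per-partition bitsets of the broadcast rows matched by each stream partition.
  val matchedBitSets = streamRdd.mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val matched = new BitSet(buildRows.length)
    val joinedRow = new JoinedRow
    streamedIter.foreach { streamedRow =>
      var i = 0
      while (i < buildRows.length) {
        if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
          matched.set(i)
        }
        i += 1
      }
    }
    Iterator.single(matched)
  }
  // Merge the per-partition bitsets into one partition and emit the unmatched
  // broadcast rows, null-padded on the streamed side, still without running a job.
  matchedBitSets.repartition(1).mapPartitionsInternal { bitSetIter =>
    val buildRows = relation.value
    val merged = bitSetIter.fold(new BitSet(buildRows.length))(_ | _)
    val joinedRow = new JoinedRow
    joinedRow.withLeft(new GenericInternalRow(streamed.output.size))
    (0 until buildRows.length).iterator
      .filterNot(merged.get)
      .map(i => joinedRow.withRight(buildRows(i)).copy())
  }
}

// Part 2 (matchedStreamRows) is unchanged. Both union inputs are now lazy,
// so nothing forces Part 1 to finish before Part 2 starts.
sparkContext.union(matchedStreamRows, notMatchedBroadcastRows)

Note the streamed side is still scanned twice, once per part, so persisting streamRdd may be worth considering; the sketch only removes the ordering constraint between the two parts.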