Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version: 3.0.0
Description
When upgrading from Spark 2.4.6 to 3.0.0, I found that previously working LEFT JOINs now produce unexpected results.
Below is a minimal example to run in spark-shell that demonstrates this. It uses three events on the left side of the join and two on the right.
The expected output should contain two matching pairs and one left-side item without a matching right side, which should be emitted with its right-side fields set to NULL. The join condition is that the event times must be at most 30 seconds apart and the IDs must match.
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode
import java.sql.Timestamp
import java.util.UUID

// Structure of left and right data items
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Some test data
val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)
val rightData = Vector(
  RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
  RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
)

// Write test data, that we will stream from later
// (random output directories; alternatively we could delete the directories after each run)
val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
spark.createDataset(leftData).write.format("parquet").save(leftFilePath)

val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
spark.createDataset(rightData).write.format("parquet").save(rightFilePath)

// Read data from Parquet as stream
val leftStream = spark.readStream
  .schema(Encoders.product[LeftEntry].schema)
  .parquet(leftFilePath)
  .withWatermark("eventTime", "2 minutes")

val rightStream = spark.readStream
  .schema(Encoders.product[RightEntry].schema)
  .parquet(rightFilePath)
  .withWatermark("eventTime", "4 minutes")

// Define Join
val joinExpression = expr(
  s"""
     |leftStream.id = rightStream.id AND
     |leftStream.eventTime BETWEEN
     |  rightStream.eventTime - INTERVAL 30 seconds AND
     |  rightStream.eventTime + INTERVAL 30 seconds
   """.stripMargin
)

val joinedData = leftStream.as("leftStream")
  .join(
    rightStream.as("rightStream"),
    joinExpression,
    "left"
  )

// Run query
val query = joinedData.writeStream
  .format("memory")
  .queryName("myQuery")
  .outputMode(OutputMode.Append())
  .start()

query.processAllAvailable()

// Print results
spark
  .table(query.name)
  .show(truncate = false)
When this is executed with Spark 2.4.6, the result is as expected and deterministic:
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
+-------------------+---+-------------------+-------------------+----+----+
When running the same code snippet with Spark 3.0.0, the result is non-deterministically one of these two:
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above,
+-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
+-------------------+---+-------------+-------------------+----+----+
|eventTime          |id |comment      |eventTime          |id  |name|
+-------------------+---+-------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
|2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above,
+-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
... with "has no join partner" completely missing, and instead one of the actually joinable left-side items repeated without the right-side fields.
If the input data is modified so that the non-joinable event additionally has a different ID, Spark 3.0.0 generates the correct output:
// [...]
val leftData = Vector(
  LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
  //                                                   ↑↑↑ changed
  LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
  LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
)
// [...]
+-------------------+---+-------------------+-------------------+----+----+
|eventTime          |id |comment            |eventTime          |id  |name|
+-------------------+---+-------------------+-------------------+----+----+
|2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
|2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
|2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
+-------------------+---+-------------------+-------------------+----+----+