Spark / SPARK-32148

LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)

      Description

      When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT JOINs now output unexpected results.

      Below is a minimal example to run in spark-shell that demonstrates this. It has three events on the left side of the join and two on the right.
      The expected output contains two matching pairs plus one left-side item that has no matching right side and is therefore emitted with NULL right-side fields. The join condition requires that the IDs match and that the event times are at most 30 seconds apart.

      import spark.implicits._
      import org.apache.spark.sql.Encoders
      import org.apache.spark.sql.functions.expr
      import org.apache.spark.sql.streaming.OutputMode
      import java.sql.Timestamp
      import java.util.UUID
      
      // Structure of left and right data items
      case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
      case class RightEntry(eventTime: Timestamp, id: String, name: String)
      
      // Some test data
      val leftData = Vector(
        LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
        LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
        LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
      )
      
      val rightData = Vector(
        RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
        RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
      )
      
      // Write the test data that we will stream from later (random output directories; alternatively, delete the directories after each run)
      val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
      spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
      val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
      spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
      
      // Read data from Parquet as stream
      val leftStream = spark.readStream
        .schema(Encoders.product[LeftEntry].schema)
        .parquet(leftFilePath)
        .withWatermark("eventTime", "2 minutes")
      val rightStream = spark.readStream
        .schema(Encoders.product[RightEntry].schema)
        .parquet(rightFilePath)
        .withWatermark("eventTime", "4 minutes")
      
      // Define Join
      val joinExpression = expr(
        s"""
           |leftStream.id = rightStream.id AND
           |leftStream.eventTime BETWEEN
           |  rightStream.eventTime - INTERVAL 30 seconds AND
           |  rightStream.eventTime + INTERVAL 30 seconds
          """.stripMargin
      )
      val joinedData = leftStream.as("leftStream")
        .join(
          rightStream.as("rightStream"),
          joinExpression,
          "left"
        )
      
      // Run query
      val query = joinedData.writeStream
        .format("memory")
        .queryName("myQuery")
        .outputMode(OutputMode.Append())
        .start()
      query.processAllAvailable()
      
      // Print results
      spark
        .table(query.name)
        .show(truncate = false)
      

      When this is executed with Spark 2.4.6, the result is as expected and deterministic:

      +-------------------+---+-------------------+-------------------+----+----+
      |eventTime          |id |comment            |eventTime          |id  |name|
      +-------------------+---+-------------------+-------------------+----+----+
      |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
      |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
      |2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
      +-------------------+---+-------------------+-------------------+----+----+
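      The expected rows can be cross-checked without Spark. The following is only a minimal sketch in plain Scala, not part of the original reproduction: the object name `ExpectedLeftJoin` and the helpers `matches` and `leftOuterJoin` are hypothetical, and the case classes are redefined here so the block is self-contained. It applies the join condition from the report (same ID, event times at most 30 seconds apart, inclusive like `BETWEEN`) as an ordinary batch left outer join.

```scala
import java.sql.Timestamp

// Same shape as the case classes in the report, redefined so this sketch stands alone
final case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
final case class RightEntry(eventTime: Timestamp, id: String, name: String)

// Hypothetical helper object, not part of Spark
object ExpectedLeftJoin {
  val leftData: Vector[LeftEntry] = Vector(
    LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
    LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
    LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
  )

  val rightData: Vector[RightEntry] = Vector(
    RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
    RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
  )

  // Join condition from the report: IDs match and the left event time lies
  // within +/- 30 seconds of the right event time (bounds inclusive)
  def matches(l: LeftEntry, r: RightEntry): Boolean =
    l.id == r.id && math.abs(l.eventTime.getTime - r.eventTime.getTime) <= 30000L

  // Plain left outer join: every left row appears at least once,
  // paired with None when no right row satisfies the condition
  def leftOuterJoin(ls: Seq[LeftEntry], rs: Seq[RightEntry]): Seq[(LeftEntry, Option[RightEntry])] =
    ls.flatMap { l =>
      val hits = rs.filter(r => matches(l, r))
      if (hits.isEmpty) Seq(l -> Option.empty[RightEntry])
      else hits.map(r => l -> Option(r))
    }

  // (comment, right-side name) pairs in left-input order
  val expected: Seq[(String, Option[String])] =
    leftOuterJoin(leftData, rightData).map { case (l, r) => (l.comment, r.map(_.name)) }
}
```

      Under these assumptions every left row appears exactly once, and only "has no join partner" is paired with an empty right side, which agrees with the Spark 2.4.6 table above apart from row order.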
      

      When running the same code snippet with Spark 3.0.0, the result is non-deterministically one of these two:

      +-------------------+---+-------------+-------------------+----+----+
      |eventTime          |id |comment      |eventTime          |id  |name|
      +-------------------+---+-------------+-------------------+----+----+
      |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
      |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
      |2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above,
      +-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
      
      +-------------------+---+-------------+-------------------+----+----+
      |eventTime          |id |comment      |eventTime          |id  |name|
      +-------------------+---+-------------+-------------------+----+----+
      |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
      |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
      |2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above,
      +-------------------+---+-------------+-------------------+----+----+    but is now here once more without its right join side
      

      ... with "has no join partner" completely missing, and instead one of the actually joinable left-side items repeated without the right-side fields.
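      Both symptoms can be detected mechanically. The block below is a hedged sketch in plain Scala, not an existing Spark API: `JoinedRow`, `OuterJoinSanityCheck`, `contradictoryLeftRows` and `missingLeftRows` are hypothetical names, rows are reduced to the left-side fields plus an optional right-side name, and left rows are assumed to be unique by (eventTime, id, comment), which holds for the test data above.

```scala
// Hypothetical, simplified view of one output row; rightName is None when the right side is NULL
final case class JoinedRow(leftEventTime: String, leftId: String, comment: String, rightName: Option[String])

// Hypothetical helper object, not part of Spark
object OuterJoinSanityCheck {
  type LeftKey = (String, String, String)

  private def key(r: JoinedRow): LeftKey = (r.leftEventTime, r.leftId, r.comment)

  // Left rows that were emitted both with a right side and with NULLs
  def contradictoryLeftRows(rows: Seq[JoinedRow]): Set[LeftKey] =
    rows.groupBy(key)
      .filter { case (_, group) =>
        group.exists(_.rightName.isEmpty) && group.exists(_.rightName.nonEmpty)
      }
      .keys
      .toSet

  // Left input rows that never appear in the output at all
  def missingLeftRows(leftInput: Set[LeftKey], rows: Seq[JoinedRow]): Set[LeftKey] =
    leftInput -- rows.map(key)

  val leftInput: Set[LeftKey] = Set(
    ("2020-01-01 00:00:00", "abc", "has no join partner"),
    ("2020-01-02 00:00:00", "abc", "joined with A"),
    ("2020-01-02 01:00:00", "abc", "joined with B")
  )

  // Output reported for Spark 2.4.6
  val spark246Rows: Seq[JoinedRow] = Seq(
    JoinedRow("2020-01-02 00:00:00", "abc", "joined with A", Some("A")),
    JoinedRow("2020-01-02 01:00:00", "abc", "joined with B", Some("B")),
    JoinedRow("2020-01-01 00:00:00", "abc", "has no join partner", None)
  )

  // First of the two outputs reported for Spark 3.0.0
  val spark300Rows: Seq[JoinedRow] = Seq(
    JoinedRow("2020-01-02 01:00:00", "abc", "joined with B", Some("B")),
    JoinedRow("2020-01-02 00:00:00", "abc", "joined with A", Some("A")),
    JoinedRow("2020-01-02 00:00:00", "abc", "joined with A", None)
  )
}
```

      Applied to the tables in this report, both checks return empty sets for the Spark 2.4.6 rows, while the first Spark 3.0.0 result flags "joined with A" as contradictory and "has no join partner" as missing.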


      If the input data is modified so that the non-joinable event also has a different ID, Spark 3.0 generates the correct output:

      // [...]
      val leftData = Vector(
        LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
                                                          // ↑↑↑ changed
        LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
        LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
      )
      // [...]
      
      +-------------------+---+-------------------+-------------------+----+----+
      |eventTime          |id |comment            |eventTime          |id  |name|
      +-------------------+---+-------------------+-------------------+----+----+
      |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
      |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
      |2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
      +-------------------+---+-------------------+-------------------+----+----+
      

            People

            • Assignee: Jungtaek Lim (kabhwan)
            • Reporter: Michael (hiddenbit)