Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-33259

Joining 3 streams results in incorrect output

    XMLWordPrintableJSON

Details

    Description

      I encountered an issue with Structured Streaming when doing a ((A LEFT JOIN B) INNER JOIN C) operation. Below you can see example code I posted on Stackoverflow...

      I created a minimal example of "sessions", that have "start" and "end" events and optionally some "metadata".

      The script generates two outputs: sessionStartsWithMetadata results from "start" events that are left-joined with the "metadata" events, based on sessionId. A "left join" is used, since we would like to get an output event even when no corresponding metadata exists.

      Additionally, a DataFrame endedSessionsWithMetadata is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want output once a session has definitely ended.

      This code can be executed in spark-shell:

      import java.sql.Timestamp
      import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
      import org.apache.spark.sql.streaming.StreamingQuery
      import org.apache.spark.sql.{DataFrame, SQLContext}
      import org.apache.spark.sql.functions.{col, expr, lit}
      
      import spark.implicits._
      // Puts the shell session's SQLContext into implicit scope for the rest of the script.
      // NOTE(review): presumably this is the implicit that `MemoryStream[...]` resolves in
      // streamProcessing - confirm against the Spark version in use.
      implicit val sqlContext: SQLContext = spark.sqlContext
      
      // Main data processing, regardless whether batch or stream processing
      /**
       * Joins the three event inputs. The same logic is used for both the batch and the
       * streaming run.
       *
       * @param sessionStartEvents            frame with columns `sessionId` and `sessionStartTimestamp`
       * @param sessionOptionalMetadataEvents frame with columns `sessionId` and
       *                                      `sessionOptionalMetadataTimestamp`
       * @param sessionEndEvents              frame with columns `sessionId` and `sessionEndTimestamp`
       * @return a pair: (start events left-joined with metadata,
       *         that result joined with the end events)
       */
      def process(
          sessionStartEvents: DataFrame,
          sessionOptionalMetadataEvents: DataFrame,
          sessionEndEvents: DataFrame
      ): (DataFrame, DataFrame) = {
        val startTimestamp = sessionStartEvents("sessionStartTimestamp")
        val metadataTimestamp = sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")

        // A start event matches a metadata event of the same session when the start time
        // lies within one second on either side of the metadata time.
        val sameSessionAsMetadata =
          sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId")
        val startNearMetadata = startTimestamp.between(
          metadataTimestamp.minus(expr("INTERVAL 1 seconds")),
          metadataTimestamp.plus(expr("INTERVAL 1 seconds"))
        )

        val sessionStartsWithMetadata: DataFrame = sessionStartEvents
          .join(
            sessionOptionalMetadataEvents,
            sameSessionAsMetadata && startNearMetadata,
            "left" // metadata is optional
          )
          .select(sessionStartEvents("sessionId"), startTimestamp, metadataTimestamp)

        // A session counts as ended when an end event of the same session occurs at most
        // ten seconds after (and not before) the start.
        val endTimestamp = sessionEndEvents("sessionEndTimestamp")
        val sameSessionAsEnd =
          sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId")
        val startShortlyBeforeEnd = sessionStartsWithMetadata("sessionStartTimestamp").between(
          endTimestamp.minus(expr("INTERVAL 10 seconds")),
          endTimestamp
        )

        // No join type is passed here, so the join type is whatever `join` defaults to.
        val endedSessionsWithMetadata =
          sessionStartsWithMetadata.join(sessionEndEvents, sameSessionAsEnd && startShortlyBeforeEnd)

        (sessionStartsWithMetadata, endedSessionsWithMetadata)
      }
      
      /**
       * Runs `process` on streaming inputs and starts one console query per result.
       *
       * Each input sequence is loaded into its own `MemoryStream`, exposed as a DataFrame with
       * a "1 second" watermark on its timestamp column, and passed to `process`.
       *
       * @param sessionStartData        (eventTime, sessionId) tuples of "start" events
       * @param sessionOptionalMetadata (eventTime, sessionId) tuples of "metadata" events
       * @param sessionEndData          (eventTime, sessionId) tuples of "end" events
       * @return the started queries: (sessionStartsWithMetadata query, endedSessionsWithMetadata query)
       */
      def streamProcessing(
          sessionStartData: Seq[(Timestamp, Int)],
          sessionOptionalMetadata: Seq[(Timestamp, Int)],
          sessionEndData: Seq[(Timestamp, Int)]
      ): (StreamingQuery, StreamingQuery) = {

        // Loads `events` into a fresh MemoryStream and returns it as a watermarked DataFrame
        // with columns (`timestampColumn`, "sessionId").
        def watermarkedEvents(events: Seq[(Timestamp, Int)], timestampColumn: String): DataFrame = {
          val source: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
          source.addData(events)
          source
            .toDS()
            .toDF(timestampColumn, "sessionId")
            .withWatermark(timestampColumn, "1 second")
        }

        // Starts an append-mode console query; every row is prefixed with `label` so the
        // interleaved console output shows which query produced it.
        def startConsoleQuery(label: String, result: DataFrame): StreamingQuery =
          result
            .select(lit(label), col("*"))
            .writeStream
            .outputMode("append")
            .format("console")
            .option("truncate", "false")
            .option("numRows", "1000")
            .start()

        // Streams are created in the same order as the parameters: start, metadata, end.
        val startEvents = watermarkedEvents(sessionStartData, "sessionStartTimestamp")
        val metadataEvents =
          watermarkedEvents(sessionOptionalMetadata, "sessionOptionalMetadataTimestamp")
        val endEvents = watermarkedEvents(sessionEndData, "sessionEndTimestamp")

        val (startsWithMetadata, endedWithMetadata) =
          process(startEvents, metadataEvents, endEvents)

        val startsQuery = startConsoleQuery("sessionStartsWithMetadata", startsWithMetadata)
        val endedQuery = startConsoleQuery("endedSessionsWithMetadata", endedWithMetadata)

        (startsQuery, endedQuery)
      }
      
      /**
       * Runs `process` on static (batch) inputs and prints both results to stdout.
       *
       * @param sessionStartData        (eventTime, sessionId) tuples of "start" events
       * @param sessionOptionalMetadata (eventTime, sessionId) tuples of "metadata" events
       * @param sessionEndData          (eventTime, sessionId) tuples of "end" events
       */
      def batchProcessing(
          sessionStartData: Seq[(Timestamp, Int)],
          sessionOptionalMetadata: Seq[(Timestamp, Int)],
          sessionEndData: Seq[(Timestamp, Int)]
      ): Unit = {

        // Turns the tuples into a DataFrame with columns (`timestampColumn`, "sessionId").
        def eventsFrame(rows: Seq[(Timestamp, Int)], timestampColumn: String): DataFrame =
          spark.createDataset(rows).toDF(timestampColumn, "sessionId")

        val (startsWithMetadata, endedWithMetadata) = process(
          eventsFrame(sessionStartData, "sessionStartTimestamp"),
          eventsFrame(sessionOptionalMetadata, "sessionOptionalMetadataTimestamp"),
          eventsFrame(sessionEndData, "sessionEndTimestamp")
        )

        // Print a title line followed by up to 100 untruncated rows for each result.
        Seq(
          "sessionStartsWithMetadata" -> startsWithMetadata,
          "endedSessionsWithMetadata" -> endedWithMetadata
        ).foreach { case (title, frame) =>
          println(title)
          frame.show(100, truncate = false)
        }
      }
      
      
      // Data is represented as tuples of (eventTime, sessionId)...
      // Builds (eventTime, sessionId) tuples from (epochMillis -> sessionId) pairs.
      def eventsAtMillis(entries: (Long, Int)*): Vector[(Timestamp, Int)] =
        entries.toVector.map { case (epochMillis, sessionId) =>
          (new Timestamp(epochMillis), sessionId)
        }

      // "start" events for sessions 0, 1, 2 and 10
      val sessionStartData = eventsAtMillis(
        1L -> 0,
        2000L -> 1,
        2000L -> 2,
        20000L -> 10
      )

      // "metadata" events: session `1` has no metadata
      val sessionOptionalMetadata = eventsAtMillis(
        1L -> 0,
        2000L -> 2,
        20000L -> 10
      )

      // "end" events for all four sessions
      val sessionEndData = eventsAtMillis(
        10000L -> 0,
        11000L -> 1,
        12000L -> 2,
        30000L -> 10
      )

      // Run the batch variant first, then start the two streaming queries on the same data.
      batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

      val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
        streamProcessing(
          sessionStartData = sessionStartData,
          sessionOptionalMetadata = sessionOptionalMetadata,
          sessionEndData = sessionEndData
        )
      

      In the example, the session with ID 1 has no metadata, so the respective metadata column is null.

      The main functionality of joining the data is implemented in def process(…), which is called using both batch data and stream data.

      In the batch version the output is as expected:

      sessionStartsWithMetadata
      +---------+-----------------------+--------------------------------+
      |sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
      +---------+-----------------------+--------------------------------+
      |0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
      |1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
      |2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
      |10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
      +---------+-----------------------+--------------------------------+
      
      endedSessionsWithMetadata
      +---------+-----------------------+--------------------------------+-------------------+---------+
      |sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
      +---------+-----------------------+--------------------------------+-------------------+---------+
      |0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
      |1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        |  ← has no metadata ✔
      |2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
      |10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
      +---------+-----------------------+--------------------------------+-------------------+---------+
      

      But when the same processing is run as stream processing, the output of endedSessionsWithMetadata does not contain the entry for session 1, which has no metadata:

      -------------------------------------------
      Batch: 0 ("start event")
      -------------------------------------------
      +-------------------------+---------+-----------------------+--------------------------------+
      |sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
      +-------------------------+---------+-----------------------+--------------------------------+
      |sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
      |sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
      |sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
      +-------------------------+---------+-----------------------+--------------------------------+
      
      -------------------------------------------
      Batch: 0 ("end event")
      -------------------------------------------
      +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
      |endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
      +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
      |endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
      |endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
      |endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
      +-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
      
      -------------------------------------------
      Batch: 1 ("start event")
      -------------------------------------------
      +-------------------------+---------+---------------------+--------------------------------+
      |sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
      +-------------------------+---------+---------------------+--------------------------------+
      |sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
      +-------------------------+---------+---------------------+--------------------------------+
      
      -------------------------------------------
      Batch: 1 ("end event")
      -------------------------------------------
      +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
      |endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
      +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
      +-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
        ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
      

      In a response it was suggested the issue looks related to kabhwan's mailing list post, but since I couldn't find a ticket here tracking the above mentioned issue, I'm creating this one.

      Attachments

        Issue Links

          Activity

            People

              viirya L. C. Hsieh
              hiddenbit Michael
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: