  SPARK-26154

Stream-stream joins - left outer join gives inconsistent output


    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 2.3.2, 3.0.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
    • Environment:

      Spark version: Spark 2.3.2

      OS: SUSE 11

      Description

      A stream-stream left outer join gives inconsistent output.

      A row that has already been matched and emitted is emitted again later with null values on the right side. In Batch 2, the input value "3" is joined and emitted together with its match, but in Batch 6 the same row for "3" is emitted again, this time padded with nulls (a Kafka-free sketch of the same setup is given after the console output below).

      Step 1: In spark-shell

      scala>     import org.apache.spark.sql.functions.{col, expr}
      import org.apache.spark.sql.functions.{col, expr}
      scala>     import org.apache.spark.sql.streaming.Trigger
      import org.apache.spark.sql.streaming.Trigger
      scala>     import java.sql.Timestamp
      import java.sql.Timestamp
      scala>     val lines_stream1 = spark.readStream.
           |       format("kafka").
           |       option("kafka.bootstrap.servers", "ip:9092").
           |       option("subscribe", "topic1").
           |       option("includeTimestamp", true).
           |       load().
           |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
           |       select(col("value") as("data"),col("timestamp") as("recordTime")).
           |       select("data","recordTime").
           |       withWatermark("recordTime", "5 seconds ")
      lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
      scala>     val lines_stream2 = spark.readStream.
           |       format("kafka").
           |       option("kafka.bootstrap.servers", "ip:9092").
           |       option("subscribe", "topic2").
           |       option("includeTimestamp", value = true).
           |       load().
           |       selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
           |       select(col("value") as("data1"),col("timestamp") as("recordTime1")).
           |       select("data1","recordTime1").
           |       withWatermark("recordTime1", "10 seconds ")
      lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
      scala>     val query = lines_stream1.join(lines_stream2, expr (
           |       """
           |         | data == data1 and
           |         | recordTime1 >= recordTime and
           |         | recordTime1 <= recordTime + interval 5 seconds
           |       """.stripMargin),"left").
           |       writeStream.
           |       option("truncate","false").
           |       outputMode("append").
           |       format("console").option("checkpointLocation", "/tmp/leftouter/").
           |       trigger(Trigger.ProcessingTime ("5 seconds")).
           |       start()
      query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
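      The null-padded left-outer rows are only expected once the watermark has passed a row's join window (recordTime + 5 seconds here). As a quick check from the same shell, the watermark reported after the latest micro-batch can be printed; a minimal sketch, assuming the query handle above is still running:

      scala>     // lastProgress is null until the first micro-batch completes, hence the Option wrapper
      scala>     Option(query.lastProgress).map(_.eventTime.get("watermark")).foreach(println)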
      

      Step 2: Start producing data

      kafka-console-producer.sh --broker-list ip:9092 --topic topic1
      >1
      >2
      >3
      >4
      >5
      >aa
      >bb
      >cc

      kafka-console-producer.sh --broker-list ip:9092 --topic topic2
      >2
      >2
      >3
      >4
      >5
      >aa
      >cc
      >ee
      >ee

       

      Output obtained:

      -------------------------------------------
      Batch: 0
      -------------------------------------------
      +----+----------+-----+-----------+
      |data|recordTime|data1|recordTime1|
      +----+----------+-----+-----------+
      +----+----------+-----+-----------+
      
      -------------------------------------------
      Batch: 1
      -------------------------------------------
      +----+----------+-----+-----------+
      |data|recordTime|data1|recordTime1|
      +----+----------+-----+-----------+
      +----+----------+-----+-----------+
      
      -------------------------------------------
      Batch: 2
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
      |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 3
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 4
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
      |1   |2018-11-22 20:09:22.662|null |null                   |
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 5
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
      |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 6
      -------------------------------------------
      +----+-----------------------+-----+-----------+
      |data|recordTime             |data1|recordTime1|
      +----+-----------------------+-----+-----------+
      |3   |2018-11-22 20:09:35.053|null |null       |
      +----+-----------------------+-----+-----------+
      
      
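      For completeness, the same join can be wired up without Kafka by using MemoryStream in spark-shell. The snippet below is only a sketch: the values and timestamps are illustrative (not taken from the run above), and it is not a verified reproduction of the duplicate row; it simply mirrors the watermarks and join condition used in this report.

      // Kafka-free sketch (illustrative only): assumes spark-shell, where the
      // SparkSession `spark` and spark.implicits._ are already in scope.
      import java.sql.Timestamp
      import org.apache.spark.sql.execution.streaming.MemoryStream
      import org.apache.spark.sql.functions.expr

      implicit val sqlCtx = spark.sqlContext

      val left  = MemoryStream[(String, Timestamp)]
      val right = MemoryStream[(String, Timestamp)]

      val leftDf  = left.toDS().toDF("data", "recordTime").withWatermark("recordTime", "5 seconds")
      val rightDf = right.toDS().toDF("data1", "recordTime1").withWatermark("recordTime1", "10 seconds")

      val joined = leftDf.join(rightDf, expr(
        """
          | data == data1 and
          | recordTime1 >= recordTime and
          | recordTime1 <= recordTime + interval 5 seconds
        """.stripMargin), "left")

      val q = joined.writeStream.format("console").outputMode("append").start()

      // A matching pair: "3" should be emitted exactly once, with both sides populated.
      left.addData(("3", Timestamp.valueOf("2018-11-22 20:09:35")))
      right.addData(("3", Timestamp.valueOf("2018-11-22 20:09:36")))
      q.processAllAvailable()

      // Later data that pushes the watermark well past "3"'s join window; per this
      // report, the already-matched row for "3" shows up again, null-padded.
      left.addData(("zz", Timestamp.valueOf("2018-11-22 20:10:30")))
      right.addData(("zz", Timestamp.valueOf("2018-11-22 20:10:31")))
      q.processAllAvailable()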

              People

              • Assignee: Unassigned
              • Reporter: Haripriya (hari28)