SPARK-26154 (project: Spark)

Stream-stream joins - left outer join gives inconsistent output


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.0, 2.3.2, 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: Structured Streaming
    • Environment: Spark 2.3.2, OS SUSE 11

    Description

      A stream-stream join using a left outer join gives inconsistent output.

      Rows that have already been joined and emitted are emitted again, this time padded with nulls. In batch 2, the input row "3" is joined with its topic2 match; in batch 6, the same left row "3" (same recordTime) is emitted again with null data1 and recordTime1.
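The contract the report relies on can be stated as a small check: in a left outer join, each left row should appear either joined with its matches or, once no further match is possible, exactly once with nulls, never both. The sketch below is a toy Scala model, not Spark code; `OuterJoinContract` and `OutRow` are hypothetical names, and it is applied to three rows copied from the batch 2, 4 and 6 output of this report.

```scala
// Toy check (hypothetical, not Spark code) of the left-outer-join contract:
// a left row may be emitted joined, or once with nulls, but never both.
object OuterJoinContract {
  // right == None models a null-padded row (data1 and recordTime1 are null).
  final case class OutRow(data: String, recordTime: String, right: Option[String])

  /** Left rows (data, recordTime) that were emitted both joined and null-padded. */
  def violations(rows: Seq[OutRow]): Set[(String, String)] =
    rows
      .groupBy(r => (r.data, r.recordTime))
      .filter { case (_, rs) => rs.exists(_.right.isDefined) && rs.exists(_.right.isEmpty) }
      .keySet

  def main(args: Array[String]): Unit = {
    val observed = Seq(
      OutRow("3", "2018-11-22 20:09:35.053", Some("2018-11-22 20:09:36.506")), // batch 2
      OutRow("1", "2018-11-22 20:09:22.662", None),                            // batch 4
      OutRow("3", "2018-11-22 20:09:35.053", None)                             // batch 6
    )
    // Reports only the "3" row from 20:09:35.053; the null row for "1" is legitimate.
    println(violations(observed))
  }
}
```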

      Steps to reproduce

      Step 1: Start the query in spark-shell

      // spark-shell already imports spark.implicits._, which .as[(String, Timestamp)] needs.
      import java.sql.Timestamp
      import org.apache.spark.sql.functions.{col, expr}
      import org.apache.spark.sql.streaming.Trigger

      // Left input: topic1, 5-second watermark on recordTime ("ip:9092" is the broker placeholder).
      // The Kafka source always provides a "timestamp" column, so the original
      // option("includeTimestamp", true), a socket-source option, is omitted.
      val lines_stream1 = spark.readStream.
        format("kafka").
        option("kafka.bootstrap.servers", "ip:9092").
        option("subscribe", "topic1").
        load().
        selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
        select(col("value").as("data"), col("timestamp").as("recordTime")).
        withWatermark("recordTime", "5 seconds")
      // lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]

      // Right input: topic2, 10-second watermark on recordTime1.
      val lines_stream2 = spark.readStream.
        format("kafka").
        option("kafka.bootstrap.servers", "ip:9092").
        option("subscribe", "topic2").
        load().
        selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
        select(col("value").as("data1"), col("timestamp").as("recordTime1")).
        withWatermark("recordTime1", "10 seconds")
      // lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]

      // Left outer join: each topic1 row joins topic2 rows with the same value
      // whose recordTime1 falls in [recordTime, recordTime + 5 seconds].
      val query = lines_stream1.join(lines_stream2, expr(
        """
          | data == data1 and
          | recordTime1 >= recordTime and
          | recordTime1 <= recordTime + interval 5 seconds
        """.stripMargin), "left").
        writeStream.
        option("truncate", "false").
        outputMode("append").
        format("console").
        option("checkpointLocation", "/tmp/leftouter/").
        trigger(Trigger.ProcessingTime("5 seconds")).
        start()
      // query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
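The two `withWatermark` calls declare different delays (5 seconds on `recordTime`, 10 seconds on `recordTime1`). A simplified model, assuming the default behavior of taking the minimum across inputs, shows that the watermark the join uses trails the input with the larger delay. `WatermarkModel` is a hypothetical name; times are milliseconds after 2018-11-22 20:09:00.000, and the example maxima are the batch 5 `cc` rows, chosen only for illustration (the report does not show actual watermark values).

```scala
// Simplified model (not Spark source) of the global event-time watermark for a
// query with two watermarked inputs, assuming the default "min" policy.
object WatermarkModel {
  /** Watermark of one input: the max event time seen so far minus its declared delay. */
  def inputWatermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  /** Global watermark shared by the join: the minimum over all inputs. */
  def globalWatermark(inputs: Seq[(Long, Long)]): Long =
    inputs.map { case (maxTs, delay) => inputWatermark(maxTs, delay) }.min

  def main(args: Array[String]): Unit = {
    // Milliseconds after 20:09:00.000; illustrative maxima from the batch 5 "cc" rows.
    val topic1 = (66654L, 5000L)  // recordTime  20:10:06.654, delay 5 s
    val topic2 = (68701L, 10000L) // recordTime1 20:10:08.701, delay 10 s
    // topic1 -> 61654 (20:10:01.654), topic2 -> 58701 (20:09:58.701); the min wins.
    println(globalWatermark(Seq(topic1, topic2))) // 58701
  }
}
```

Under this model the slower-moving topic2 watermark (10-second delay) governs when either side's buffered rows can be finalized.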
      

      Step 2: Start producing data (one value per line at each console producer prompt)

      kafka-console-producer.sh --broker-list ip:9092 --topic topic1
      >1
      >2
      >3
      >4
      >5
      >aa
      >bb
      >cc

      kafka-console-producer.sh --broker-list ip:9092 --topic topic2
      >2
      >2
      >3
      >4
      >5
      >aa
      >cc
      >ee
      >ee
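Because the join condition requires `data == data1`, a topic1 value that never appears on topic2 can only ever produce a null-padded row. A plain-Scala set difference over the two producer inputs above (`KeyCoverage` is a hypothetical name) shows those values are `1` and `bb`. A value such as `3`, which appears on both topics, may or may not match, depending on whether its timestamps fall in the 5-second window.

```scala
// Plain-Scala check of which topic1 values have no counterpart on topic2.
object KeyCoverage {
  val topic1 = Seq("1", "2", "3", "4", "5", "aa", "bb", "cc")
  val topic2 = Seq("2", "2", "3", "4", "5", "aa", "cc", "ee", "ee")

  /** Left values that never occur on the right and so can never satisfy data == data1. */
  def neverMatched(left: Seq[String], right: Seq[String]): Set[String] =
    left.toSet -- right.toSet

  def main(args: Array[String]): Unit =
    println(neverMatched(topic1, topic2).toSeq.sorted.mkString(", ")) // 1, bb
}
```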

       

      Output obtained:

      -------------------------------------------
      Batch: 0
      -------------------------------------------
      +----+----------+-----+-----------+
      |data|recordTime|data1|recordTime1|
      +----+----------+-----+-----------+
      +----+----------+-----+-----------+
      
      -------------------------------------------
      Batch: 1
      -------------------------------------------
      +----+----------+-----+-----------+
      |data|recordTime|data1|recordTime1|
      +----+----------+-----+-----------+
      +----+----------+-----+-----------+
      
      -------------------------------------------
      Batch: 2
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
      |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 3
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 4
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
      |1   |2018-11-22 20:09:22.662|null |null                   |
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 5
      -------------------------------------------
      +----+-----------------------+-----+-----------------------+
      |data|recordTime             |data1|recordTime1            |
      +----+-----------------------+-----+-----------------------+
      |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
      |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
      +----+-----------------------+-----+-----------------------+
      
      -------------------------------------------
      Batch: 6
      -------------------------------------------
      +----+-----------------------+-----+-----------+
      |data|recordTime             |data1|recordTime1|
      +----+-----------------------+-----+-----------+
      |3   |2018-11-22 20:09:35.053|null |null       |
      +----+-----------------------+-----+-----------+
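The batch 6 row can be reasoned about from the join condition and the timestamps of the `3` pair in batch 2 (left 20:09:35.053, right 20:09:36.506). In the simplified model below, which glosses over boundary handling and assumes late input rows are dropped, the right row stops being matchable once the watermark passes 20:09:36.506, while the left row stays buffered until the watermark passes 20:09:40.053. If the decision to emit a null-padded row looked only at the right side's current state, a left row whose partner had already been evicted would look unmatched. That is one hypothesis consistent with the output, not a description of Spark's code. `OuterNullModel` and everything in it are hypothetical; variant B shows the alternative of remembering a per-row matched flag.

```scala
// Toy model (hypothetical, not Spark source) of the "3" pair from batch 2 under
// the condition recordTime1 in [recordTime, recordTime + 5 seconds].
object OuterNullModel {
  val JoinWindowMs = 5000L

  // Left row at t: future right rows have recordTime1 >= watermark,
  // so it cannot match any more once the watermark passes t + 5 s.
  def leftExpiresAfter(tMs: Long): Long = tMs + JoinWindowMs

  // Right row at u: future left rows have recordTime >= watermark and need
  // recordTime <= u, so it cannot match any more once the watermark passes u.
  def rightExpiresAfter(uMs: Long): Long = uMs

  final case class LeftRow(key: String, tMs: Long, matchedEarlier: Boolean)

  // Variant A: emit nulls unless a partner is still in the right-side state now.
  def emitNullByStateLookup(left: LeftRow, rightState: Map[String, Seq[Long]]): Boolean =
    !rightState.getOrElse(left.key, Seq.empty).exists { u =>
      u >= left.tMs && u <= left.tMs + JoinWindowMs
    }

  // Variant B: emit nulls only if the row was never joined while buffered.
  def emitNullByMatchedFlag(left: LeftRow): Boolean = !left.matchedEarlier

  def main(args: Array[String]): Unit = {
    // Milliseconds after 2018-11-22 20:09:00.000.
    val left3 = LeftRow("3", 35053L, matchedEarlier = true) // joined in batch 2
    val right3 = 36506L
    println(rightExpiresAfter(right3))   // 36506: right "3" becomes evictable first
    println(leftExpiresAfter(left3.tMs)) // 40053: left "3" expires about 3.5 s later
    // Once the watermark is past 36506, the right "3" may already be gone from state.
    val rightState = Map.empty[String, Seq[Long]]
    println(emitNullByStateLookup(left3, rightState)) // true: a second, null-padded "3"
    println(emitNullByMatchedFlag(left3))             // false: no extra row
  }
}
```

In this toy model, variant B never emits a second row for a left row that was already joined, which is the behavior the report expects.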
      
      


    People

      Assignee: Jungtaek Lim (kabhwan)
      Reporter: Haripriya (hari28)
      Votes: 0
      Watchers: 16
