Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
2.3.0, 2.3.2, 3.0.0
Spark version: 2.3.2
OS: SUSE 11
Description
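Stream-stream joins using a left outer join give inconsistent output.
A row that has already been joined and output is processed again and emitted with null values. In batch 2, the input row "3" (recordTime 2018-11-22 20:09:35.053) is matched and output. In batch 6, the same row is output again, this time with null values for the right-side columns (data1, recordTime1).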
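The expected left outer semantics can be modeled in a few lines: a buffered left row should produce a null-padded output row only if it never matched before the watermark evicts it. The stdlib-only Scala sketch below contrasts storing a per-row matched flag with re-deriving the match from whichever right-side rows are still buffered; the second variant is one plausible way to get the symptom reported here. All names (LeftOuterStateSketch, evict, trackMatched) are hypothetical, and this is a simplified model, not Spark's state store code.

```scala
// Hypothetical, stdlib-only model of left-side state in a left outer
// stream-stream join. Not Spark's code; all names are illustrative.
object LeftOuterStateSketch {
  // A buffered left row; times are milliseconds from an arbitrary origin.
  final case class LeftRow(key: String, timeMs: Long)
  final case class Buffered(row: LeftRow, matched: Boolean)

  /** Drops left rows whose join window [timeMs, timeMs + boundMs] lies entirely
    * before the watermark and returns (kept rows, outer rows to emit with nulls).
    *
    * trackMatched = true:  emit an outer row only if the row never matched.
    * trackMatched = false: re-derive "matched" from the right keys that are
    *                       still buffered, which misfires once the matching
    *                       right row has itself been evicted. */
  def evict(
      left: Seq[Buffered],
      rightKeysStillBuffered: Set[String],
      watermarkMs: Long,
      boundMs: Long,
      trackMatched: Boolean): (Seq[Buffered], Seq[LeftRow]) = {
    val (expired, kept) = left.partition(b => b.row.timeMs + boundMs < watermarkMs)
    val outer = expired
      .filter(b => if (trackMatched) !b.matched else !rightKeysStillBuffered.contains(b.row.key))
      .map(_.row)
    (kept, outer)
  }

  def main(args: Array[String]): Unit = {
    // Row "3" matched in an earlier batch; its right-side partner is already evicted.
    val state = Seq(Buffered(LeftRow("3", 35053L), matched = true))
    val (_, withoutFlag) =
      evict(state, Set.empty, watermarkMs = 60000L, boundMs = 5000L, trackMatched = false)
    val (_, withFlag) =
      evict(state, Set.empty, watermarkMs = 60000L, boundMs = 5000L, trackMatched = true)
    println(s"re-derived match: $withoutFlag") // spurious outer row for "3"
    println(s"stored flag:      $withFlag")    // no outer row
  }
}
```

Keeping the flag next to the buffered row means the outer-row decision no longer depends on how long the other side retains its state.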
Steps to reproduce
Step 1: Start the query in spark-shell
// Paste into spark-shell (spark.implicits._ is already imported there).
import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.streaming.Trigger

// Left input: topic1, 5-second watermark on the Kafka record timestamp.
// The Kafka source always provides a `timestamp` column, so the original
// option("includeTimestamp", true) (a socket-source option) is dropped here.
val lines_stream1 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "ip:9092").
  option("subscribe", "topic1").
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data"), col("timestamp").as("recordTime")).
  withWatermark("recordTime", "5 seconds")
// lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]

// Right input: topic2, 10-second watermark.
val lines_stream2 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "ip:9092").
  option("subscribe", "topic2").
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data1"), col("timestamp").as("recordTime1")).
  withWatermark("recordTime1", "10 seconds")
// lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]

// Left outer join: a topic1 row matches topic2 rows with the same value whose
// timestamp falls within [recordTime, recordTime + 5 seconds].
val query = lines_stream1.join(lines_stream2, expr(
    """
      | data == data1 and
      | recordTime1 >= recordTime and
      | recordTime1 <= recordTime + interval 5 seconds
    """.stripMargin), "left").
  writeStream.
  option("truncate", "false").
  outputMode("append").
  format("console").
  option("checkpointLocation", "/tmp/leftouter/").
  trigger(Trigger.ProcessingTime("5 seconds")).
  start()
// query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
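A left row's null-padded outer row can only be produced once the watermark guarantees that no future topic2 row can fall inside its join window. The stdlib-only Scala sketch below works through that arithmetic for row "3", assuming Spark's default policy of taking the minimum of the per-input watermarks, a per-input watermark of "max event time seen minus the declared delay", and the [recordTime, recordTime + 5 seconds] window from the join condition above. It uses only timestamps visible in the batch 5 output reported below; unmatched rows such as bb and ee never appear there, so the real maxima, and therefore the real watermark, may be later. The object and method names are hypothetical.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical helper for the watermark arithmetic behind the query above.
// Assumes: per-input watermark = max event time seen - declared delay, and the
// global watermark is the minimum of the per-input watermarks.
object WatermarkArithmetic {
  def inputWatermark(maxEventTime: LocalDateTime, delay: Duration): LocalDateTime =
    maxEventTime.minus(delay)

  // "min" policy: the slowest input holds the global watermark back.
  def globalWatermark(first: LocalDateTime, rest: LocalDateTime*): LocalDateTime =
    rest.foldLeft(first)((a, b) => if (b.isBefore(a)) b else a)

  // Simplified model: a left row can still meet a future right row only while
  // the watermark has not passed the end of its window, recordTime + bound.
  def canStillMatch(recordTime: LocalDateTime, bound: Duration, watermark: LocalDateTime): Boolean =
    !recordTime.plus(bound).isBefore(watermark)

  def main(args: Array[String]): Unit = {
    // Latest timestamps visible in the batch 5 output (lower bounds on the real maxima).
    val leftWm  = inputWatermark(LocalDateTime.parse("2018-11-22T20:10:06.654"), Duration.ofSeconds(5))
    val rightWm = inputWatermark(LocalDateTime.parse("2018-11-22T20:10:08.701"), Duration.ofSeconds(10))
    val wm = globalWatermark(leftWm, rightWm)
    val row3 = LocalDateTime.parse("2018-11-22T20:09:35.053")
    println(s"global watermark (lower bound): $wm")
    println(s"row 3 can still match: ${canStillMatch(row3, Duration.ofSeconds(5), wm)}")
  }
}
```

Under these assumptions, row 3 is past its window by batch 6, so it is eligible for eviction then. Because it had already matched in batch 2, that eviction should have produced no outer row at all.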
Step 2: Start producing data
kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>1
>2
>3
>4
>5
>aa
>bb
>cc
kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>2
>2
>3
>4
>5
>aa
>cc
>ee
>ee
Output obtained:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+-----+-----------+
|data|recordTime|data1|recordTime1|
+----+----------+-----+-----------+
+----+----------+-----+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----------+-----+-----------+
|data|recordTime|data1|recordTime1|
+----+----------+-----+-----------+
+----+----------+-----+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
|2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
|1   |2018-11-22 20:09:22.662|null |null                   |
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----------------------+-----+-----------------------+
|data|recordTime             |data1|recordTime1            |
+----+-----------------------+-----+-----------------------+
|cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
|aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
+----+-----------------------+-----+-----------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+----+-----------------------+-----+-----------+
|data|recordTime             |data1|recordTime1|
+----+-----------------------+-----+-----------+
|3   |2018-11-22 20:09:35.053|null |null       |
+----+-----------------------+-----+-----------+
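A run like this can also be checked mechanically: a correct left outer join should never emit the same left row both as a match and, later, as an outer row with nulls. The stdlib-only Scala sketch below flags such rows. The rows are transcribed from the output above; OuterJoinOutputCheck, OutRow, and duplicatedOuterRows are hypothetical names, and keying on (data, recordTime) assumes those two values identify a left row.

```scala
// Hypothetical checker, stdlib only: flags left rows that a left outer join
// emitted both as a match and as an outer (null-padded) row.
object OuterJoinOutputCheck {
  // One console row: batch id, left value and time, and the right value if matched.
  final case class OutRow(batch: Int, data: String, recordTime: String, data1: Option[String])

  def duplicatedOuterRows(rows: Seq[OutRow]): Seq[(String, String)] = {
    // Left rows that appeared with a right-side match at least once.
    val matched = rows.filter(_.data1.isDefined).map(r => (r.data, r.recordTime)).toSet
    // Outer (null-padded) rows for left rows that had already matched.
    rows.filter(r => r.data1.isEmpty && matched((r.data, r.recordTime)))
      .map(r => (r.data, r.recordTime))
  }

  def main(args: Array[String]): Unit = {
    // Rows transcribed from the console output (batches 2 to 6).
    val observed = Seq(
      OutRow(2, "3", "2018-11-22 20:09:35.053", Some("3")),
      OutRow(2, "2", "2018-11-22 20:09:31.613", Some("2")),
      OutRow(3, "4", "2018-11-22 20:09:38.654", Some("4")),
      OutRow(4, "5", "2018-11-22 20:09:44.809", Some("5")),
      OutRow(4, "1", "2018-11-22 20:09:22.662", None),
      OutRow(5, "cc", "2018-11-22 20:10:06.654", Some("cc")),
      OutRow(5, "aa", "2018-11-22 20:10:01.536", Some("aa")),
      OutRow(6, "3", "2018-11-22 20:09:35.053", None))
    duplicatedOuterRows(observed).foreach { case (d, t) =>
      println(s"left row $d @ $t was emitted matched and again with nulls")
    }
  }
}
```

On the transcribed rows, only row 3 is flagged; row 1's null output in batch 4 is legitimate, since row 1 never matched.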
Issue Links
duplicates
- SPARK-27433 Spark Structured Streaming left outer joins returns outer nulls for already matched rows (Resolved)
- SPARK-26187 Stream-stream left outer join returns outer nulls for already matched rows (Resolved)
links to