Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Duplicate
Affects Version/s: 2.3.0
None
None
Description
I'm basically using the example given in the Spark documentation here: https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#outer-joins-with-watermarking, with the built-in rate test source, in which one stream is ahead by 3 seconds (I was originally using Kafka but ran into the same issue). The matched rows are returned correctly; however, after a while the same key is returned again with null outer columns.
Is this the expected behavior? Is there a way to exclude the duplicate null-padded outer results for keys that already had a match?
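For reference, the behavior the question expects can be modeled with plain Scala collections. This is a hypothetical batch (non-streaming) sketch, not Spark code: the `Impression`/`Click` case classes and the `leftOuterJoin` helper are illustrative names. It applies the same condition as the query below (equal IDs, `clickTime` between `impressionTime` and `impressionTime + 10 seconds`) and emits a null-padded row (`None` here) only for impressions with no match, so under these semantics no key appears both matched and unmatched.

```scala
// Hypothetical batch model of the expected leftOuter semantics (not the Spark API).
final case class Impression(adId: Long, timeMs: Long)
final case class Click(adId: Long, timeMs: Long)

// Same condition as the streaming query:
// clickAdId = impressionAdId AND impressionTime <= clickTime <= impressionTime + 10 s
def matches(i: Impression, c: Click): Boolean =
  c.adId == i.adId && c.timeMs >= i.timeMs && c.timeMs <= i.timeMs + 10000L

// Left outer join: every matching pair, plus exactly one None-padded row
// for each impression that has no match at all.
def leftOuterJoin(imps: Seq[Impression], clicks: Seq[Click]): Seq[(Impression, Option[Click])] =
  imps.flatMap { i =>
    val hits: Seq[Option[Click]] = clicks.filter(c => matches(i, c)).map(Some(_))
    val padded = if (hits.isEmpty) Seq(None) else hits
    padded.map(c => (i, c))
  }

// Mirror the test setup (assumption: evenly spaced rows): 5 rows/s,
// impression ID = value + 15, click ID = value, so a click trails its impression by 3 s.
val rowsPerSecond = 5
def rowTime(value: Long): Long = value * 1000L / rowsPerSecond
val values = 0L until 40L
val imps = values.map(v => Impression(v + 15, rowTime(v)))
val clicks = values.map(v => Click(v, rowTime(v)))

val joined = leftOuterJoin(imps, clicks)
val matchedIds = joined.collect { case (i, Some(_)) => i.adId }.toSet
val unmatchedIds = joined.collect { case (i, None) => i.adId }.toSet
println(s"matched=${matchedIds.size} unmatched=${unmatchedIds.size} overlap=${(matchedIds & unmatchedIds).size}")
```

Impression IDs 15 to 39 find their click 3 seconds later; IDs 40 to 54 have no click in this finite sample and get exactly one null-padded row each.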
Code:
```scala
import org.apache.spark.sql.functions.{col, expr}

// `session` is an existing SparkSession.
// The built-in "rate" source emits (timestamp, value) rows.
val testStream = session.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .option("numPartitions", "1")
  .load()

// Impression IDs are offset by 15 rows (3 seconds at 5 rows/s) relative to click IDs.
val impressions = testStream.select(
  (col("value") + 15).as("impressionAdId"),
  col("timestamp").as("impressionTime"))

val clicks = testStream.select(
  col("value").as("clickAdId"),
  col("timestamp").as("clickTime"))

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "20 seconds")
val clicksWithWatermark = clicks.withWatermark("clickTime", "30 seconds")

// Join with event-time constraints
val result = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 10 seconds
    """),
  joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)

val query = result.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", false)
  .start()

query.awaitTermination()
```
Result:
```
-------------------------------------------
Batch: 19
-------------------------------------------
+--------------+-----------------------+---------+-----------------------+
|impressionAdId|impressionTime         |clickAdId|clickTime              |
+--------------+-----------------------+---------+-----------------------+
|100           |2018-05-23 22:18:38.362|100      |2018-05-23 22:18:41.362|
|101           |2018-05-23 22:18:38.562|101      |2018-05-23 22:18:41.562|
|102           |2018-05-23 22:18:38.762|102      |2018-05-23 22:18:41.762|
|103           |2018-05-23 22:18:38.962|103      |2018-05-23 22:18:41.962|
|104           |2018-05-23 22:18:39.162|104      |2018-05-23 22:18:42.162|
+--------------+-----------------------+---------+-----------------------+

-------------------------------------------
Batch: 57
-------------------------------------------
+--------------+-----------------------+---------+-----------------------+
|impressionAdId|impressionTime         |clickAdId|clickTime              |
+--------------+-----------------------+---------+-----------------------+
|290           |2018-05-23 22:19:16.362|290      |2018-05-23 22:19:19.362|
|291           |2018-05-23 22:19:16.562|291      |2018-05-23 22:19:19.562|
|292           |2018-05-23 22:19:16.762|292      |2018-05-23 22:19:19.762|
|293           |2018-05-23 22:19:16.962|293      |2018-05-23 22:19:19.962|
|294           |2018-05-23 22:19:17.162|294      |2018-05-23 22:19:20.162|
|100           |2018-05-23 22:18:38.362|null     |null                   |
|99            |2018-05-23 22:18:38.162|null     |null                   |
|103           |2018-05-23 22:18:38.962|null     |null                   |
|101           |2018-05-23 22:18:38.562|null     |null                   |
|102           |2018-05-23 22:18:38.762|null     |null                   |
+--------------+-----------------------+---------+-----------------------+
```
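The delay before the null rows appear ("after a while") is consistent with watermark-bounded join state. The following is a rough model, not Spark's implementation. It assumes, following the structured streaming guide, that the global watermark is the minimum of the per-input watermarks (max event time minus the configured delay), and that an unmatched impression can be emitted with nulls only once the watermark passes `impressionTime + 10 seconds`. The helper names are hypothetical, and the latest printed timestamps in batches 19 and 57 stand in, approximately, for the max event time of both sides (both are derived from the same rate stream).

```scala
import java.time.{Duration, LocalDateTime}

// Approximate model (assumption): per-input watermark = max event time - delay,
// global watermark = minimum of the two.
val impressionDelay = Duration.ofSeconds(20)
val clickDelay = Duration.ofSeconds(30)
val rangeUpper = Duration.ofSeconds(10) // clickTime <= impressionTime + 10 seconds

def globalWatermark(maxImpressionTime: LocalDateTime, maxClickTime: LocalDateTime): LocalDateTime = {
  val a = maxImpressionTime.minus(impressionDelay)
  val b = maxClickTime.minus(clickDelay)
  if (a.isBefore(b)) a else b
}

// No click with clickTime <= impressionTime + 10 s can still arrive once the
// watermark has passed that bound, so the impression may be emitted with nulls.
def nullRowEligible(impressionTime: LocalDateTime, watermark: LocalDateTime): Boolean =
  watermark.isAfter(impressionTime.plus(rangeUpper))

val imp100 = LocalDateTime.parse("2018-05-23T22:18:38.362")
val maxAtBatch19 = LocalDateTime.parse("2018-05-23T22:18:42.162")
val maxAtBatch57 = LocalDateTime.parse("2018-05-23T22:19:20.162")

val wm19 = globalWatermark(maxAtBatch19, maxAtBatch19)
val wm57 = globalWatermark(maxAtBatch57, maxAtBatch57)
println(s"batch 19: watermark=$wm19 nullEligible=${nullRowEligible(imp100, wm19)}")
println(s"batch 57: watermark=$wm57 nullEligible=${nullRowEligible(imp100, wm57)}")
```

Under this model, impression 100 only becomes eligible for a null-padded row about 40 seconds after its event time (30 s click delay plus the 10 s range), which matches when the null rows show up. The timing is therefore plausible; what is unexpected is that a null row is produced at all for a key that already matched.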
This question has also been asked on Stack Overflow; please find the link below.
Keys 101 and 103 had already been returned as matched join results, but they are still emitted again as null-padded rows by the left outer join.
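The claim can be checked mechanically against the printed rows. A minimal sketch, using only the rows shown in batches 19 and 57 (the `OutRow` case class and `matchedAndNullPadded` helper are hypothetical names; `None` stands for a printed `null`):

```scala
// Rows as printed in the console output: (impressionAdId, clickAdId).
final case class OutRow(impressionAdId: Long, clickAdId: Option[Long])

val batch19 = Seq(100L, 101L, 102L, 103L, 104L).map(id => OutRow(id, Some(id)))
val batch57 =
  Seq(290L, 291L, 292L, 293L, 294L).map(id => OutRow(id, Some(id))) ++
    Seq(100L, 99L, 103L, 101L, 102L).map(id => OutRow(id, None))

// A key that appears both matched and null-padded violates leftOuter semantics.
def matchedAndNullPadded(rows: Seq[OutRow]): Set[Long] = {
  val matched = rows.collect { case OutRow(id, Some(_)) => id }.toSet
  val padded = rows.collect { case OutRow(id, None) => id }.toSet
  matched & padded
}

val offending = matchedAndNullPadded(batch19 ++ batch57)
println(offending.toSeq.sorted.mkString(", ")) // prints "100, 101, 102, 103"
```

Besides 101 and 103, keys 100 and 102 are affected as well. Key 99 is not flagged only because any matched row for it falls outside the two printed batches.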
Issue Links
- is duplicated by: SPARK-26154 Stream-stream joins - left outer join gives inconsistent output (Resolved)