Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Duplicate
Affects Version/s: 2.3.1, 2.3.2
None
None
Description
Run the program below. The query uses a left outer stream-stream join in Update output mode, yet no AnalysisException is thrown.
import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // needed for .as[(String, Timestamp)]; spark-shell imports it automatically

// Left side: topic test11, watermarked on recordTime
val lines_stream1 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
  option("subscribe", "test11").
  option("includeTimestamp", true). // socket-source option; the Kafka source already provides a timestamp column
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data"), col("timestamp").as("recordTime")).
  withWatermark("recordTime", "20 seconds")
// Right side: topic test22, watermarked on recordTime1
val lines_stream2 = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
  option("subscribe", "test22").
  option("includeTimestamp", value = true). // socket-source option; the Kafka source already provides a timestamp column
  load().
  selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
  select(col("value").as("data1"), col("timestamp").as("recordTime1")).
  withWatermark("recordTime1", "20 seconds")
// Left outer stream-stream join started in Update output mode.
// Per the 2.3 documentation this should be rejected, but the query starts without error.
val query = lines_stream1.join(
  lines_stream2,
  expr("""
    |data == data1 AND
    |recordTime1 >= recordTime AND
    |recordTime1 <= recordTime + interval 20 seconds
    """.stripMargin),
  "left").
  writeStream.
  option("truncate", "false").
  outputMode("update").
  format("console").
  trigger(Trigger.ProcessingTime("2 seconds")).
  start()

query.awaitTermination()
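The join condition in this query pairs a test11 row with a test22 row when data equals data1 and recordTime1 falls in the 20-second range starting at recordTime, with both ends inclusive. The plain-Scala sketch below mirrors that predicate outside Spark, purely for illustration; the Record case class, the MaxDelay value, and the joinMatches function are hypothetical names and are not part of the Spark API.

```scala
import java.time.{Duration, Instant}

// Hypothetical stand-in for one row of either stream (not a Spark type).
final case class Record(data: String, time: Instant)

object JoinConditionSketch {
  // Same bound as "interval 20 seconds" in the query's join condition.
  val MaxDelay: Duration = Duration.ofSeconds(20)

  // Mirrors: data == data1 AND recordTime1 >= recordTime
  //          AND recordTime1 <= recordTime + interval 20 seconds
  def joinMatches(left: Record, right: Record): Boolean =
    left.data == right.data &&
      !right.time.isBefore(left.time) &&
      !right.time.isAfter(left.time.plus(MaxDelay))

  def main(args: Array[String]): Unit = {
    val t0 = Instant.parse("2019-01-01T00:00:00Z")
    val left = Record("a", t0)
    println(joinMatches(left, Record("a", t0.plusSeconds(20)))) // true: upper bound is inclusive
    println(joinMatches(left, Record("a", t0.plusSeconds(21)))) // false: outside the range
    println(joinMatches(left, Record("b", t0.plusSeconds(5))))  // false: data differs
  }
}
```

Because the join is "left", a test11 row with no match inside this range is expected to be emitted with nulls for the test22 columns once the watermark has moved past the range.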
According to the documentation (https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins), stream-stream joins are supported only in Append output mode:
As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
With an inner join, the query is rejected in Update mode as the documentation describes. With an outer join (here "left"), no exception is thrown and the query runs in Update mode.
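The gap between the documented rule and the reported behavior can be summarized with a small, hypothetical model. It is not Spark's actual analyzer code: the OutputMode and JoinType types, the StreamJoinModeCheck object, and its error text are illustrative names chosen for this sketch. It models only the output-mode part of the rule and ignores the other stream-stream join restrictions (watermarks, time constraints, supported join types).

```scala
// Hypothetical, simplified model of the output-mode rule discussed in this issue.
// NOT Spark's implementation; names and messages are illustrative only.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType

object StreamJoinModeCheck {
  // Documented rule (Spark 2.3): a join between two streaming sides
  // requires Append output mode.
  def documented(joinType: JoinType, mode: OutputMode): Either[String, Unit] =
    if (mode == Append) Right(())
    else Left(s"$joinType join between two streaming Datasets is not supported in $mode output mode")

  // Behavior reported in this issue: only inner joins are rejected
  // outside Append mode; outer joins pass the check.
  def asReported(joinType: JoinType, mode: OutputMode): Either[String, Unit] =
    joinType match {
      case Inner => documented(joinType, mode)
      case _     => Right(())
    }
}
```

For the query in this report, StreamJoinModeCheck.asReported(LeftOuter, Update) returns Right(()), which corresponds to the missing AnalysisException, while StreamJoinModeCheck.documented(LeftOuter, Update) returns a Left.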
Issue Links
- duplicates: SPARK-28223 stream-stream joins should fail unsupported checker in update mode (Resolved)
- links to