SPARK-25834

Stream-stream outer join with Update output mode does not throw an exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 2.3.1, 2.3.2
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Run the program below: no AnalysisException is thrown, even though the query should be rejected.

      import java.sql.Timestamp
      import org.apache.spark.sql.functions.{col, expr}
      import org.apache.spark.sql.streaming.Trigger

      // Left input: topic test11, with a 20-second watermark on recordTime.
      // Note: "includeTimestamp" is not a Kafka source option (the Kafka source always
      // exposes a timestamp column), so it has no effect here.
      val lines_stream1 = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
      option("subscribe", "test11").
      option("includeTimestamp", true).
      load().
      selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
      select(col("value").as("data"), col("timestamp").as("recordTime")).
      withWatermark("recordTime", "20 seconds")

      // Right input: topic test22, with the same watermark on recordTime1.
      val lines_stream2 = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
      option("subscribe", "test22").
      option("includeTimestamp", true).
      load().
      selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
      select(col("value").as("data1"), col("timestamp").as("recordTime1")).
      withWatermark("recordTime1", "20 seconds")

      // Left outer stream-stream join with a time-range condition, written in Update mode.
      // Per the 2.3 guide this should be rejected with an AnalysisException, but it starts.
      val query = lines_stream1.join(lines_stream2, expr(
        """data == data1 and
          |recordTime1 >= recordTime and
          |recordTime1 <= recordTime + interval 20 seconds""".stripMargin), "left").
      writeStream.
      option("truncate", "false").
      outputMode("update").
      format("console").
      trigger(Trigger.ProcessingTime("2 seconds")).
      start()

      query.awaitTermination()

      According to the documentation at https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins, stream-stream joins are supported only in Append output mode:

      As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

      Inner joins behave as documented, but the restriction is not enforced for outer joins: the left outer join above starts in Update mode without any error.
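      The documented rule is small enough to state as code. The sketch below is a minimal, hypothetical Scala model of what the 2.3 guide says: the `StreamJoinRule` object, its `validate` method and the `OutputMode`/`JoinType` enumerations are invented for illustration and are not Spark classes or Spark's actual check. Under the documented rule only the output mode decides whether a join of two streaming inputs is accepted, so the left outer join in this report should be rejected in Update mode exactly like an inner join. It can be run as a Scala script or pasted into the Scala REPL.

```scala
// Hypothetical model of the stream-stream join rule documented for Spark 2.3.
// None of these names are Spark APIs; Spark's real check is internal and not reproduced here.
object StreamJoinRule {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Documented rule: when both join inputs are streaming, only Append mode is allowed,
  // whatever the join type. Left carries the rejection reason; Right means "accepted".
  def validate(joinType: JoinType, mode: OutputMode): Either[String, Unit] =
    if (mode == Append) Right(())
    else Left(s"$joinType stream-stream join is not supported in $mode output mode; only Append is supported")
}

import StreamJoinRule._

// The query in this report: a left outer stream-stream join started in Update mode.
println(validate(LeftOuter, Update)) // prints a Left(...), i.e. the query should be rejected
// The same join in Append mode is what the guide allows.
println(validate(LeftOuter, Append)) // prints Right(())
```

      In this model the join type never changes the outcome, which is why an outer join that silently starts in Update mode is reported as a bug rather than as intended behavior.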


            People

              Assignee: Unassigned
              Reporter: Sachin Ramachandra Setty (sachin1729)
              Votes: 0
              Watchers: 3
