SPARK-30634

Delta Merge and Arbitrary Stateful Processing in Structured streaming (foreachBatch)

Details

    • Type: Question
    • Status: Closed
    • Priority: Trivial
    • Resolution: Invalid
    • Affects Version/s: 2.4.3
    • None
    • None
    • Environment:
      Spark 2.4.3 (Scala 2.11.12)
      Delta: 0.5.0
      Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
      OS: Ubuntu 18.04 LTS

    Description

      Hi,

      I have an application that uses Arbitrary Stateful Processing in Structured Streaming and calls delta.merge to update a Delta table. I ran into some strange behaviour:

      1. Log statements inside my implementation of MapGroupsWithStateFunction/FlatMapGroupsWithStateFunction are printed twice.

      2. While looking for the root cause, I also found that the number of state rows reported by Spark doubles.

      I thought there might be a bug in my code, so I went back to JavaStructuredSessionization from the Apache Spark examples and modified it slightly. I still got the same result.

      The problem happens only if I do not call batchDf.persist() inside foreachBatch.

      StreamingQuery query = sessionUpdates
              .writeStream()
              .outputMode("update")
              .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, batchId) -> {
                  // Without persisting batchDf, the merge below doubles the number of Spark state rows
                  // and causes MapGroupsWithStateFunction to log twice.
                  deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
                          .whenNotMatched().insertAll()
                          .whenMatched()
                          .updateAll()
                          .execute();
              })
              .trigger(Trigger.ProcessingTime(10000))
              .queryName("ACME")
              .start(); 
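
The symptom can be pictured with a plain-Java model that involves neither Spark nor Delta. It is only a sketch under two stated assumptions: that the merge reads its source more than once, and that each read of an unpersisted batch recomputes it, re-running the stateful function. The names `LazyBatchSketch`, `lazyBatch`, `persisted` and `mergeWithTwoPasses` are hypothetical and exist only for this illustration; they are not Spark or Delta APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyBatchSketch {

    // Counts how many times the stand-in for the stateful function runs.
    static final AtomicInteger stateFunctionCalls = new AtomicInteger();

    // Stand-in for a lazily evaluated micro-batch: each get() recomputes it,
    // which re-runs the (simulated) stateful function.
    static Supplier<List<String>> lazyBatch() {
        return () -> {
            stateFunctionCalls.incrementAndGet();
            return Arrays.asList("session-1", "session-2");
        };
    }

    // Stand-in for persist(): computes the batch once, then replays the stored result.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean evaluated;

            @Override
            public synchronized T get() {
                if (!evaluated) {
                    cached = source.get();
                    evaluated = true;
                }
                return cached;
            }
        };
    }

    // ASSUMPTION: a merge-like consumer that reads its source twice.
    static void mergeWithTwoPasses(Supplier<List<String>> source) {
        source.get(); // first pass over the source
        source.get(); // second pass over the source
    }

    // Returns how often the simulated stateful function ran for one "micro-batch".
    static int callsFor(boolean persist) {
        stateFunctionCalls.set(0);
        Supplier<List<String>> batch = lazyBatch();
        mergeWithTwoPasses(persist ? persisted(batch) : batch);
        return stateFunctionCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + callsFor(false));
        System.out.println("with persist: " + callsFor(true));
    }
}
```

In this model the counter reaches 2 without the caching wrapper and 1 with it, which mirrors the doubled logs and the effect of persisting the batch. Whether the real merge behaves this way is exactly the open question of this issue.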
      

      According to https://docs.databricks.com/_static/notebooks/merge-in-streaming.html and the Apache Spark docs, there seems to be no need to persist the Dataset/DataFrame inside foreachBatch.

      Sample code from the Apache Spark examples, with Delta: JavaStructuredSessionization with Delta merge

      I would appreciate your clarification.

       

      Attachments

        1. Capture1.PNG
          42 kB
          Yurii Oleynikov

          People

            Assignee: Unassigned
            Reporter: Yurii Oleynikov (yurkao)
            Votes: 0
            Watchers: 2
