Spark / SPARK-30634

Delta Merge and Arbitrary Stateful Processing in Structured streaming (foreachBatch)


    Details

    • Type: Question
    • Status: Closed
    • Priority: Trivial
    • Resolution: Invalid
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Labels:
      None
    • Environment:

      Spark 2.4.3 (scala 2.11.12)

      Delta: 0.5.0

      Java(TM) SE Runtime Environment (build 1.8.0_91-b14)

      OS: Ubuntu 18.04 LTS

       

      Description

      Hi,

      I have an application that performs Arbitrary Stateful Processing in Structured Streaming and uses DeltaTable.merge inside foreachBatch to update a Delta table, and I ran into strange behaviour:

      1. I've noticed that log messages inside my implementation of MapGroupsWithStateFunction / FlatMapGroupsWithStateFunction are output twice.

      2. While looking for the root cause, I've also found that the number of state rows reported by Spark also doubles.

       

      I thought there might be a bug in my code, so I went back to JavaStructuredSessionization from the Apache Spark examples and changed it a bit. I still got the same result.

      The problem happens only if I do not call batchDf.persist() inside foreachBatch.

      StreamingQuery query = sessionUpdates
              .writeStream()
              .outputMode("update")
              .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
                // without persisting batchDf, the following merge doubles the number
                // of Spark state rows and causes MapGroupsWithStateFunction to log twice
                  deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
                          .whenNotMatched().insertAll()
                          .whenMatched()
                          .updateAll()
                          .execute();
              })
              .trigger(Trigger.ProcessingTime(10000))
              .queryName("ACME")
              .start(); 
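For reference, a minimal sketch of the workaround mentioned above: persisting the micro-batch Dataset before the merge and unpersisting it afterwards. It assumes the same deltaTable, mergeExpr, and sessionUpdates as in the snippet above, and it requires a running Spark session with the Delta Lake dependency, so it is a fragment rather than a standalone program.

```java
StreamingQuery query = sessionUpdates
        .writeStream()
        .outputMode("update")
        .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, batchId) -> {
            // Cache the micro-batch so that, if the merge reads the source
            // Dataset more than once, the stateful operator upstream is not
            // re-executed (which is what doubles the logs and state rows).
            batchDf.persist();
            try {
                deltaTable.as("sessions")
                        .merge(batchDf.toDF().as("updates"), mergeExpr)
                        .whenNotMatched().insertAll()
                        .whenMatched().updateAll()
                        .execute();
            } finally {
                // Release the cached micro-batch once the merge has completed.
                batchDf.unpersist();
            }
        })
        .trigger(Trigger.ProcessingTime(10000))
        .queryName("ACME")
        .start();
```

The try/finally is a defensive choice so the cached batch is released even if the merge throws.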
      

      According to https://docs.databricks.com/_static/notebooks/merge-in-streaming.html and the Apache Spark docs, there seems to be no need to persist the dataset/dataframe inside foreachBatch.

      Sample code from Apache Spark examples with delta: JavaStructuredSessionization with Delta merge

       

       

      Appreciate your clarification.

       

        Attachments

        1. Capture1.PNG
          42 kB
          Yurii Oleynikov

            People

            • Assignee: Unassigned
            • Reporter: Yurii Oleynikov (yurkao)
            • Votes: 0
            • Watchers: 2
