Affects Version/s: 2.4.3
Fix Version/s: None
Spark 2.4.3 (Scala 2.11.12)
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
OS: Ubuntu 18.04 LTS
I have an application that performs arbitrary stateful processing in Structured Streaming and uses delta.merge to update a Delta table, and I have observed strange behaviour:
1. Log statements inside my implementation of MapGroupsWithStateFunction / FlatMapGroupsWithStateFunction are output twice.
2. While looking for the root cause, I also found that the number of state rows reported by Spark is doubled.
I thought there might be a bug in my code, so I went back to the JavaStructuredSessionization example from Apache Spark and modified it slightly. I still got the same result.
The problem happens only if I do not call batchDF.persist() inside foreachBatch.
According to https://docs.databricks.com/_static/notebooks/merge-in-streaming.html and the Apache Spark docs, there seems to be no need to persist the Dataset/DataFrame inside foreachBatch.
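One plausible explanation, which is an assumption and not confirmed in this report, is that the merge reads its source more than once, so an un-persisted batch DataFrame is recomputed on each read, re-running the stateful function (and its log lines) each time. The sketch below is a plain-Java analogy, not Spark or Delta code: `computeBatch`, `twoPassMerge` and `cached` are hypothetical stand-ins for the state function, a two-pass merge, and persist().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class RecomputeDemo {
    // Counts how often the hypothetical state function body runs (stands in for a log line).
    static int stateFunctionCalls = 0;

    // Hypothetical stand-in for the per-group state function: a side effect plus a transform.
    static List<String> computeBatch(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String key : input) {
            stateFunctionCalls++; // analogous to a log statement inside the state function
            out.add(key + ":updated");
        }
        return out;
    }

    // Hypothetical merge that reads its source twice (assumed: once to match, once to write).
    static int twoPassMerge(Supplier<List<String>> source) {
        int matched = source.get().size(); // pass 1: evaluates the source
        int written = source.get().size(); // pass 2: evaluates the source again
        return matched + written;
    }

    // Memoizing wrapper, analogous to persist(): the underlying supplier runs at most once.
    static <T> Supplier<T> cached(Supplier<T> s) {
        return new Supplier<T>() {
            private T value;
            private boolean done = false;

            @Override
            public T get() {
                if (!done) {
                    value = s.get();
                    done = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");

        stateFunctionCalls = 0;
        twoPassMerge(() -> computeBatch(input)); // no caching: source recomputed per pass
        int uncachedCalls = stateFunctionCalls;

        stateFunctionCalls = 0;
        twoPassMerge(cached(() -> computeBatch(input))); // "persisted": computed once
        int cachedCalls = stateFunctionCalls;

        System.out.println("without cache: " + uncachedCalls); // 6
        System.out.println("with cache:    " + cachedCalls);   // 3
    }
}
```

In the analogy the un-cached run calls the state function twice per key, which mirrors the doubled logs and state row counts; in the actual job, the corresponding step would be calling batchDF.persist() before the merge and batchDF.unpersist() after it inside foreachBatch.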
Sample code (the Apache Spark JavaStructuredSessionization example, modified to use Delta merge): JavaStructuredSessionization with Delta merge
I would appreciate any clarification.