Details
- Type: Bug
- Status: Resolved
- Priority: Trivial
- Resolution: Fixed
- Affects Version/s: 2.1.1, 2.2.3, 2.3.4, 2.4.4
- Fix Version/s: None
Description
I wrote the following code in both Java and Scala, following the official documentation.
Java:
public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate();
    Dataset<Row> lines = spark.readStream().format("socket")
            .option("host", "skynet")
            .option("includeTimestamp", true)
            .option("port", 8888).load();
    Dataset<Row> words = lines.select("timestamp", "value");
    Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
            .groupBy(functions.window(words.col("timestamp"), "10 seconds", "10 seconds"),
                    words.col("value")).count();
    StreamingQuery start = count.writeStream()
            .outputMode("update")
            .format("console").start();
    start.awaitTermination();
}
Scala:
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder.appName("test")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", 1)
    .getOrCreate
  import spark.implicits._

  val lines = spark.readStream.format("socket")
    .option("host", "skynet").option("includeTimestamp", true)
    .option("port", 8888).load
  val words = lines.select("timestamp", "value")
  val count = words.withWatermark("timestamp", "10 seconds")
    .groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
    .count()
  val start = count.writeStream.outputMode("update").format("console").start
  start.awaitTermination()
}
Both versions follow the official documentation, yet with the Java version I found that the metric "stateOnCurrentVersionSizeBytes" always increases, while the Scala version is fine.
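For anyone reproducing this: one way to watch the metric across micro-batches is to register a StreamingQueryListener before starting the query and read the state operator's custom metrics from each progress event. This is only a sketch; it assumes the SparkSession built in the Java snippet above and Spark 2.4.x's default HDFS-backed state store, which is what reports stateOnCurrentVersionSizeBytes.

import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Sketch: print the state store size reported after every micro-batch.
// Register this before calling start(); "spark" is the session built above.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        for (StateOperatorProgress op : event.progress().stateOperators()) {
            // stateOnCurrentVersionSizeBytes is a custom metric of the default
            // HDFS-backed state store provider.
            System.out.println("stateOnCurrentVersionSizeBytes = "
                    + op.customMetrics().get("stateOnCurrentVersionSizeBytes"));
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});

With the original Java query the printed value keeps growing; with the Scala version it stays bounded once the watermark starts evicting old windows.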
Java plan:
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
+- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)], output=[window#11, value#0, count#10L])
   +- StateStoreSave [window#11, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state, runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions = 1], Update, 1579274372624, 2
      +- *(3) HashAggregate(keys=[window#11, value#0], functions=[merge_count(1)], output=[window#11, value#0, count#21L])
         +- StateStoreRestore [window#11, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state, runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions = 1], 2
            +- *(2) HashAggregate(keys=[window#11, value#0], functions=[merge_count(1)], output=[window#11, value#0, count#21L])
               +- Exchange hashpartitioning(window#11, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11, value#0], functions=[partial_count(1)], output=[window#11, value#0, count#21L])
                     +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#11, value#0]
                        +- *(1) Filter isnotnull(timestamp#1)
                           +- EventTimeWatermark timestamp#1: timestamp, interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]
Scala plan:
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
+- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[count(1)], output=[window#6-T10000ms, value#0, count#10L])
   +- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state, runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions = 1], Update, 1579275214256, 2
      +- *(3) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
         +- StateStoreRestore [window#11-T10000ms, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state, runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions = 1], 2
            +- *(2) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
               +- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
                     +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#11-T10000ms, value#0]
                        +- *(1) Filter isnotnull(timestamp#1-T10000ms)
                           +- EventTimeWatermark timestamp#1: timestamp, interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]
Note that the aggregation keys in the Java plan are [window#11, value#0], without the -T10000ms watermark tag that the Scala plan's keys carry. You can also debug this in statefulOperators.scala:
protected def removeKeysOlderThanWatermark(
    storeManager: StreamingAggregationStateManager,
    store: StateStore): Unit = {
  if (watermarkPredicateForKeys.nonEmpty) {
    storeManager.keys(store).foreach { keyRow =>
      if (watermarkPredicateForKeys.get.eval(keyRow)) {
        storeManager.remove(store, keyRow) // this line is never reached with the Java query
      }
    }
  }
}
You will find that the Java version never removes old state: watermarkPredicateForKeys stays empty because none of the aggregation keys carries the watermark delay metadata.
I think the Java code should be written like this, applying withWatermark first and taking the grouping columns from the watermarked Dataset:
public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate();
    Dataset<Row> lines = spark.readStream().format("socket")
            .option("host", "skynet")
            .option("includeTimestamp", true)
            .option("port", 8888).load();
    Dataset<Row> words = lines.select("timestamp", "value");
    Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 seconds");
    Dataset<Row> count = wordsWatermark
            .groupBy(functions.window(wordsWatermark.col("timestamp"), "10 seconds", "10 seconds"),
                    wordsWatermark.col("value")).count();
    StreamingQuery start = count.writeStream()
            .outputMode("update")
            .format("console").start();
    start.awaitTermination();
}
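If my reading of the column resolution is right, an equivalent workaround is to reference the grouping columns by name with functions.col(...) instead of going through the pre-watermark Dataset, so that they are resolved against the watermarked child, just as $"timestamp" is in the Scala version. A minimal sketch of just the aggregation, under that assumption:

// Assumption: by-name references are resolved against the watermarked Dataset,
// so the aggregation keys pick up the watermark metadata (the -T10000ms tag),
// exactly like $"timestamp" in the Scala version.
Dataset<Row> count = words
        .withWatermark("timestamp", "10 seconds")
        .groupBy(
                functions.window(functions.col("timestamp"), "10 seconds", "10 seconds"),
                functions.col("value"))
        .count();

Either way, the key point is that the columns passed to groupBy must carry the watermark metadata added by withWatermark; otherwise StateStoreSave can never evict old windows.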
Attachments
Issue Links
- is caused by: SPARK-18669 Update Apache docs regard watermarking in Structured Streaming (Resolved)
- links to