SPARK-30553: Fix structured-streaming Java example error


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 2.1.1, 2.2.3, 2.3.4, 2.4.4
    • Fix Version/s: 2.4.5, 3.0.0
    • Component/s: Documentation
    • Labels: None

    Description

      http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

      I wrote code following this guide in both Java and Scala.

      Java:

          public static void main(String[] args) throws StreamingQueryException {
              SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                      .config("spark.sql.shuffle.partitions", 1)
                      .getOrCreate();
              Dataset<Row> lines = spark.readStream().format("socket")
                      .option("host", "skynet")
                      .option("includeTimestamp", true)
                      .option("port", 8888).load();
              Dataset<Row> words = lines.select("timestamp", "value");
              Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
                      .groupBy(functions.window(words.col("timestamp"), "10 seconds", "10 seconds"),
                              words.col("value")).count();
              StreamingQuery start = count.writeStream()
                      .outputMode("update")
                      .format("console").start();
              start.awaitTermination();
          }
      

      Scala:

        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder.appName("test")
            .master("local[*]")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate
          import spark.implicits._
          val lines = spark.readStream.format("socket")
            .option("host", "skynet")
            .option("includeTimestamp", true)
            .option("port", 8888)
            .load
          val words = lines.select("timestamp", "value")
          val count = words.withWatermark("timestamp", "10 seconds")
            .groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
            .count()
          val start = count.writeStream.outputMode("update").format("console").start
          start.awaitTermination()
        }
      

      Both versions follow the official documentation. With the Java version I found that the metric "stateOnCurrentVersionSizeBytes" always increases, but the Scala version is fine.
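
      For reference, here is a minimal sketch (mine, not from the report) of how the growth can be watched while the query runs: poll StreamingQuery.lastProgress() and print it; "stateOnCurrentVersionSizeBytes" shows up under stateOperators in the progress JSON. The helper class name and the 2-second polling interval are just illustrative.

          // Sketch only: periodically print the streaming progress. The state store
          // metrics, including stateOnCurrentVersionSizeBytes, appear under
          // stateOperators[].customMetrics in the printed JSON.
          import org.apache.spark.sql.streaming.StreamingQuery;
          import org.apache.spark.sql.streaming.StreamingQueryProgress;

          class StateSizeWatcher {
              static void watch(StreamingQuery query) throws InterruptedException {
                  while (query.isActive()) {
                      StreamingQueryProgress progress = query.lastProgress();
                      if (progress != null) {
                          System.out.println(progress.prettyJson());
                      }
                      Thread.sleep(2000);   // illustrative polling interval
                  }
              }
          }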

       

      Java physical plan (note that the grouping keys are missing the -T10000ms watermark marker):

      == Physical Plan ==
      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
      +- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)], output=[window#11, value#0, count#10L])
         +- StateStoreSave [window#11, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state, runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions = 1], Update, 1579274372624, 2
            +- *(3) HashAggregate(keys=[window#11, value#0], functions=[merge_count(1)], output=[window#11, value#0, count#21L])
               +- StateStoreRestore [window#11, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state, runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions = 1], 2
                  +- *(2) HashAggregate(keys=[window#11, value#0], functions=[merge_count(1)], output=[window#11, value#0, count#21L])
                     +- Exchange hashpartitioning(window#11, value#0, 1)
                        +- *(1) HashAggregate(keys=[window#11, value#0], functions=[partial_count(1)], output=[window#11, value#0, count#21L])
                           +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#11, value#0]
                              +- *(1) Filter isnotnull(timestamp#1)
                                 +- EventTimeWatermark timestamp#1: timestamp, interval 10 seconds
                                    +- LocalTableScan <empty>, [timestamp#1, value#0]
      
      

       

       

      Scala physical plan (the grouping keys carry the -T10000ms watermark marker):

      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
      +- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[count(1)], output=[window#6-T10000ms, value#0, count#10L])
         +- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state, runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions = 1], Update, 1579275214256, 2
            +- *(3) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
               +- StateStoreRestore [window#11-T10000ms, value#0], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state, runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions = 1], 2
                  +- *(2) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
                     +- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
                        +- *(1) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
                           +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#11-T10000ms, value#0]
                              +- *(1) Filter isnotnull(timestamp#1-T10000ms)
                                 +- EventTimeWatermark timestamp#1: timestamp, interval 10 seconds
                                    +- LocalTableScan <empty>, [timestamp#1, value#0]
      

       

       You can also debug removeKeysOlderThanWatermark in statefulOperators.scala:

        protected def removeKeysOlderThanWatermark(
            storeManager: StreamingAggregationStateManager,
            store: StateStore): Unit = {
          if (watermarkPredicateForKeys.nonEmpty) {
            storeManager.keys(store).foreach { keyRow =>
              if (watermarkPredicateForKeys.get.eval(keyRow)) {
                storeManager.remove(store, keyRow)  // with the Java example this line is never reached
              }
            }
          }
        }

      You will find that the Java version never removes old state; the watermark predicate for the keys is never satisfied, which matches the missing -T10000ms marker in the Java plan above.
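
      My reading of why (an assumption on my part, not stated above): withWatermark records the delay as metadata on the event-time column, and the Java example passes columns resolved against words, the pre-watermark Dataset, into groupBy, so the grouping keys never carry that metadata and watermarkPredicateForKeys stays empty. A hypothetical fragment, reusing the words Dataset from the Java example above, to see the difference:

          // Hypothetical check: compare the event-time column metadata before and
          // after withWatermark. The watermarked column should carry a
          // "spark.watermarkDelayMs" entry; the column taken from the original
          // words Dataset should not.
          Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 seconds");
          System.out.println(words.schema().apply("timestamp").metadata().json());
          System.out.println(wordsWatermark.schema().apply("timestamp").metadata().json());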

       I think the Java example should be written like this:

              SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                      .config("spark.sql.shuffle.partitions", 1)
                      .getOrCreate();
              Dataset<Row> lines = spark.readStream().format("socket")
                      .option("host", "skynet")
                      .option("includeTimestamp", true)
                      .option("port", 8888).load();
              Dataset<Row> words = lines.select("timestamp", "value");
              Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 seconds");
              Dataset<Row> count = wordsWatermark
                      .groupBy(functions.window(wordsWatermark.col("timestamp"), "10 seconds", "10 seconds"),
                              wordsWatermark.col("value")).count();
              StreamingQuery start = count.writeStream()
                      .outputMode("update")
                      .format("console").start();
              start.awaitTermination();
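
      One way to check the change (again mine, not from the report): once the rewritten query has processed at least one micro-batch, StreamingQuery.explain() should print an executed plan whose grouping keys carry the watermark marker, matching the Scala plan above, and the state metrics in lastProgress() should stop growing once old windows expire.

          // Hypothetical verification: call this after at least one micro-batch has
          // run (e.g. from another thread). The executed plan should now show
          // grouping keys like [window#..-T10000ms, value#..].
          start.explain();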
      

            People

              Assignee: bettermouse
              Reporter: bettermouse