SPARK-37987: Flaky Test: StreamingAggregationSuite.changing schema of state when restarting query - state format version 1



    Description

      StreamingAggregationSuite.changing schema of state when restarting query - state format version 1
      
      org.scalatest.exceptions.TestFailedException: 
      Error while checking stream failure: stateSchemaExc.isDefined was false
      org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
      	org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
      	org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
      	org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
      	org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82(StreamingAggregationSuite.scala:781)
      	org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82$adapted(StreamingAggregationSuite.scala:779)
      	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$33(StreamTest.scala:644)
      	scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
      	org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
      	org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
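
      The assertion at StreamingAggregationSuite.scala:781 runs inside the
      ExpectFailure[SparkException] check shown in the progress below: it walks
      the reported failure looking for a state schema incompatibility, and
      "stateSchemaExc.isDefined was false" means no such error was found in the
      exception's cause chain. A minimal sketch of that check, assuming a
      hypothetical helper (the matched message text is an assumption, not the
      test's actual code):

          import scala.annotation.tailrec

          // Hypothetical helper: walk the cause chain of the failure caught by
          // ExpectFailure[SparkException] and look for the state schema error.
          @tailrec
          def findStateSchemaExc(t: Throwable): Option[Throwable] = t match {
            case null => None
            // Matching on message text is an assumption; the real check may
            // inspect a specific exception type instead.
            case e if Option(e.getMessage).exists(_.contains("schema for existing state")) =>
              Some(e)
            case e => findStateSchemaExc(e.getCause)
          }

          // The flaky assertion: fails when no such cause is present.
          // assert(findStateSchemaExc(caughtError).isDefined)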
      
      
      == Progress ==
         StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c245252,Map(),/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2)
         AddData to MemoryStream[value#12734]: 21
      => ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
      
      == Stream ==
      Output Mode: Update
      Stream state: {MemoryStream[value#12734]: 0}
      Thread state: dead
      
      
      
      == Sink ==
      
      
      
      == Plan ==
      == Parsed Logical Plan ==
      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
      +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
         +- Project [(value#12734 % 10) AS id#12736, value#12734]
            +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
      
      == Analyzed Logical Plan ==
      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
      +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
         +- Project [(value#12734 % 10) AS id#12736, value#12734]
            +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
      
      == Optimized Logical Plan ==
      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
      +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
         +- Project [(value#12734 % 10) AS id#12736, value#12734]
            +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
      
      == Physical Plan ==
      WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2001/1955131699@11177c0f
      +- ObjectHashAggregate(keys=[id#12736], functions=[sum(value#12734), avg(value#12734), collect_list(value#12734, 0, 0)], output=[id#12736, sum_value#12742L, avg_value#12744, values#12746])
         +- StateStoreSave [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], Update, 0, 1
            +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
               +- StateStoreRestore [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], 1
                  +- Exchange hashpartitioning(id#12736, 5), ENSURE_REQUIREMENTS, [id=#50108]
                     +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
                        +- ObjectHashAggregate(keys=[id#12736], functions=[partial_sum(value#12734), partial_avg(value#12734), partial_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
                           +- *(1) Project [(value#12734 % 10) AS id#12736, value#12734]
                              +- MicroBatchScan[value#12734] MemoryStreamDataSource
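
      For context, the scenario this test covers is restarting a streaming
      aggregation from an existing checkpoint after the aggregation schema has
      changed (the plan above groups by id and computes sum, avg, and
      collect_list), which the state schema compatibility check should reject.
      A standalone sketch of that shape, assuming a local session; the path,
      source, and sink here are illustrative, not taken from the test:

          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.functions._
          import org.apache.spark.sql.streaming.Trigger

          object StateSchemaChangeRepro {
            def main(args: Array[String]): Unit = {
              val spark = SparkSession.builder()
                .master("local[2]")
                .appName("state-schema-change")
                // Pin the legacy format, matching the "state format version 1"
                // variant of the test.
                .config("spark.sql.streaming.aggregation.stateFormatVersion", "1")
                .getOrCreate()
              import spark.implicits._

              val checkpoint = "/tmp/state-schema-change"  // illustrative path

              val input = spark.readStream.format("rate").load()
                .select(($"value" % 10).as("id"), $"value")

              // First run: state is written with schema (id, sum_value).
              // For a second run, stop the query, switch to the wider
              // aggregation sketched below, and restart from the same
              // checkpoint; the state schema check should then fail the
              // query with a SparkException.
              val aggregated = input.groupBy($"id")
                .agg(sum($"value").as("sum_value"))
              // Second run, e.g.:
              //   .agg(sum($"value").as("sum_value"),
              //        avg($"value").as("avg_value"),
              //        collect_list($"value").as("values"))

              val query = aggregated.writeStream
                .outputMode("update")
                .format("console")
                .option("checkpointLocation", checkpoint)
                .trigger(Trigger.ProcessingTime(0))
                .start()

              query.awaitTermination(10000)
              query.stop()
              spark.stop()
            }
          }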
      


          People

            Assignee: Jungtaek Lim (kabhwan)
            Reporter: Hyukjin Kwon (gurwls223)
