  Spark / SPARK-24046

Rate source doesn't gradually increase the rate when rampUpTime >= rowsPerSecond



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4

      (The environment is not important; the issue lies in the rate calculation.)


      When using the rate source in Structured Streaming, the `rampUpTime` feature fails to gradually increase the stream rate when the `rampUpTime` option is equal to or greater than `rowsPerSecond`.

      When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain no rows. The rate then jumps straight to `rowsPerSecond` once `time > rampUpTime`.
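The all-empty ramp-up is consistent with the per-second speed increment being computed with integer division. The following is a minimal Scala model under that assumption, not Spark's actual source: the object and the names `valueAtSecond` and `speedDeltaPerSecond` are illustrative. It computes the total number of rows emitted by the end of a given second.

```scala
// Minimal model of the rate source's ramp-up arithmetic.
// ASSUMPTION: the per-second speed increment uses Long (integer) division,
// as the batches in this report suggest. Names are illustrative, not Spark's API.
object RampUpModel {
  // Total rows emitted by the end of second `seconds`.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTime: Long): Long = {
    // Integer division: this is 0 whenever rowsPerSecond <= rampUpTime.
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTime + 1)
    if (seconds <= rampUpTime) {
      // Speeds during ramp-up are 0, d, 2d, ..., seconds * d; this is their sum.
      speedDeltaPerSecond * seconds * (seconds + 1) / 2
    } else {
      // Ramp-up total, then full speed for each remaining second.
      valueAtSecond(rampUpTime, rowsPerSecond, rampUpTime) +
        (seconds - rampUpTime) * rowsPerSecond
    }
  }

  def main(args: Array[String]): Unit = {
    for ((rps, ramp) <- Seq((5L, 10L), (10L, 10L), (11L, 10L))) {
      val totals = (0L to 12L).map(s => valueAtSecond(s, rps, ramp))
      println(s"rowsPerSecond=$rps, rampUpTime=$ramp: ${totals.mkString(", ")}")
    }
  }
}
```

With `rowsPerSecond = 5` and `rampUpTime = 10`, the model returns 0 for seconds 0 through 10, then 5 and 10. That lines up with the empty batches followed by `Batch: 11` (values 0 to 4) and `Batch: 12` (values 5 to 9) in the first scenario below.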

      The following scenario, executed in `spark-shell`, demonstrates the issue:

      // Using rampUpTime(10) > rowsPerSecond(5)
      val stream = spark.readStream
        .format("rate")
        .option("rowsPerSecond", 5)
        .option("rampUpTime", 10)
        .load()
      val query = stream.writeStream.format("console").start()
      // Exiting paste mode, now interpreting.
      stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
      query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
      Batch: 0
      Batch: 1
      Batch: 2
      Batch: 3
      Batch: 4
      Batch: 5
      Batch: 6
      Batch: 7
      Batch: 8
      Batch: 9
      Batch: 10
      Batch: 11
      | timestamp|value|
      |2018-04-22 17:08:...| 0|
      |2018-04-22 17:08:...| 1|
      |2018-04-22 17:08:...| 2|
      |2018-04-22 17:08:...| 3|
      |2018-04-22 17:08:...| 4|
      Batch: 12
      | timestamp|value|
      |2018-04-22 17:08:...| 5|
      |2018-04-22 17:08:...| 6|
      |2018-04-22 17:08:...| 7|
      |2018-04-22 17:08:...| 8|
      |2018-04-22 17:08:...| 9|


      The next scenario uses `rowsPerSecond == rampUpTime`, which also fails:

      val stream = spark.readStream
        .format("rate")
        .option("rowsPerSecond", 10)
        .option("rampUpTime", 10)
        .load()
      val query = stream.writeStream.format("console").start()
      // Exiting paste mode, now interpreting.
      stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
      query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a
      scala> -------------------------------------------
      Batch: 0
      Batch: 1
      Batch: 2
      Batch: 3
      Batch: 4
      Batch: 5
      Batch: 6
      Batch: 7
      Batch: 8
      Batch: 9
      Batch: 10
      Batch: 11
      | timestamp|value|
      |2018-04-22 15:32:...| 0|
      |2018-04-22 15:32:...| 1|
      |2018-04-22 15:32:...| 2|
      |2018-04-22 15:32:...| 3|
      |2018-04-22 15:32:...| 4|
      |2018-04-22 15:32:...| 5|
      |2018-04-22 15:32:...| 6|
      |2018-04-22 15:32:...| 7|
      |2018-04-22 15:32:...| 8|
      |2018-04-22 15:32:...| 9|
      Batch: 12
      | timestamp|value|
      |2018-04-22 15:32:...| 10|
      |2018-04-22 15:32:...| 11|
      |2018-04-22 15:32:...| 12|
      |2018-04-22 15:32:...| 13|
      |2018-04-22 15:32:...| 14|
      |2018-04-22 15:32:...| 15|
      |2018-04-22 15:32:...| 16|
      |2018-04-22 15:32:...| 17|
      |2018-04-22 15:32:...| 18|
      |2018-04-22 15:32:...| 19|
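Assuming, as the batches suggest, that the rate source derives a per-second speed increment Δ from `rowsPerSecond` (r) and `rampUpTime` (R, in seconds) using integer division, the cumulative number of rows emitted by the end of second s works out as follows for the two failing configurations. This is a worked check of that assumption, not a statement of Spark's implementation.

```latex
\begin{aligned}
&\Delta = \left\lfloor \frac{r}{R+1} \right\rfloor, \qquad
\mathrm{end}(s) =
\begin{cases}
  \Delta \, \dfrac{s(s+1)}{2}, & s \le R \\[4pt]
  \mathrm{end}(R) + (s - R)\, r, & s > R
\end{cases} \\[8pt]
&r = 5,\ R = 10: \quad \Delta = \lfloor 5/11 \rfloor = 0, \quad
  \mathrm{end}(10) = 0,\ \mathrm{end}(11) = 5,\ \mathrm{end}(12) = 10 \\
&r = 10,\ R = 10: \quad \Delta = \lfloor 10/11 \rfloor = 0, \quad
  \mathrm{end}(10) = 0,\ \mathrm{end}(11) = 10,\ \mathrm{end}(12) = 20
\end{aligned}
```

Δ = 0 holds exactly when r ≤ R, that is `rampUpTime >= rowsPerSecond`, so no second of the ramp-up contributes any rows. The totals line up with `Batch: 11` and `Batch: 12` in both failing scenarios: values 0 to 4 and 5 to 9 for r = 5, and values 0 to 9 and 10 to 19 for r = 10.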


      In contrast, when `rowsPerSecond > rampUpTime`, the rate increases gradually as expected, for example with `rowsPerSecond` = 11 and `rampUpTime` = 10:




      val stream = spark.readStream
        .format("rate")
        .option("rowsPerSecond", 11)
        .option("rampUpTime", 10)
        .load()
      val query = stream.writeStream.format("console").start()
      // Exiting paste mode, now interpreting.
      stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
      query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19c6e821
      scala> -------------------------------------------
      Batch: 0
      Batch: 1
      | timestamp|value|
      |2018-04-22 15:34:...| 0|
      Batch: 2
      | timestamp|value|
      |2018-04-22 15:34:...| 1|
      |2018-04-22 15:34:...| 2|
      Batch: 3
      | timestamp|value|
      |2018-04-22 15:34:...| 3|
      |2018-04-22 15:34:...| 4|
      |2018-04-22 15:34:...| 5|
      Batch: 4
      | timestamp|value|
      |2018-04-22 15:34:...| 6|
      |2018-04-22 15:34:...| 7|
      |2018-04-22 15:34:...| 8|
      |2018-04-22 15:34:...| 9|
      Batch: 5
      | timestamp|value|
      |2018-04-22 15:34:...| 10|
      |2018-04-22 15:34:...| 11|
      |2018-04-22 15:34:...| 12|
      |2018-04-22 15:34:...| 13|
      |2018-04-22 15:34:...| 14|
      Batch: 6
      | timestamp|value|
      |2018-04-22 15:34:...| 15|
      |2018-04-22 15:34:...| 16|
      |2018-04-22 15:34:...| 17|
      |2018-04-22 15:34:...| 18|
      |2018-04-22 15:34:...| 19|
      |2018-04-22 15:34:...| 20|
      Batch: 7
      | timestamp|value|
      |2018-04-22 15:34:...| 21|
      |2018-04-22 15:34:...| 22|
      |2018-04-22 15:34:...| 23|
      |2018-04-22 15:34:...| 24|
      |2018-04-22 15:34:...| 25|
      |2018-04-22 15:34:...| 26|
      |2018-04-22 15:34:...| 27|
      Batch: 8
      | timestamp|value|
      |2018-04-22 15:34:...| 28|
      |2018-04-22 15:34:...| 29|
      |2018-04-22 15:34:...| 30|
      |2018-04-22 15:34:...| 31|
      |2018-04-22 15:34:...| 32|
      |2018-04-22 15:34:...| 33|
      |2018-04-22 15:34:...| 34|
      |2018-04-22 15:34:...| 35|
      Batch: 9
      | timestamp|value|
      |2018-04-22 15:34:...| 36|
      |2018-04-22 15:34:...| 37|
      |2018-04-22 15:34:...| 38|
      |2018-04-22 15:34:...| 39|
      |2018-04-22 15:34:...| 40|
      |2018-04-22 15:34:...| 41|
      |2018-04-22 15:34:...| 42|
      |2018-04-22 15:34:...| 43|
      |2018-04-22 15:34:...| 44|
      Batch: 10
      | timestamp|value|
      |2018-04-22 15:34:...| 45|
      |2018-04-22 15:34:...| 46|
      |2018-04-22 15:34:...| 47|
      |2018-04-22 15:34:...| 48|
      |2018-04-22 15:34:...| 49|
      |2018-04-22 15:34:...| 50|
      |2018-04-22 15:34:...| 51|
      |2018-04-22 15:34:...| 52|
      |2018-04-22 15:34:...| 53|
      |2018-04-22 15:34:...| 54|
      Batch: 11
      | timestamp|value|
      |2018-04-22 15:34:...| 55|
      |2018-04-22 15:34:...| 56|
      |2018-04-22 15:34:...| 57|
      |2018-04-22 15:34:...| 58|
      |2018-04-22 15:34:...| 59|
      |2018-04-22 15:34:...| 60|
      |2018-04-22 15:34:...| 61|
      |2018-04-22 15:34:...| 62|
      |2018-04-22 15:34:...| 63|
      |2018-04-22 15:34:...| 64|
      |2018-04-22 15:34:...| 65|
      Batch: 12
      | timestamp|value|
      |2018-04-22 15:34:...| 66|
      |2018-04-22 15:34:...| 67|
      |2018-04-22 15:34:...| 68|
      |2018-04-22 15:34:...| 69|
      |2018-04-22 15:34:...| 70|
      |2018-04-22 15:34:...| 71|
      |2018-04-22 15:34:...| 72|
      |2018-04-22 15:34:...| 73|
      |2018-04-22 15:34:...| 74|
      |2018-04-22 15:34:...| 75|
      |2018-04-22 15:34:...| 76|
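If the per-second increment is ⌊r/(R+1)⌋ with r = `rowsPerSecond` and R = `rampUpTime` (an assumption inferred from the output, not taken from Spark's source), this configuration gives an increment of exactly 1, so the cumulative totals during ramp-up are triangular numbers:

```latex
\begin{aligned}
&\Delta = \left\lfloor \tfrac{11}{10+1} \right\rfloor = 1, \qquad
  \mathrm{end}(s) = \frac{s(s+1)}{2} \quad (s \le 10) \\
&\mathrm{end}(0), \dots, \mathrm{end}(10) = 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55 \\
&\mathrm{end}(11) = 55 + 11 = 66, \qquad \mathrm{end}(12) = 66 + 11 = 77
\end{aligned}
```

Each `Batch: k` above holds values end(k-1) through end(k) - 1: one row in batch 1, ten rows in batch 10, and the full eleven rows from batch 11 on.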

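One possible direction for a fix, sketched here as a hypothetical and not as Spark's code or an agreed patch: compute the ramp-up total with exact arithmetic and floor only the cumulative count, so fractional per-second speeds still add up to whole rows. The sketch assumes the speed during second i of the ramp-up is `rowsPerSecond * i / (rampUpTime + 1)`; the object and method names are illustrative.

```scala
// HYPOTHETICAL fix sketch, not Spark's code: the ramp-up total is computed
// exactly and floored, so the rate rises even when rowsPerSecond <= rampUpTime.
object FractionalRampUp {
  // Rows emitted by the end of second `seconds`. During ramp-up the speed in
  // second i is rowsPerSecond * i / (rampUpTime + 1); the cumulative sum is floored.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTime: Long): Long = {
    val s = math.min(seconds, rampUpTime)
    // floor(rowsPerSecond * s * (s + 1) / (2 * (rampUpTime + 1))), using BigInt to avoid overflow
    val rampPart =
      (BigInt(rowsPerSecond) * s * (s + 1) / ((BigInt(rampUpTime) + 1) * 2)).toLong
    // Full speed for every second after the ramp-up.
    rampPart + math.max(0L, seconds - rampUpTime) * rowsPerSecond
  }

  def main(args: Array[String]): Unit = {
    val totals = (0L to 12L).map(s => valueAtSecond(s, 5, 10))
    // Rows per second = difference between consecutive cumulative totals.
    val perSecond = totals.zip(totals.tail).map { case (a, b) => b - a }
    println(s"cumulative: ${totals.mkString(", ")}")
    println(s"per second: ${perSecond.mkString(", ")}")
  }
}
```

For `rowsPerSecond = 5` and `rampUpTime = 10`, the per-second counts printed are 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 5 instead of ten seconds of nothing. When `rowsPerSecond` is a multiple of `rampUpTime + 1` (such as 11 with 10), the totals are identical to the behavior shown in the third scenario.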


        Attachments (uploaded by Gerard Maas):
        1. image-2018-04-22-22-03-03-945.png (56 kB)
        2. image-2018-04-22-22-06-49-202.png (85 kB)



            Assignee: Unassigned
            Reporter: Gerard Maas (gmaas)
            Tathagata Das
            Votes: 1
            Watchers: 4

