Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.3.0
Fix Version/s: None
Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.04
(The environment is not important; the issue lies in the rate calculation.)
Description
When using the rate source in Structured Streaming, the `rampUpTime` option fails to increase the stream rate gradually when `rampUpTime` is equal to or greater than `rowsPerSecond`.
When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain no rows. The rate then jumps straight to `rowsPerSecond` once `time > rampUpTime`.
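The numbers in the scenarios below are consistent with the ramp-up speed growing by an integer-division increment, `rowsPerSecond / (rampUpTime + 1)`, which truncates to 0 whenever `rowsPerSecond <= rampUpTime`. The following Scala sketch models that assumption. It is not copied from Spark, and the names `RampUpModel` and `valueAtSecond` are hypothetical. It computes the cumulative number of rows emitted by the end of each second, so (assuming roughly one micro-batch per second, as the logs suggest) batch `s` would hold the values from `valueAtSecond(s - 1)` up to `valueAtSecond(s) - 1`.

```scala
// Hypothetical model of the ramp-up arithmetic (an assumption, not Spark code):
// the speed grows by a fixed increment computed with *integer* division.
object RampUpModel {
  // Cumulative number of rows emitted by the end of second `seconds`.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
    // Integer division: 5 / 11 == 0 and 10 / 11 == 0, so no ramp-up at all.
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
    if (seconds <= rampUpTimeSeconds) {
      // Sum of speeds 0, d, 2d, ..., seconds * d = d * seconds * (seconds + 1) / 2
      // (seconds * (seconds + 1) is always even, so the division is exact).
      speedDeltaPerSecond * seconds * (seconds + 1) / 2
    } else {
      // After ramp-up, add the full rowsPerSecond for every extra second.
      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
    }
  }

  def main(args: Array[String]): Unit = {
    // The three configurations used in this report.
    for ((rows, rampUp) <- Seq((5L, 10L), (10L, 10L), (11L, 10L))) {
      val delta = rows / (rampUp + 1)
      val ends = (0L to 12L).map(s => valueAtSecond(s, rows, rampUp))
      println(s"rowsPerSecond=$rows rampUpTime=$rampUp delta=$delta ends=${ends.mkString(",")}")
    }
  }
}
```

Under this model, `rowsPerSecond = 5` or `10` with `rampUpTime = 10` gives an increment of 0, so the cumulative value stays at 0 through second 10 and then grows by `rowsPerSecond` per second (5 and 10, or 10 and 20, at seconds 11 and 12). With `rowsPerSecond = 11` the increment is 1 and the cumulative values are 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 77, which line up with the batch boundaries of the third scenario.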
The following scenario, executed in `spark-shell`, demonstrates the issue:
// Using rampUpTime(10) > rowsPerSecond(5)
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .option("rampUpTime", 10)
  .load()
val query = stream.writeStream.format("console").start()

// Exiting paste mode, now interpreting.

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 9
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 10
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 17:08:...|    0|
|2018-04-22 17:08:...|    1|
|2018-04-22 17:08:...|    2|
|2018-04-22 17:08:...|    3|
|2018-04-22 17:08:...|    4|
+--------------------+-----+

-------------------------------------------
Batch: 12
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 17:08:...|    5|
|2018-04-22 17:08:...|    6|
|2018-04-22 17:08:...|    7|
|2018-04-22 17:08:...|    8|
|2018-04-22 17:08:...|    9|
+--------------------+-----+
The next scenario uses `rowsPerSecond == rampUpTime` (both 10), which also fails:
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .option("rampUpTime", 10)
  .load()
val query = stream.writeStream.format("console").start()

// Exiting paste mode, now interpreting.

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 9
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 10
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:32:...|    0|
|2018-04-22 15:32:...|    1|
|2018-04-22 15:32:...|    2|
|2018-04-22 15:32:...|    3|
|2018-04-22 15:32:...|    4|
|2018-04-22 15:32:...|    5|
|2018-04-22 15:32:...|    6|
|2018-04-22 15:32:...|    7|
|2018-04-22 15:32:...|    8|
|2018-04-22 15:32:...|    9|
+--------------------+-----+

-------------------------------------------
Batch: 12
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:32:...|   10|
|2018-04-22 15:32:...|   11|
|2018-04-22 15:32:...|   12|
|2018-04-22 15:32:...|   13|
|2018-04-22 15:32:...|   14|
|2018-04-22 15:32:...|   15|
|2018-04-22 15:32:...|   16|
|2018-04-22 15:32:...|   17|
|2018-04-22 15:32:...|   18|
|2018-04-22 15:32:...|   19|
+--------------------+-----+
In contrast, when `rowsPerSecond > rampUpTime`, the rate increases gradually as expected. This scenario uses `.option("rowsPerSecond", 11)` and `.option("rampUpTime", 10)`:
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 11)
  .option("rampUpTime", 10)
  .load()
val query = stream.writeStream.format("console").start()

// Exiting paste mode, now interpreting.

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19c6e821

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|    0|
+--------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|    1|
|2018-04-22 15:34:...|    2|
+--------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|    3|
|2018-04-22 15:34:...|    4|
|2018-04-22 15:34:...|    5|
+--------------------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|    6|
|2018-04-22 15:34:...|    7|
|2018-04-22 15:34:...|    8|
|2018-04-22 15:34:...|    9|
+--------------------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   10|
|2018-04-22 15:34:...|   11|
|2018-04-22 15:34:...|   12|
|2018-04-22 15:34:...|   13|
|2018-04-22 15:34:...|   14|
+--------------------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   15|
|2018-04-22 15:34:...|   16|
|2018-04-22 15:34:...|   17|
|2018-04-22 15:34:...|   18|
|2018-04-22 15:34:...|   19|
|2018-04-22 15:34:...|   20|
+--------------------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   21|
|2018-04-22 15:34:...|   22|
|2018-04-22 15:34:...|   23|
|2018-04-22 15:34:...|   24|
|2018-04-22 15:34:...|   25|
|2018-04-22 15:34:...|   26|
|2018-04-22 15:34:...|   27|
+--------------------+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   28|
|2018-04-22 15:34:...|   29|
|2018-04-22 15:34:...|   30|
|2018-04-22 15:34:...|   31|
|2018-04-22 15:34:...|   32|
|2018-04-22 15:34:...|   33|
|2018-04-22 15:34:...|   34|
|2018-04-22 15:34:...|   35|
+--------------------+-----+

-------------------------------------------
Batch: 9
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   36|
|2018-04-22 15:34:...|   37|
|2018-04-22 15:34:...|   38|
|2018-04-22 15:34:...|   39|
|2018-04-22 15:34:...|   40|
|2018-04-22 15:34:...|   41|
|2018-04-22 15:34:...|   42|
|2018-04-22 15:34:...|   43|
|2018-04-22 15:34:...|   44|
+--------------------+-----+

-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   45|
|2018-04-22 15:34:...|   46|
|2018-04-22 15:34:...|   47|
|2018-04-22 15:34:...|   48|
|2018-04-22 15:34:...|   49|
|2018-04-22 15:34:...|   50|
|2018-04-22 15:34:...|   51|
|2018-04-22 15:34:...|   52|
|2018-04-22 15:34:...|   53|
|2018-04-22 15:34:...|   54|
+--------------------+-----+

-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   55|
|2018-04-22 15:34:...|   56|
|2018-04-22 15:34:...|   57|
|2018-04-22 15:34:...|   58|
|2018-04-22 15:34:...|   59|
|2018-04-22 15:34:...|   60|
|2018-04-22 15:34:...|   61|
|2018-04-22 15:34:...|   62|
|2018-04-22 15:34:...|   63|
|2018-04-22 15:34:...|   64|
|2018-04-22 15:34:...|   65|
+--------------------+-----+

-------------------------------------------
Batch: 12
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2018-04-22 15:34:...|   66|
|2018-04-22 15:34:...|   67|
|2018-04-22 15:34:...|   68|
|2018-04-22 15:34:...|   69|
|2018-04-22 15:34:...|   70|
|2018-04-22 15:34:...|   71|
|2018-04-22 15:34:...|   72|
|2018-04-22 15:34:...|   73|
|2018-04-22 15:34:...|   74|
|2018-04-22 15:34:...|   75|
|2018-04-22 15:34:...|   76|
+--------------------+-----+
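The row counts in this third scenario (1, 2, ..., 10 rows in batches 1 to 10, then 11 rows per batch) can be checked against an integer-division model of the ramp-up. This hypothetical sketch is not Spark code, and `RampUpRowsPerBatch` and `speedAtSecond` are illustrative names. It assumes that the speed in second `s` is `(rowsPerSecond / (rampUpTime + 1)) * s` during ramp-up and `rowsPerSecond` afterwards, and that each batch covers about one second. It then compares the prediction with the counts read off the log above.

```scala
// Hypothetical check (an assumption, not Spark code): predicted rows per batch
// under an integer-division ramp-up, compared with the counts observed above.
object RampUpRowsPerBatch {
  // Assumed speed (rows emitted) in second `s`.
  def speedAtSecond(s: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long =
    if (s <= rampUpTimeSeconds) (rowsPerSecond / (rampUpTimeSeconds + 1)) * s
    else rowsPerSecond

  // Rows per batch in batches 1..12 of the rowsPerSecond=11, rampUpTime=10 log.
  val observed: Seq[Long] = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 11)

  def main(args: Array[String]): Unit = {
    val predicted = (1L to 12L).map(s => speedAtSecond(s, 11, 10))
    println(s"predicted = ${predicted.mkString(",")}")
    println(s"matches observed: ${predicted == observed}")
  }
}
```

Plugging in `rowsPerSecond = 5` or `10` instead gives a speed of 0 for every second up to `rampUpTime`, which reproduces the empty-batch pattern of the first two scenarios.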