Description
Currently, `spark.streaming.kafka.maxRatePerPartition` is used as the initial rate when backpressure is enabled. This is too aggressive while the application is still warming up.
This issue is similar to SPARK-11627 and applies the solution provided there to `DirectKafkaInputDStream`.
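
For illustration, a configuration along the following lines is what the change is meant to support. This is a sketch that assumes the fix reuses the `spark.streaming.backpressure.initialRate` setting introduced by SPARK-11627; the numeric values are made up for the example and are not recommendations.

```
# spark-defaults.conf (illustrative values only)

# Turn on backpressure so the ingestion rate adapts to processing speed.
spark.streaming.backpressure.enabled      true

# Assumed to apply to DirectKafkaInputDStream after this change:
# a conservative rate for the first batches, while the application warms up.
spark.streaming.backpressure.initialRate  1000

# Upper bound per Kafka partition; without an initial rate,
# this value is what the first batch is sized against.
spark.streaming.kafka.maxRatePerPartition 10000
```

With settings like these, the first batches would be sized from the lower initial rate rather than from the per-partition maximum, and backpressure would then adjust the rate once it has processing statistics to work from.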