SPARK-30393: Too many ProvisionedThroughputExceededException errors while recovering from checkpoint


Details

    • Type: Question
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None
    • Environment: EMR 5.25.0, Spark 2.4.3, spark-streaming-kinesis-asl 2.4.3; 6 r5.4xlarge nodes with plenty of memory; 6 Kinesis shards (increased to 12, but the error persists)

    Description

      I have a Spark application that consumes from Kinesis with 6 shards. Data is produced to Kinesis at at most 2,000 records/second; at off-peak times it comes in at only about 200 records/second. Each record is 0.5 KB, so 6 shards are enough to handle that.

      I use reduceByKeyAndWindow and mapWithState in the program, with a one-hour sliding window.

      Recently I have been trying to checkpoint the application to S3. I am testing this at off-peak time, so the incoming data rate is low, around 200 records/second. When I run the Spark application with a newly created context, the checkpoint is written to S3. But when I kill the app and restart it, it fails to recover from the checkpoint: I get the error below, my Spark UI shows all the batches stuck, and the checkpoint recovery takes a very long time to complete, from 15 minutes to over an hour.
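
      For reference, this is roughly how the application is structured (a simplified sketch, not the actual code; names like createContext, keyOf and updateState are placeholders):

      {code:scala}
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
      import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

      object KinesisWindowApp {
        val checkpointDir = "s3://my-bucket/checkpoints/my-app"   // placeholder path

        // Placeholder key extraction and state update; the real logic is application specific.
        def keyOf(bytes: Array[Byte]): String = new String(bytes, "UTF-8").take(8)
        def updateState(key: String, count: Option[Long], state: State[Long]): Long = {
          val total = state.getOption.getOrElse(0L) + count.getOrElse(0L)
          state.update(total)
          total
        }

        def createContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("kinesis-window-app")
          val ssc = new StreamingContext(conf, Seconds(150))       // 150 second batch interval
          ssc.checkpoint(checkpointDir)

          val stream = KinesisInputDStream.builder
            .streamingContext(ssc)
            .streamName("my-stream-name")
            .regionName("us-east-1")
            .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
            .initialPosition(new KinesisInitialPositions.TrimHorizon())
            .checkpointAppName("kinesis-window-app")
            .checkpointInterval(Seconds(150))
            .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
            .build()

          val windowed = stream
            .map(bytes => (keyOf(bytes), 1L))
            .reduceByKeyAndWindow((a: Long, b: Long) => a + b,
                                  (a: Long, b: Long) => a - b,
                                  Seconds(3600), Seconds(150))     // one-hour sliding window

          windowed.mapWithState(StateSpec.function(updateState _)).print()
          ssc
        }

        def main(args: Array[String]): Unit = {
          // On restart this is supposed to recover from the S3 checkpoint
          // instead of building a new context.
          val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
          ssc.start()
          ssc.awaitTermination()
        }
      }
      {code}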

      I found many error messages in the log about exceeding the Kinesis read limit:

      19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
          at scala.Option.getOrElse(Option.scala:121)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
          at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
          at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
          at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
          at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
          at org.apache.spark.scheduler.Task.run(Task.scala:121)
          at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
          at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
          at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
          at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
          at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
          at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
          at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
          at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
          at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
          at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
          ... 20 more

      I see a similar problem reported in https://issues.apache.org/jira/browse/SPARK-24970, but I am not sure whether there is any fix for it.

      Since my batch interval is 150 seconds, I tried increasing the block interval to 1000 ms (1 second) so that each batch has fewer partitions, but the problem still exists.
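
      To spell out the arithmetic behind that (a rough sketch; spark.streaming.blockInterval is the standard setting, the numbers are just my batch configuration):

      {code:scala}
      import org.apache.spark.SparkConf

      // Receiver-based streaming cuts one block (= one partition) per blockInterval per receiver.
      // batchInterval = 150 s, default blockInterval = 200 ms -> 150 / 0.2 = 750 partitions per receiver per batch
      // batchInterval = 150 s, blockInterval = 1000 ms        -> 150 / 1.0 = 150 partitions per receiver per batch
      val conf = new SparkConf()
        .set("spark.streaming.blockInterval", "1000ms")
      {code}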

      I also tried enabling the WAL (spark.streaming.receiver.writeAheadLog.enable=true), but the problem still exists. I have also read that enabling the WAL is no longer necessary from Spark 2 onward.
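
      This is how I enabled it (just the standard configuration flag, shown for completeness):

      {code:scala}
      import org.apache.spark.SparkConf

      val conf = new SparkConf()
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // With the WAL enabled, received data is also persisted under the checkpoint directory,
      // so the Spark Streaming docs suggest a non-replicated storage level for the receiver
      // (e.g. StorageLevel.MEMORY_AND_DISK_SER instead of MEMORY_AND_DISK_2).
      {code}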

      Could this be related to the hour-long sliding window I keep in memory? 3,600 seconds x 200 records/second = 720K records; if the recovery process tries to load all of them back from Kinesis at once, it would exceed my limit of 12 shards x 2,000 records/second/shard = 24K records/second. If so, wouldn't this be a flaw? I don't need, and can't afford, 360 shards (3,600 at peak rates) for this app just for checkpointing purposes.
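
      The back-of-the-envelope numbers I am working from (the per-shard read limit is my own assumption, not a measured value):

      {code:scala}
      // One-hour window at the off-peak ingest rate
      val windowSeconds   = 3600
      val offPeakRate     = 200                                   // records/second
      val recordsToReplay = windowSeconds.toLong * offPeakRate    // = 720,000 records

      // Assumed read capacity of the stream
      val shards       = 12
      val readPerShard = 2000                                     // records/second/shard (assumption)
      val clusterRead  = shards * readPerShard                    // = 24,000 records/second

      // To replay the whole window within one second I would need
      // 720,000 / 2,000 = 360 shards (roughly 3,600 at the peak ingest rate).
      {code}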

      I understand that checkpoint recovery can be a lengthy process, but how do I eliminate the ProvisionedThroughputExceededException errors? I suspect they are what makes the checkpoint recovery so slow.
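
      If the Kinesis retry settings added by SPARK-20140 apply to spark-streaming-kinesis-asl 2.4.3 (I have not verified this, so please correct me if the names are wrong), would backing off more aggressively during recovery be the right mitigation? Something like:

      {code:scala}
      import org.apache.spark.SparkConf

      // Assumed config names from SPARK-20140; defaults are reportedly 100ms and 3 attempts.
      val conf = new SparkConf()
        .set("spark.streaming.kinesis.retry.waitTime", "2000ms")
        .set("spark.streaming.kinesis.retry.maxAttempts", "10")
      {code}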

      In the attached screenshot kinesisexceedreadlimit.png, you can see the Get Record Count spike to nearly 3.8 million records in a 5-minute interval while the checkpoint recovery is happening, and the Get Record Success rate drop to around 0.5.

      Thanks, can someone please help?

      Attachments

        1. sparkuiwhilecheckpointrecoveryerror.png
          285 kB
          Stephen
        2. kinesisusagewhilecheckpointrecoveryerror.png
          271 kB
          Stephen
        3. kinesisexceedreadlimit.png
          204 kB
          Stephen


          People

            Assignee: Unassigned
            Reporter: Spearsberg Stephen
            Votes: 0
            Watchers: 2
