Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-18020

Kinesis receiver does not snapshot when shard completes

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.2.0
    • Component/s: DStreams
    • Labels:

      Description

      When a kinesis shard is split or combined and the old shard ends, the Amazon Kinesis Client library calls IRecordProcessor.shutdown and expects that IRecordProcessor.shutdown must checkpoint the sequence number ExtendedSequenceNumber.SHARD_END before returning. Unfortunately, spark’s KinesisRecordProcessor sometimes does not checkpoint SHARD_END. This results in an error message, and spark is then blocked indefinitely from processing any items from the child shards.

      This issue has also been raised on StackOverflow: resharding while spark running on kinesis stream

      Exception that is logged:

      16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception. 
      java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000030
              at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
              at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
              at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      

      Command used to split shard:

      aws kinesis --region us-west-1 split-shard --stream-name my-stream --shard-to-split shardId-000000000030 --new-starting-hash-key 5316911983139663491615228241121378303
      

      After the spark-streaming job has hung, examining the DynamoDB table indicates that the parent shard processor has not reached ExtendedSequenceNumber.SHARD_END and the child shards are still at ExtendedSequenceNumber.TRIM_HORIZON waiting for the parent to finish:

      aws kinesis --region us-west-1 describe-stream --stream-name my-stream
      {
          "StreamDescription": {
              "RetentionPeriodHours": 24, 
              "StreamName": "my-stream", 
              "Shards": [
                  {
                      "ShardId": "shardId-000000000030", 
                      "HashKeyRange": {
                          "EndingHashKey": "10633823966279326983230456482242756606", 
                          "StartingHashKey": "0"
                      },
                      ...
                  }, 
                  {
                      "ShardId": "shardId-000000000062", 
                      "HashKeyRange": {
                          "EndingHashKey": "5316911983139663491615228241121378302", 
                          "StartingHashKey": "0"
                      }, 
                      "ParentShardId": "shardId-000000000030", 
                      "SequenceNumberRange": {
                          "StartingSequenceNumber": "49566806087883755242230188435465744452396445937434624994"
                      }
                  }, 
                  {
                      "ShardId": "shardId-000000000063", 
                      "HashKeyRange": {
                          "EndingHashKey": "10633823966279326983230456482242756606", 
                          "StartingHashKey": "5316911983139663491615228241121378303"
                      }, 
                      "ParentShardId": "shardId-000000000030", 
                      "SequenceNumberRange": {
                          "StartingSequenceNumber": "49566806087906055987428719058607280170669094298940605426"
                      }
                  },
                  ...
              ],
              "StreamStatus": "ACTIVE"
          }
      }
      
      aws dynamodb --region us-west-1 scan --table-name my-processor
      {
          "Items": [
              {
                  "leaseOwner": {
                      "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
                  }, 
                  "leaseCounter": {
                      "N": "49318"
                  }, 
                  "ownerSwitchesSinceCheckpoint": {
                      "N": "62"
                  }, 
                  "checkpointSubSequenceNumber": {
                      "N": "0"
                  }, 
                  "checkpoint": {
                      "S": "49566573572821264975247582655142547856950135436343247330"
                  }, 
                  "parentShardId": {
                      "SS": [
                          "shardId-000000000014"
                      ]
                  }, 
                  "leaseKey": {
                      "S": "shardId-000000000030"
                  }
              }, 
              {
                  "leaseOwner": {
                      "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
                  }, 
                  "leaseCounter": {
                      "N": "25439"
                  }, 
                  "ownerSwitchesSinceCheckpoint": {
                      "N": "69"
                  }, 
                  "checkpointSubSequenceNumber": {
                      "N": "0"
                  }, 
                  "checkpoint": {
                      "S": "TRIM_HORIZON"
                  }, 
                  "parentShardId": {
                      "SS": [
                          "shardId-000000000030"
                      ]
                  }, 
                  "leaseKey": {
                      "S": "shardId-000000000062"
                  }
              }, 
              {
                  "leaseOwner": {
                      "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83"
                  }, 
                  "leaseCounter": {
                      "N": "25443"
                  }, 
                  "ownerSwitchesSinceCheckpoint": {
                      "N": "59"
                  }, 
                  "checkpointSubSequenceNumber": {
                      "N": "0"
                  }, 
                  "checkpoint": {
                      "S": "TRIM_HORIZON"
                  }, 
                  "parentShardId": {
                      "SS": [
                          "shardId-000000000030"
                      ]
                  }, 
                  "leaseKey": {
                      "S": "shardId-000000000063"
                  }
              },
              ...
          ]
      }
      

      Workaround: I manually edited the DynamoDB table to delete the checkpoints for the parent shards. The child shards were then able to begin processing. I’m not sure whether this resulted in a few items being lost though.

        Attachments

          Activity

            People

            • Assignee:
              maropu Takeshi Yamamuro
              Reporter:
              yonran Yonathan Randolph
            • Votes:
              3 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: