Description
When a Kinesis shard is split or merged and the old shard ends, the Amazon Kinesis Client Library calls IRecordProcessor.shutdown and expects it to checkpoint the sequence number ExtendedSequenceNumber.SHARD_END before returning. Unfortunately, Spark’s KinesisRecordProcessor sometimes does not checkpoint SHARD_END. The library then logs an error, and Spark is blocked indefinitely from processing any records from the child shards.
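To make the contract concrete, here is a toy Python model of the end-of-shard check. It is not Kinesis Client Library or Spark code: `Checkpointer`, `run_shutdown`, `good_shutdown` and `bad_shutdown` are hypothetical names, and the only detail taken from this report is the error message text.

```python
SHARD_END = "SHARD_END"


class Checkpointer:
    """Hypothetical stand-in for the checkpointer handed to a record processor."""

    def __init__(self, last_checkpoint):
        self.last_checkpoint = last_checkpoint

    def checkpoint_shard_end(self):
        # Record that the shard has been fully consumed.
        self.last_checkpoint = SHARD_END


def run_shutdown(shard_id, processor_shutdown, checkpointer):
    """Call the processor's shutdown hook, then verify that it checkpointed SHARD_END."""
    processor_shutdown(checkpointer)
    if checkpointer.last_checkpoint != SHARD_END:
        raise ValueError(
            f"Application didn't checkpoint at end of shard {shard_id}"
        )


def good_shutdown(checkpointer):
    # Behaviour the library expects at the end of a shard.
    checkpointer.checkpoint_shard_end()


def bad_shutdown(checkpointer):
    # Behaviour described in this report: returning without checkpointing.
    pass


if __name__ == "__main__":
    run_shutdown("shardId-000000000030", good_shutdown, Checkpointer("12345"))
    try:
        run_shutdown("shardId-000000000030", bad_shutdown, Checkpointer("12345"))
    except ValueError as err:
        print(err)
```

In this model the second call fails in the same way as the logged exception: the parent's checkpoint stays at an ordinary sequence number, so nothing downstream can treat the shard as finished.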
This issue has also been raised on StackOverflow: resharding while spark running on kinesis stream
Exception that is logged:
16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000030
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Command used to split shard:
aws kinesis --region us-west-1 split-shard --stream-name my-stream --shard-to-split shardId-000000000030 --new-starting-hash-key 5316911983139663491615228241121378303
After the spark-streaming job has hung, examining the DynamoDB table indicates that the parent shard processor has not reached ExtendedSequenceNumber.SHARD_END and the child shards are still at ExtendedSequenceNumber.TRIM_HORIZON waiting for the parent to finish:
aws kinesis --region us-west-1 describe-stream --stream-name my-stream
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "my-stream",
        "Shards": [
            {
                "ShardId": "shardId-000000000030",
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606",
                    "StartingHashKey": "0"
                },
                ...
            },
            {
                "ShardId": "shardId-000000000062",
                "HashKeyRange": {
                    "EndingHashKey": "5316911983139663491615228241121378302",
                    "StartingHashKey": "0"
                },
                "ParentShardId": "shardId-000000000030",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49566806087883755242230188435465744452396445937434624994"
                }
            },
            {
                "ShardId": "shardId-000000000063",
                "HashKeyRange": {
                    "EndingHashKey": "10633823966279326983230456482242756606",
                    "StartingHashKey": "5316911983139663491615228241121378303"
                },
                "ParentShardId": "shardId-000000000030",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49566806087906055987428719058607280170669094298940605426"
                }
            },
            ...
        ],
        "StreamStatus": "ACTIVE"
    }
}

aws dynamodb --region us-west-1 scan --table-name my-processor
{
    "Items": [
        {
            "leaseOwner": { "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe" },
            "leaseCounter": { "N": "49318" },
            "ownerSwitchesSinceCheckpoint": { "N": "62" },
            "checkpointSubSequenceNumber": { "N": "0" },
            "checkpoint": { "S": "49566573572821264975247582655142547856950135436343247330" },
            "parentShardId": { "SS": [ "shardId-000000000014" ] },
            "leaseKey": { "S": "shardId-000000000030" }
        },
        {
            "leaseOwner": { "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02" },
            "leaseCounter": { "N": "25439" },
            "ownerSwitchesSinceCheckpoint": { "N": "69" },
            "checkpointSubSequenceNumber": { "N": "0" },
            "checkpoint": { "S": "TRIM_HORIZON" },
            "parentShardId": { "SS": [ "shardId-000000000030" ] },
            "leaseKey": { "S": "shardId-000000000062" }
        },
        {
            "leaseOwner": { "S": "localhost:94bf603f-780b-4121-87a4-bdf501723f83" },
            "leaseCounter": { "N": "25443" },
            "ownerSwitchesSinceCheckpoint": { "N": "59" },
            "checkpointSubSequenceNumber": { "N": "0" },
            "checkpoint": { "S": "TRIM_HORIZON" },
            "parentShardId": { "SS": [ "shardId-000000000030" ] },
            "leaseKey": { "S": "shardId-000000000063" }
        },
        ...
    ]
}
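The same diagnosis can be automated against the scan output. The following is a minimal sketch, assuming the lease table has the attribute layout shown in this report (leaseKey, checkpoint, parentShardId). The function name `find_blocked_children` is hypothetical, and treating a parent with no lease row as "not blocking" is an assumption of this sketch, not something confirmed here.

```python
SHARD_END = "SHARD_END"
TRIM_HORIZON = "TRIM_HORIZON"


def find_blocked_children(scan_output):
    """Map each unfinished parent lease to the children still at TRIM_HORIZON."""
    items = scan_output.get("Items", [])
    checkpoints = {i["leaseKey"]["S"]: i["checkpoint"]["S"] for i in items}
    blocked = {}
    for item in items:
        if item["checkpoint"]["S"] != TRIM_HORIZON:
            continue  # this lease has already made progress
        for parent in item.get("parentShardId", {}).get("SS", []):
            # Assumption: a parent without a lease row does not block its children.
            if parent in checkpoints and checkpoints[parent] != SHARD_END:
                blocked.setdefault(parent, []).append(item["leaseKey"]["S"])
    return blocked


def lease(key, checkpoint, parent):
    """Build a lease item with only the attributes this check reads."""
    return {
        "leaseKey": {"S": key},
        "checkpoint": {"S": checkpoint},
        "parentShardId": {"SS": [parent]},
    }


# Trimmed copy of the scan output from this report.
SAMPLE = {
    "Items": [
        lease(
            "shardId-000000000030",
            "49566573572821264975247582655142547856950135436343247330",
            "shardId-000000000014",
        ),
        lease("shardId-000000000062", TRIM_HORIZON, "shardId-000000000030"),
        lease("shardId-000000000063", TRIM_HORIZON, "shardId-000000000030"),
    ]
}

if __name__ == "__main__":
    for parent, children in find_blocked_children(SAMPLE).items():
        print(f"{parent} has not reached SHARD_END; waiting: {children}")
```

To run it against a live table, the JSON printed by the scan command can be loaded with `json.load` and passed to the function in place of `SAMPLE`.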
Workaround: I manually edited the DynamoDB table to delete the checkpoints for the parent shards, after which the child shards were able to begin processing. I’m not sure whether this caused a few records to be lost.