FLINK-29395

[Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard


    Description

      Background

      The consumer fails when an EFO record publisher uses a timestamp sentinel starting position and the first record batch is not empty, but the first deaggregated record batch is empty. This can happen if the user explicitly specifies the explicitHashKey in the KPL and does not ensure that the explicitHashKey of every record in the aggregated batch is the same.

      When resharding occurs, the aggregated record batch can contain records whose explicit hash keys fall outside the shard's hash key range. These records are dropped during deaggregation, which can produce exactly this situation: the record batch is not empty, but the deaggregated record batch is empty.
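      The connector deaggregates each batch against the shard's hash key range using the KCL's UserRecord.deaggregate, which silently drops out-of-range sub-records. A minimal sketch of that behaviour, assuming records is the non-empty batch received for shardId-000000000037 (hash key range taken from the shard description below; the helper name deaggregateForShard is illustrative):

      import java.math.BigInteger;
      import java.util.List;

      import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
      import com.amazonaws.services.kinesis.model.Record;

      // Sub-records whose explicitHashKey is outside the shard's hash key
      // range are dropped, so a non-empty input can yield an empty result.
      static List<UserRecord> deaggregateForShard(List<Record> records) {
          return UserRecord.deaggregate(
                  records,
                  new BigInteger("272225893536750770770699685945414569164"),
                  new BigInteger("340282366920938463463374607431768211455"));
      }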

      The symptom is similar to the one seen in https://issues.apache.org/jira/browse/FLINK-20088.

      See here and here for a more detailed explanation.

      Replicate

      Get shard information

      aws kinesis describe-stream --stream-name <stream_name>
      {
          "StreamDescription": {
              "Shards": [
                  ...
                  {
                      "ShardId": "shardId-000000000037",
                      "ParentShardId": "shardId-000000000027",
                      "HashKeyRange": {
                          "StartingHashKey": "272225893536750770770699685945414569164",
                          "EndingHashKey": "340282366920938463463374607431768211455"
                      }
                  ...
                  },
                  {
                      "ShardId": "shardId-000000000038",
                      "ParentShardId": "shardId-000000000034",
                      "AdjacentParentShardId": "shardId-000000000036",
                      "HashKeyRange": {
                          "StartingHashKey": "204169420152563078078024764459060926873",
                          "EndingHashKey": "272225893536750770770699685945414569163"
                      }
                  ...
                  }
              ]
      ...
          }
      }

      Create an aggregated record containing two records whose explicit hash keys belong to different shards

      import com.amazonaws.kinesis.agg.RecordAggregator;
      import com.amazonaws.services.kinesis.AmazonKinesis;
      import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

      RecordAggregator aggregator = new RecordAggregator();
      String record1 = "RECORD_1";
      String record2 = "RECORD_2";
      // One explicit hash key inside shardId-000000000038's range, one inside
      // shardId-000000000037's range (the boundary is ...163/...164).
      // Note: addUserRecord throws a checked Exception; handle or declare it.
      aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", record1.getBytes());
      aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", record2.getBytes());

      AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard().build();
      kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));

      Consume from the given stream whilst specifying a timestamp such that the only record retrieved is the aggregated record above.
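      A minimal sketch of such a consumer configuration, assuming placeholder region and EFO consumer names, with the timestamp chosen just before the aggregated record was written (the expected format can be adjusted via ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT):

      import java.util.Properties;

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
      import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
      import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

      Properties config = new Properties();
      config.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1"); // placeholder
      config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
              ConsumerConfigConstants.RecordPublisherType.EFO.name());
      config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-repro"); // placeholder
      config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
      // A timestamp just before the aggregated record's approximate arrival time.
      config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2022-09-20T00:00:00.000+00:00");

      FlinkKinesisConsumer<String> consumer =
              new FlinkKinesisConsumer<>("EFOStreamTest", new SimpleStringSchema(), config);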

      Error

      java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
      	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
      	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
      	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
      	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)

      Solution

      This is fixed by reusing the existing timestamp starting position when the deaggregated record batch is empty, instead of deriving the next starting position from the AT_TIMESTAMP_SEQUENCE_NUM sentinel.
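      A sketch of the idea, not the actual patch: the helper name nextStartingPosition is hypothetical, and the real change in FanOutRecordPublisher may be structured differently.

      import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
      import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
      import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;

      // Hypothetical helper: decide where the next subscription should start.
      static StartingPosition nextStartingPosition(
              StartingPosition currentStartingPosition, SequenceNumber nextSequenceNumber) {
          // If nothing was emitted from the batch (e.g. every aggregated
          // sub-record was dropped during deaggregation), the sequence number
          // is still the AT_TIMESTAMP sentinel, which
          // StartingPosition.continueFromSequenceNumber rejects. Reuse the
          // existing timestamp starting position instead.
          if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM
                  .get()
                  .equals(nextSequenceNumber)) {
              return currentStartingPosition;
          }
          return StartingPosition.continueFromSequenceNumber(nextSequenceNumber);
      }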

People

  Assignee: Hong Liang Teoh (liangtl)
  Reporter: Hong Liang Teoh (liangtl)
