Flink / FLINK-20630

[Kinesis][DynamoDB] DynamoDB Streams Consumer fails to consume from Latest

Details

    Description

      Background
      When consuming from LATEST, the KinesisDataFetcher converts the shard iterator type to AT_TIMESTAMP to ensure all shards start from the same position. If LATEST were used directly, each shard would effectively start from a different point in time.

      DynamoDB Streams does not support the AT_TIMESTAMP iterator type.
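
The mismatch can be illustrated with a small standalone model. This is a sketch only, not the connector's code: `IteratorTypeMismatchSketch`, `toCommonStartPosition`, and `validateForDynamoDb` are hypothetical names, and the supported set is copied from the validation error quoted under Actual Results.

```java
import java.util.EnumSet;
import java.util.Set;

/** Standalone model of the iterator-type mismatch; not the connector's actual code. */
class IteratorTypeMismatchSketch {

    /** Iterator types accepted by Kinesis Data Streams. */
    enum ShardIteratorType {
        AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, TRIM_HORIZON, LATEST, AT_TIMESTAMP
    }

    /** Subset accepted by DynamoDB Streams, taken from the validation error message. */
    static final Set<ShardIteratorType> DYNAMODB_SUPPORTED = EnumSet.of(
            ShardIteratorType.AFTER_SEQUENCE_NUMBER,
            ShardIteratorType.LATEST,
            ShardIteratorType.AT_SEQUENCE_NUMBER,
            ShardIteratorType.TRIM_HORIZON);

    /** Hypothetical stand-in for the fetcher's transform: LATEST becomes AT_TIMESTAMP. */
    static ShardIteratorType toCommonStartPosition(ShardIteratorType requested) {
        return requested == ShardIteratorType.LATEST
                ? ShardIteratorType.AT_TIMESTAMP
                : requested;
    }

    /** Hypothetical stand-in for the service-side enum validation. */
    static void validateForDynamoDb(ShardIteratorType type) {
        if (!DYNAMODB_SUPPORTED.contains(type)) {
            throw new IllegalArgumentException(
                    "Value '" + type + "' at 'shardIteratorType' is not supported");
        }
    }

    public static void main(String[] args) {
        // The user asks for LATEST, but the transform rewrites it before the request is made.
        ShardIteratorType transformed = toCommonStartPosition(ShardIteratorType.LATEST);
        try {
            validateForDynamoDb(transformed);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In this model, LATEST itself passes validation; only the rewritten AT_TIMESTAMP value is rejected.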

      Scope
      Remove the shard iterator type transformation for the DynamoDB Streams consumer.
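
One possible shape for this change is to make the transform overridable and have the DynamoDB Streams fetcher pass the requested type through unchanged. The following is a sketch under that assumption, not the actual patch: `BaseFetcher`, `DynamoDbFetcher`, and `startingIteratorType` are hypothetical names.

```java
/** Sketch of the proposed scope using hypothetical class names; not the actual patch. */
class IteratorTransformScopeSketch {

    /** Hypothetical base fetcher: rewrites LATEST so all shards share one start position. */
    static class BaseFetcher {
        String startingIteratorType(String requested) {
            return "LATEST".equals(requested) ? "AT_TIMESTAMP" : requested;
        }
    }

    /** Hypothetical DynamoDB Streams fetcher: passes the requested type through unchanged. */
    static class DynamoDbFetcher extends BaseFetcher {
        @Override
        String startingIteratorType(String requested) {
            return requested;
        }
    }

    public static void main(String[] args) {
        System.out.println(new BaseFetcher().startingIteratorType("LATEST"));
        System.out.println(new DynamoDbFetcher().startingIteratorType("LATEST"));
    }
}
```

The trade-off is the one described in the background: without the transform, each DynamoDB Streams shard starts from its own latest position rather than from a common timestamp.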

      Reproduction Steps
      Create a simple application that consumes from LATEST using FlinkDynamoDBStreamsConsumer.
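
A minimal sketch of the consumer configuration for such an application follows. It uses only `java.util.Properties` so that it runs without the connector on the classpath. The literal keys are assumed to match the connector's `AWSConfigConstants.AWS_REGION` and `ConsumerConfigConstants.STREAM_INITIAL_POSITION` constants and should be checked against the connector version in use. The region value and the `consumerConfig` helper are placeholders, and credentials settings are omitted.

```java
import java.util.Properties;

/** Builds consumer properties that request the LATEST initial position. */
class LatestPositionConfigSketch {

    /** Hypothetical helper; the keys are assumed to match the connector's constants. */
    static Properties consumerConfig(String region) {
        Properties config = new Properties();
        config.setProperty("aws.region", region);
        config.setProperty("flink.stream.initpos", "LATEST");
        return config;
    }

    public static void main(String[] args) {
        Properties config = consumerConfig("us-east-1");
        // In a Flink job with flink-connector-kinesis on the classpath, these properties
        // would be passed to the consumer, for example:
        //   new FlinkDynamoDBStreamsConsumer<>(stream, new SimpleStringSchema(), config)
        // where `stream` is a placeholder for the DynamoDB stream identifier.
        System.out.println(config);
    }
}
```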

      Expected Results
      The consumer starts reading records from the latest position (the tip) of the stream.

      Actual Results
      An exception is thrown:

      Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: 1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' failed to satisfy constraint: Member must satisfy enum value set: [AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; Request ID: AFQ8KCJAP74IN5MR5KD2FP0CTBVV4KQNSO5AEMVJF66Q9ASUAAJG)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515)
      	at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355)
      	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:311)
      	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:302)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:173)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:93)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:85)
      	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:36)
      	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:469)
      	at org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:107)
      	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:540)
      	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:348)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
      

            People

              danny.cranmer Danny Cranmer