Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
Version: 1.12.0
Description
Background
When consuming from LATEST, the KinesisDataFetcher converts the shard iterator type into AT_TIMESTAMP to ensure all shards start from the same position. If LATEST were used directly, each shard would effectively start from a different point in time.
DynamoDB Streams does not support the AT_TIMESTAMP iterator type.
Scope
Remove the shard iterator type transformation for the DynamoDB Streams consumer.
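The intended behaviour can be sketched as follows. This is a minimal, self-contained model and not the connector's actual code: IteratorType, StartingPosition, resolve and the supportsAtTimestamp flag are hypothetical names introduced for illustration only.

```java
import java.time.Instant;

/** Hypothetical model of the iterator types named in this issue; not the connector's classes. */
enum IteratorType { LATEST, TRIM_HORIZON, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

final class StartingPositionSketch {

    /** Hypothetical value object: an iterator type plus an optional timestamp. */
    static final class StartingPosition {
        final IteratorType type;
        final Instant timestamp; // non-null only for AT_TIMESTAMP

        StartingPosition(IteratorType type, Instant timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    /**
     * Converts LATEST into AT_TIMESTAMP only when the stream supports it
     * (assumed true for Kinesis, false for DynamoDB Streams).
     * Every other configured type passes through unchanged.
     */
    static StartingPosition resolve(
            IteratorType configured, boolean supportsAtTimestamp, Instant fetcherStart) {
        if (configured == IteratorType.LATEST && supportsAtTimestamp) {
            // All shards start from the same instant instead of "whenever each shard is opened".
            return new StartingPosition(IteratorType.AT_TIMESTAMP, fetcherStart);
        }
        return new StartingPosition(configured, null);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(resolve(IteratorType.LATEST, true, now).type);
        System.out.println(resolve(IteratorType.LATEST, false, now).type);
    }
}
```

With supportsAtTimestamp set to false, LATEST is passed through unchanged, so the DynamoDB Streams service never receives an iterator type outside its accepted set.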
Reproduction Steps
Create a simple application that consumes from LATEST using FlinkDynamoDBStreamsConsumer.
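A configuration along these lines should be enough to trigger the problem. It is a sketch under assumptions: the region is a placeholder, the class and method names are hypothetical, and the property keys are written as the string values behind AWSConfigConstants.AWS_REGION and ConsumerConfigConstants.STREAM_INITIAL_POSITION so the snippet compiles without the connector on the classpath.

```java
import java.util.Properties;

final class LatestConsumerConfigSketch {

    /** Builds consumer properties that request LATEST as the initial position. */
    static Properties latestConfig(String region) {
        Properties config = new Properties();
        // String value of AWSConfigConstants.AWS_REGION
        config.setProperty("aws.region", region);
        // String value of ConsumerConfigConstants.STREAM_INITIAL_POSITION
        config.setProperty("flink.stream.initpos", "LATEST");
        return config;
    }

    public static void main(String[] args) {
        Properties config = latestConfig("us-east-1"); // placeholder region

        // With the Flink Kinesis connector available, the properties would be passed
        // to the consumer roughly like this (stream is a placeholder):
        //   env.addSource(new FlinkDynamoDBStreamsConsumer<>(
        //           stream, new SimpleStringSchema(), config)).print();
        System.out.println(config.getProperty("flink.stream.initpos"));
    }
}
```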
Expected Results
The consumer starts reading records from the head of the stream.
Actual Results
An exception is thrown:
Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: 1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' failed to satisfy constraint: Member must satisfy enum value set: [AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; Request ID: AFQ8KCJAP74IN5MR5KD2FP0CTBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:311)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:302)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:173)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:93)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:85)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:36)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:469)
    at org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:107)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:540)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:348)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)