Details
-
Bug
-
Status: Reopened
-
Not a Priority
-
Resolution: Unresolved
-
1.8.0, 1.8.1
-
None
-
None
Description
The issue is at two places,
First, KinesisConfigUtil.validateConsumerConfiguration((Properties config)) limit providing both REGION and ENDPOINT in properties.
Second, AWSUtil.createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) is passing REGION as null when ENDPOINT is provided.
The above cases will not work when an AWS DIRECT CONNECT ENDPOINT is used. A sample direct connect endpoint for east region is kinesis-ae1.hdw.r53.feap.pv So this does not follow the convention of kinesis.us-east-1.amazonaws.com where the first word after "-" is region_._
And using the above endpoint will results error __
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'ae1'. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: ee678308-0c88-ca77-bbc0-54322258672d)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1337)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1312)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:442)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:392)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:282)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:550)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:270)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)