Details
- Bug
- Status: Resolved
- P2
- Resolution: Fixed
- 2.11.0
- None
Description
I used KinesisIO to write to Kinesis. Very quickly I start getting many exceptions like:
[shard_map.cc:150] Shard map update for stream "**" failed. Code: LimitExceededException Message: Rate exceeded for stream ** under account ***; retrying in ..
Also, I see many exceptions like:
Caused by: java.lang.IllegalArgumentException: Stream ** does not exist
    at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
    at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
I'm sure this stream exists because I can see some data from my pipeline that was successfully ingested to it.
Here is my code:
.apply(KinesisIO.write()
    .withStreamName("**")
    .withPartitioner(new KinesisPartitioner() {
      @Override
      public String getPartitionKey(byte[] value) {
        return UUID.randomUUID().toString();
      }

      @Override
      public String getExplicitHashKey(byte[] value) {
        return null;
      }
    })
    .withAWSClientsProvider("**", "***", Regions.US_EAST_1));
When I don't use KinesisIO, everything works well, so I can't figure out what went wrong.
I tried using the same API that the library uses:
.apply(
    ParDo.of(new DoFn<byte[], Void>() {
      private transient IKinesisProducer inlineProducer;

      @Setup
      public void setup() {
        KinesisProducerConfiguration config =
            KinesisProducerConfiguration.fromProperties(new Properties());
        config.setRegion(Regions.US_EAST_1.getName());
        config.setCredentialsProvider(
            new AWSStaticCredentialsProvider(new BasicAWSCredentials("***", "***")));
        inlineProducer = new KinesisProducer(config);
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        ByteBuffer data = ByteBuffer.wrap(c.element());
        String partitionKey = UUID.randomUUID().toString();
        ListenableFuture<UserRecordResult> f =
            inlineProducer.addUserRecord("***", partitionKey, data);
        Futures.addCallback(f, new UserRecordResultFutureCallback());
      }

      class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
        @Override
        public void onFailure(Throwable cause) {
          throw new RuntimeException("failed produce:" + cause);
        }

        @Override
        public void onSuccess(UserRecordResult result) {
        }
      }
    })
);
Any idea what I did wrong, or what the error in KinesisIO is?
Issue Links
- is related to
  - BEAM-7589 Kinesis IO.write throws LimitExceededException (Resolved)