Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7357

Kinesis IO.write throws LimitExceededException

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • 2.11.0
    • 2.14.0
    • io-java-kinesis
    • None

    Description

      I used Kinesis IO to write to kinesis. I get very quickly many exceptions like:

      [shard_map.cc:150] Shard map update for stream "**" failed. Code: LimitExceededException Message: Rate exceeded for stream ** under account ***; retrying in ..

      Also, I see many exceptions like:

      Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191) at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)

      I'm sure this stream exists because I can see some data from my pipeline that was successfully ingested to it.

       

      Here is my code:

       

       

      .apply(KinesisIO.write()
             .withStreamName("**")
             .withPartitioner(new KinesisPartitioner() {
                             @Override
                              public String getPartitionKey(byte[] value) {
                                              return UUID.randomUUID().toString()
                               }
      
                              @Override
                              public String getExplicitHashKey(byte[] value) {
                                              return null;
                              }
             })
             .withAWSClientsProvider("**","***",Regions.US_EAST_1));

       

      I tried to not use the Kinesis IO. and everything works well, I can't figure out what went wrong.
      I tried using the same API as the library did.

       

      .apply(
       ParDo.of(new DoFn<byte[], Void>() {
       private transient IKinesisProducer inlineProducer;
      
       @Setup
       public void setup(){
      
       KinesisProducerConfiguration config =   KinesisProducerConfiguration.fromProperties(new Properties());
       config.setRegion(Regions.US_EAST_1.getName());
       config.setCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials("***", "***")));
       inlineProducer = new KinesisProducer(config);
       }
      
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
          ByteBuffer data = ByteBuffer.wrap(c.element());
          String partitionKey =UUID.randomUUID().toString();
          ListenableFuture<UserRecordResult> f =
          getProducer().addUserRecord("***", partitionKey, data);
         Futures.addCallback(f, new UserRecordResultFutureCallback());
      }
      
       class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
      
       @Override
       public void onFailure(Throwable cause) {
         throw new RuntimeException("failed produce:"+cause);
       }
      
       @Override
       public void onSuccess(UserRecordResult result) {
      
       }
       }
      
       })
       );
       
      

       

      Any idea what I did wrong? or what the error in the KinesisIO?

      Attachments

        Issue Links

          Activity

            People

              aromanenko Alexey Romanenko
              brachi_packter Brachi Packter
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 50m
                  50m