Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-15885

AWS KINESIS component not de-aggregating producer side aggregated msgs

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • None
    • None
    • camel-aws
    • None
    • Unknown

    Description

      <quarkus-plugin.version>1.8.1.Final</quarkus-plugin.version>
      <dependency>
         <groupId>org.apache.camel.quarkus</groupId>
         <artifactId>camel-quarkus-aws-kinesis</artifactId>
      </dependency>

       
      Our producer sends zipped messages. When reading those about 1/4 of all msgs cannot be unzipped. When digging into the message blob it became clear that one message contains multiple data blobs. That's probably due to the producer side message aggregation ( https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
       
      Instead of a single message the data from the Record entity contains obviously several merged data blobs that I'm unable to read (zip error when trying to decompress). I'd expect to receive several msgs in my processor instead of several msgs merged into one msg blob.

       

       from("aws-kinesis://"  lsc.getStreamName()  "?amazonKinesisClient=client&bridgeErrorHandler=true&maxResultsPerRequest=500&greedy=true&delay=0&runLoggingLevel=TRACE&iteratorType=LATEST")
                       .routeId("vssKinesisStream")
                       .log("LSC message received.")
                       .onException(java.util.zip.ZipException.class).process(exchange -> 
      {                     metrics.countInvalidPerSec();                 }
      )
                       .stop()
                       .end()
                       .process(exchange -> 
      {                         metrics.countReceivedPerSec();                         Record record = exchange.getIn().getBody(Record.class);                         String message = extractMessageFromRecord(record);                         exchange.getIn().setBody(message);                         metrics.countValidPerSec();                         LOG.info("Extracted message: " + message);                         lastReadMsgAge = Duration.between(record.getApproximateArrivalTimestamp().toInstant(), Instant.now());                 }
      

       
       

      Attachments

        Activity

          People

            Unassigned Unassigned
            cmlnecting m
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: