Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-21550

camel-aws-sqs - message is getting expired before extender changes the visibility

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 4.1.0
    • 4.4.5, 4.8.3, 4.10.0
    • camel-aws2-sqs
    • None
    • Unknown

    Description

      Issue: sqs extender scheduler initial start is set to queues visibility timeout.  when the fifo queues in the application are couple of them it works but when its more it causes this issue.

      if (visibilityTimeout != null && visibilityTimeout > 0) {
                      int delay = visibilityTimeout;
                      int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
                      this.timeoutExtender = new TimeoutExtender(repeatSeconds);
      
                      if (LOG.isDebugEnabled()) {
                          LOG.debug(
                                  "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)",
                                  delay, delay, repeatSeconds);
                      }
                      this.scheduledFuture
                              = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS);
                  }
      

      E.g if the visibility is 10sec the extender would start around 10sec and by the time the extender changes the visibility the message's visibility in the queue would have expired and got removed. And since the extender uses the expired message receipt handle sqs server return ReceiptHandleIsInvalidException but because this exception is supressed sqs client assumes it was successful.

      try {
                              LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds,
                                      batchEntries);
                              getEndpoint().getClient().changeMessageVisibilityBatch(request);
                              LOG.debug("Extended visibility window for request entries {}", batchEntries);
                          } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
                              // Ignore.
                          } catch (SqsException e) {
                              if (e.getMessage()
                                      .contains("Message does not exist or is not available for visibility timeout change")) {
                                  // Ignore.
                              } else {
                                  logException(e, batchEntries);
                              }
                          } catch (Exception e) {
                              logException(e, batchEntries);
                          }
      

      Attachments

        Issue Links

          Activity

            People

              davsclaus Claus Ibsen
              narsi-nallamilli Narsi Reddy Nallamilli
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: