  Apache Storm: STORM-4060

Netty client will wait up to 10 minutes to send messages to unreachable worker on close()


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.3, 1.2.4, 2.6.2
    • Fix Version/s: 2.6.3
    • Component/s: storm-client, storm-core
    • Labels: None

    Description

       

      Because PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 10 minutes (600000 ms), close() on a Netty client whose destination worker is unreachable can block for up to 10 minutes. During that time the buffered messages stay in the worker before they are finally given up on and reprocessed.
      This increases latency across the topology.
      It is proposed that PENDING_MESSAGES_FLUSH_TIMEOUT_MS be exposed as a property that users can configure.
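One way the proposal could be realised is to read the timeout from the worker's configuration map and fall back to the current 10-minute value when it is unset. This is a minimal sketch under stated assumptions: the class `FlushTimeoutConfig` and the key `storm.messaging.netty.flush.timeout.ms` are hypothetical names chosen for illustration, not taken from this issue or from Storm's actual fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Storm): resolves the Netty flush timeout
// from a config map, falling back to the value hardcoded in Client today.
final class FlushTimeoutConfig {
    // Hypothetical key name, for illustration only.
    static final String FLUSH_TIMEOUT_KEY = "storm.messaging.netty.flush.timeout.ms";
    // Current hardcoded value: 600000 ms = 10 minutes.
    static final long DEFAULT_FLUSH_TIMEOUT_MS = 600_000L;

    private FlushTimeoutConfig() {
    }

    static long resolve(Map<String, Object> conf) {
        Object value = conf.get(FLUSH_TIMEOUT_KEY);
        long ms;
        if (value == null) {
            return DEFAULT_FLUSH_TIMEOUT_MS; // unset: keep today's behaviour
        } else if (value instanceof Number) {
            ms = ((Number) value).longValue(); // YAML may yield Integer or Long
        } else {
            ms = Long.parseLong(value.toString().trim()); // e.g. "30000"
        }
        if (ms < 0) {
            throw new IllegalArgumentException(FLUSH_TIMEOUT_KEY + " must be >= 0, got " + ms);
        }
        return ms;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(resolve(conf)); // 600000
        conf.put(FLUSH_TIMEOUT_KEY, 30000);
        System.out.println(resolve(conf)); // 30000
    }
}
```

Defaulting to the existing 600000 ms keeps current topologies unchanged unless a user opts into a shorter timeout.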

      Code that leads to this situation:

      public class Client extends ConnectionWithStatus implements ISaslClient {
          // 600000 ms = 10 minutes; not configurable
          private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
          // ... other fields (including PENDING_MESSAGES_FLUSH_INTERVAL_MS) and methods omitted ...

          @Override
          public void close() {
              if (!closing) {
                  LOG.info("closing Netty Client {}", dstAddressPrefixedName);
                  // Set closing to true to prevent any further reconnection attempts.
                  closing = true;
                  // Blocks until all pending messages are sent or the timeout expires.
                  waitForPendingMessagesToBeSent();
                  closeChannel();

                  // stop tracking metrics for this client
                  if (this.metricRegistry != null) {
                      this.metricRegistry.deregister(this.metrics);
                  }
              }
          }

          private void waitForPendingMessagesToBeSent() {
              LOG.info("waiting up to {} ms to send {} pending messages to {}",
                       PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
              long totalPendingMsgs = pendingMessages.get();
              long startMs = System.currentTimeMillis();
              while (pendingMessages.get() != 0) {
                  try {
                      long deltaMs = System.currentTimeMillis() - startMs;
                      // If the worker is unreachable, this only triggers after the full 10 minutes.
                      if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                          LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
                              + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                          break;
                      }
                      Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
                  } catch (InterruptedException e) {
                      break;
                  }
              }
          }
      }
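For illustration, the wait loop above can be written with the timeout injected rather than hardcoded. This is a minimal standalone sketch, not Storm's code: `PendingMessagesDrainer` and its constructor parameters are hypothetical, and it models only the `pendingMessages` counter from the excerpt. It also deliberately differs from the excerpt in two ways: it measures elapsed time with the monotonic `System.nanoTime()`, and it restores the thread's interrupt flag instead of silently swallowing `InterruptedException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone sketch (not Storm's Client): a bounded wait for
// pending messages whose timeout is passed in instead of hardcoded.
final class PendingMessagesDrainer {
    private final AtomicLong pendingMessages;
    private final long flushTimeoutMs;
    private final long pollIntervalMs;

    PendingMessagesDrainer(AtomicLong pendingMessages, long flushTimeoutMs, long pollIntervalMs) {
        this.pendingMessages = pendingMessages;
        this.flushTimeoutMs = flushTimeoutMs;
        this.pollIntervalMs = pollIntervalMs;
    }

    /** Waits until no messages are pending or the timeout elapses; returns how many are still pending. */
    long awaitDrained() {
        // System.nanoTime() is monotonic, so wall-clock changes cannot stretch or cut short the wait.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(flushTimeoutMs);
        while (pendingMessages.get() != 0) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                break; // timed out: the caller decides how to report the unsent messages
            }
            try {
                // Sleep one poll interval, shortened near the deadline (at least 1 ms to avoid spinning).
                long sleepMs = Math.max(1L, Math.min(pollIntervalMs, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
        }
        return pendingMessages.get();
    }

    public static void main(String[] args) {
        // Simulates an unreachable worker: the pending count never drops.
        AtomicLong pending = new AtomicLong(3);
        long start = System.nanoTime();
        long unsent = new PendingMessagesDrainer(pending, 200, 50).awaitDrained();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(unsent + " unsent after ~" + elapsedMs + " ms");
    }
}
```

With a 200 ms timeout and a counter that never reaches zero, `awaitDrained()` gives up after roughly 200 ms with all 3 messages still pending, rather than holding `close()` for 10 minutes.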

       


          People

            Assignee: Rui Abreu
            Reporter: Rui Abreu
            Votes: 0
            Watchers: 1

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 1h