Apache NiFi / NIFI-6787

Load balanced connection can hold up for whole cluster when one node slows down


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.11.0
    • Component/s: Core Framework
    • Labels: None

    Description

      A slow processor on one node of a cluster can slow down the same processor on all other nodes when its incoming connection is load balanced:

      When a processor decides whether it can pass a FlowFile along, it checks whether the outgoing connection is full (if it is, the processor yields).
      Similarly, when the receiving processor is about to be scheduled, the framework checks whether the incoming connection is empty (if it is, there is no reason to call onTrigger).
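      The two checks above act as gates on the same connection, which is both the upstream processor's outgoing queue and the downstream processor's incoming queue. This is a minimal sketch; `SchedulingGates`, `QueueView`, `producerMayTransfer` and `consumerShouldTrigger` are hypothetical names for illustration, not NiFi's API. Only the method names `isFull` and `isActiveQueueEmpty` come from the classes quoted in this issue.

```java
/**
 * Hypothetical model of the two framework checks described above.
 * None of these types are NiFi APIs; only the method names isFull and
 * isActiveQueueEmpty mirror the queue classes quoted in this issue.
 */
final class SchedulingGates {
    private SchedulingGates() {}

    /** The view of a connection that both checks rely on. */
    interface QueueView {
        boolean isFull();             // back pressure: is the queue at its limit?
        boolean isActiveQueueEmpty(); // is there anything for this node to process?
    }

    /** Upstream side: pass a FlowFile along only if the outgoing queue is not full; otherwise yield. */
    static boolean producerMayTransfer(QueueView outgoing) {
        return !outgoing.isFull();
    }

    /** Downstream side: onTrigger is only worth calling if the incoming queue has active FlowFiles. */
    static boolean consumerShouldTrigger(QueueView incoming) {
        return !incoming.isActiveQueueEmpty();
    }

    public static void main(String[] args) {
        // A connection that reports itself as full and empty at the same time:
        // neither side of it does any work on this node.
        QueueView stuck = new QueueView() {
            @Override public boolean isFull() { return true; }
            @Override public boolean isActiveQueueEmpty() { return true; }
        };
        System.out.println("producer may transfer:   " + producerMayTransfer(stuck));
        System.out.println("consumer should trigger: " + consumerShouldTrigger(stuck));
    }
}
```

      Running `main` prints `false` for both gates. A connection should never be full and empty at once, but the load-balanced queue can end up in exactly that state because the two methods look at different scopes.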

      The problem is that the full check compares the total size of the queue (which spans the whole cluster) against the maximum, which is scoped to the current node.
      The empty check, by contrast, is (correctly) done on the local partition only.
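      A small worked example makes the mismatch concrete. The sketch assumes, for illustration only, that a node's load-balanced queue consists of a local partition plus partitions holding FlowFiles bound for other nodes, and that the total size is their sum. `PartitionedQueueDemo`, `Counts`, `Limits` and `NodeQueue` are hypothetical stand-ins, not NiFi classes; `Limits.isFull` copies the threshold logic of `AbstractFlowFileQueue.isFull(QueueSize)` quoted in this issue.

```java
/**
 * Hypothetical model of one node's load-balanced queue; not NiFi code.
 * Assumption: the queue is split into a local partition and partitions holding
 * FlowFiles bound for other nodes, and the total size is their sum.
 */
final class PartitionedQueueDemo {
    private PartitionedQueueDemo() {}

    /** Object and byte counts, standing in for NiFi's QueueSize. */
    record Counts(long objects, long bytes) {
        Counts plus(Counts other) {
            return new Counts(objects + other.objects, bytes + other.bytes);
        }
    }

    /** Per-node back pressure thresholds; values <= 0 mean "no limit". */
    record Limits(long maxCount, long maxBytes) {
        // Same checks as AbstractFlowFileQueue.isFull(QueueSize) quoted in this issue.
        boolean isFull(Counts size) {
            if (maxBytes <= 0 && maxCount <= 0) {
                return false;
            }
            if (maxCount > 0 && size.objects() >= maxCount) {
                return true;
            }
            return maxBytes > 0 && size.bytes() >= maxBytes;
        }
    }

    static final class NodeQueue {
        final Limits limits;
        final Counts local;    // FlowFiles this node will process itself
        final Counts[] remote; // FlowFiles waiting to be sent to other nodes

        NodeQueue(Limits limits, Counts local, Counts... remote) {
            this.limits = limits;
            this.local = local;
            this.remote = remote.clone();
        }

        Counts total() {
            Counts sum = local;
            for (Counts partition : remote) {
                sum = sum.plus(partition);
            }
            return sum;
        }

        /** The behaviour described in this issue: total size against the per-node limit. */
        boolean isFullUsingTotal() { return limits.isFull(total()); }

        /** The same limit applied to the local partition only. */
        boolean isLocalPartitionFull() { return limits.isFull(local); }

        /** The empty check, which already looks at the local partition only. */
        boolean isActiveQueueEmpty() { return local.objects() == 0; }
    }

    public static void main(String[] args) {
        // Fast node A: back pressure at 10,000 objects, no byte limit.
        // Its local partition is drained; 10,000 FlowFiles wait for slow node B.
        NodeQueue nodeA = new NodeQueue(
                new Limits(10_000, 0),
                new Counts(0, 0),
                new Counts(10_000, 10_000_000));

        System.out.println("total size:          " + nodeA.total());
        System.out.println("isFull (total):      " + nodeA.isFullUsingTotal());
        System.out.println("isFull (local only): " + nodeA.isLocalPartitionFull());
        System.out.println("isActiveQueueEmpty:  " + nodeA.isActiveQueueEmpty());
    }
}
```

      In this model node A's queue reports itself full (so its upstream processor yields) and empty (so its downstream processor is not triggered) at the same time, although none of the queued FlowFiles are A's to process.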

      This can lead to a situation where the slow node's queue fills up while the faster nodes empty theirs.
      Once the slow node's queue is full, the fast nodes stop receiving new input and go idle as soon as their own queues are empty.
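      The feedback loop can be reproduced with a toy, tick-based simulation of the fast node alone. It rests on simplifying, hypothetical assumptions: the upstream processor emits a fixed number of FlowFiles per tick, split evenly between the local partition and the partition for the slow node; the slow node accepts a small fixed number per tick from that partition; and back pressure compares the total against the per-node limit, as described above. `StallSimulation` and its parameters are illustrative names, not NiFi settings.

```java
/**
 * Toy, tick-based simulation of the fast node's view of a load-balanced queue.
 * Not NiFi code; all rates and names are hypothetical.
 */
final class StallSimulation {
    private StallSimulation() {}

    /** Outcome of one run. */
    record Result(int idleTicks, long processedLocally, long remoteBacklog) {}

    /**
     * @param ticks       number of scheduling rounds to simulate
     * @param maxCount    per-node back pressure threshold (objects)
     * @param produceRate FlowFiles emitted upstream per tick, split evenly between partitions
     * @param fastRate    FlowFiles the fast node's consumer can process per tick
     * @param slowRate    FlowFiles the slow node accepts per tick from this node
     */
    static Result run(int ticks, long maxCount, long produceRate, long fastRate, long slowRate) {
        long local = 0;  // FlowFiles queued for this (fast) node
        long remote = 0; // FlowFiles queued for the slow node
        long processed = 0;
        int idle = 0;

        for (int t = 0; t < ticks; t++) {
            // Back pressure as described in the issue: total size vs. per-node limit.
            boolean full = local + remote >= maxCount;
            if (!full) {
                long toLocal = produceRate / 2;
                local += toLocal;
                remote += produceRate - toLocal;
            }
            // The fast consumer drains its local partition, or idles if it is empty.
            long done = Math.min(local, fastRate);
            if (done == 0) {
                idle++;
            }
            local -= done;
            processed += done;
            // The slow node takes only a few FlowFiles per tick.
            remote -= Math.min(remote, slowRate);
        }
        return new Result(idle, processed, remote);
    }

    public static void main(String[] args) {
        System.out.println(run(100, 100, 20, 20, 2));
    }
}
```

      With these parameters the fast consumer is idle in 70 of the 100 ticks and processes 300 FlowFiles, although it could handle 20 per tick. Raising `slowRate` to 20 (no slow node) removes the stall in this model: no idle ticks and 1,000 FlowFiles processed.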

      The cause is probably that SocketLoadBalancedFlowFileQueue.isFull() falls through to AbstractFlowFileQueue.isFull(), which checks size(); for the load-balanced queue, size() returns the total size. (The empty check looks correct; for reference, it is done via SocketLoadBalancedFlowFileQueue.isActiveQueueEmpty(), which delegates to the local partition.)

      AbstractFlowFileQueue.java
      ...
          private MaxQueueSize getMaxQueueSize() {
              return maxQueueSize.get();
          }
      
          @Override
          public boolean isFull() {
              return isFull(size());
          }
      
          protected boolean isFull(final QueueSize queueSize) {
              final MaxQueueSize maxSize = getMaxQueueSize();
      
              // Check if max size is set
              if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
                  return false;
              }
      
              if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
                  return true;
              }
      
              if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
                  return true;
              }
      
              return false;
          }
      ...
      
      SocketLoadBalancedFlowFileQueue.java
      ...
          @Override
          public QueueSize size() {
              return totalSize.get();
          }
      ...
          @Override
          public boolean isActiveQueueEmpty() {
              return localPartition.isActiveQueueEmpty();
          }
      ...
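      Given the excerpts above, one direction for a fix is to keep size() reporting the total but have the load-balanced queue evaluate back pressure against its local partition only, mirroring isActiveQueueEmpty(). The sketch below illustrates that idea with hypothetical, simplified classes (`LocalBackPressureSketch`, `BaseQueue`, `LoadBalancedQueue`, `Counts`, `Limits`); it is not NiFi code and not necessarily the change that was merged for 1.11.0.

```java
/**
 * Hypothetical sketch of local-scoped back pressure for a load-balanced queue.
 * Simplified stand-ins only; not NiFi's classes and not the committed fix.
 */
final class LocalBackPressureSketch {
    private LocalBackPressureSketch() {}

    record Counts(long objects, long bytes) {}

    /** Per-node thresholds; values <= 0 mean "no limit". */
    record Limits(long maxCount, long maxBytes) {
        // Equivalent to the checks in the quoted AbstractFlowFileQueue.isFull(QueueSize).
        boolean isFull(Counts size) {
            if (maxCount > 0 && size.objects() >= maxCount) {
                return true;
            }
            return maxBytes > 0 && size.bytes() >= maxBytes;
        }
    }

    /** Mirrors the quoted base-class pattern: isFull() delegates to isFull(size()). */
    abstract static class BaseQueue {
        private final Limits limits;

        BaseQueue(Limits limits) { this.limits = limits; }

        abstract Counts size();

        boolean isFull() { return isFull(size()); }

        protected boolean isFull(Counts size) { return limits.isFull(size); }
    }

    /** size() still reports the total, but back pressure looks only at the local partition. */
    static final class LoadBalancedQueue extends BaseQueue {
        private final Counts local;
        private final Counts total;

        LoadBalancedQueue(Limits limits, Counts local, Counts total) {
            super(limits);
            this.local = local;
            this.total = total;
        }

        @Override Counts size() { return total; }

        // Same scope as the empty check below.
        @Override boolean isFull() { return isFull(local); }

        boolean isActiveQueueEmpty() { return local.objects() == 0; }
    }

    public static void main(String[] args) {
        LoadBalancedQueue nodeA = new LoadBalancedQueue(
                new Limits(10_000, 0), new Counts(0, 0), new Counts(10_000, 10_000_000));
        System.out.println("size():   " + nodeA.size());
        System.out.println("isFull(): " + nodeA.isFull());
    }
}
```

      The trade-off: once back pressure counts only the local partition, this check no longer bounds the partitions holding FlowFiles for other nodes, so a real fix would presumably also need some separate limit on data queued for a backed-up node.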
      


            People

              Assignee: Tamas Palfy (tpalfy)
              Reporter: Tamas Palfy (tpalfy)
              Votes: 0
              Watchers: 4


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 50m