Uploaded image for project: 'Geode'
  1. Geode
  2. GEODE-4659

AbstractGatewaySenderEventProcessor puts the filter loop in the wrong place

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • wan
    • None

    Description

      While fixing GEODE-3967, I found that the filter loop is in the wrong place.
      
       
      
      The processing that ignores UPDATE_VERSION_STAMP events and events with CME (concurrent modification conflict) should have nothing to do with filters. However, because this processing is done inside the loop over the sender's filters, when no filter is defined the code does not ignore UPDATE_VERSION_STAMP events or events with CME.
      
       
      
      However, once this problem is fixed, GEODE-3967 has more race conditions that need to be fixed (I have fixed several of them). It looks like this bug hid other race conditions from surfacing.
      
       
      
      Given the time constraint, I will not fix the filter issue as part of GEODE-3967; I am logging this bug for future reference.
      
       
      Here is the diff to fix this bug:
      diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
      index 8739a8f72..a3a89fbd0 100644
      --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
      +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
      @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe
          * @param disp
          * @return true if remote site Gemfire Version is >= 7.0.1
          */
      -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
      -      throws GatewaySenderException {
      -    try {
      -      GatewaySenderEventRemoteDispatcher remoteDispatcher =
      -          (GatewaySenderEventRemoteDispatcher) disp;
      -      // This will create a new connection if no batch has been sent till
      -      // now.
      -      Connection conn = remoteDispatcher.getConnection(false);
      -      if (conn != null) {
      -        short remoteSiteVersion = conn.getWanSiteVersion();
      -        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
      -          return true;
      -        }
      -      }
      -    } catch (GatewaySenderException e) {
      -      Throwable cause = e.getCause();
      -      if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException
      -          || cause instanceof ConnectionDestroyedException) {
      -        try {
      -          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
      -          if (logger.isDebugEnabled()) {
      -            logger.debug("Sleeping for {} milliseconds", sleepInterval);
      -          }
      -          Thread.sleep(sleepInterval);
      -        } catch (InterruptedException ie) {
      -          // log the exception
      -          if (logger.isDebugEnabled()) {
      -            logger.debug(ie.getMessage(), ie);
      -          }
      -        }
      -      }
      -      throw e;
      -    }
      -    return false;
      +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
      +    return true;
         }
      }
      diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
      index 69005e02b..da5d1baee 100644
      --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
      +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
      @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
      import org.apache.geode.cache.wan.GatewaySender;
      import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
      import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
      +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
      import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
      import org.apache.geode.internal.logging.LogService;
      
      @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender
           }
         }
      
      +  /**
      +   * Returns if corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1
      +   *
      +   * @param disp
      +   * @return true if remote site Gemfire Version is >= 7.0.1
      +   */
      +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
      +    return true;
      +  }
      +
      }
      diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
      index 7e67e9bfb..439394382 100644
      --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
      +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
      @@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
                 }
                 // Filter the events
      -          for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
      -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
      -            while (itr.hasNext()) {
      -              GatewayQueueEvent event = itr.next();
      -
      -              // This seems right place to prevent transmission of UPDATE_VERSION events if
      -              // receiver's
      -              // version is < 7.0.1, especially to prevent another loop over events.
      -              if (!sendUpdateVersionEvents
      -                  && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
      -                if (isTraceEnabled) {
      -                  logger.trace(
      -                      "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
      -                      sender);
      -                }
      +          Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
      +          while (itr.hasNext()) {
      +            GatewayQueueEvent event = itr.next();
      +
      +            // This seems right place to prevent transmission of UPDATE_VERSION events if
      +            // receiver's
      +            // version is < 7.0.1, especially to prevent another loop over events.
      +            if (!sendUpdateVersionEvents
      +                && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
      +              if (isDebugEnabled) {
      +                logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
      +                    event, sender);
      +              }
      
      -                itr.remove();
      -                statistics.incEventsNotQueued();
      -                continue;
      +              itr.remove();
      +              statistics.incEventsNotQueued();
      +              continue;
      +            }
      +
      +            if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
      +              if (isDebugEnabled) {
      +                logger.debug(
      +                    "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}",
      +                    event, sender);
                     }
      
      +              itr.remove();
      +              statistics.incEventsNotQueued();
      +              continue;
      +            }
      +
      +            for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
                     boolean transmit = filter.beforeTransmit(event);
                     if (!transmit) {
                       if (isDebugEnabled) {
      @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
                       }
                       itr.remove();
                       statistics.incEventsFiltered();
      +                break;
                     }
                   }
                 }
      @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
                 // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
                 if (this.getSender().isParallel()
                     && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
      -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
      -            while (itr.hasNext()) {
      -              GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next();
      +            Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator();
      +            while (eventItr.hasNext()) {
      +              GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next();
                     PartitionedRegion qpr = null;
                     if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) {
                       qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue())
      @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
           } // for
         }
      
      -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
      +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
           // onyly in case of remote dispatcher we send versioned events
           return false;
         }

       

      Attachments

        Activity

          People

            zhouxj Xiaojian Zhou
            zhouxj Xiaojian Zhou
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: