diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 591a551..4ded3fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -154,6 +154,8 @@ @VisibleForTesting protected ZooKeeper zkClient; private ZooKeeper oldZkClient; + @VisibleForTesting + ZooKeeper activeZkClient; /** Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; @@ -830,11 +832,16 @@ public synchronized void deleteStore() throws Exception { * hides the ZK methods of the store from its public interface */ private final class ForwardingWatcher implements Watcher { + private ZooKeeper watchedZkClient; + + public ForwardingWatcher(ZooKeeper client) { + this.watchedZkClient = client; + } @Override public void process(WatchedEvent event) { try { - ZKRMStateStore.this.processWatchEvent(event); + ZKRMStateStore.this.processWatchEvent(watchedZkClient, event); } catch (Throwable t) { LOG.error("Failed to process watcher event " + event + ": " + StringUtils.stringifyException(t)); @@ -845,8 +852,16 @@ public void process(WatchedEvent event) { @VisibleForTesting @Private @Unstable - public synchronized void processWatchEvent(WatchedEvent event) - throws Exception { + public synchronized void processWatchEvent(ZooKeeper zk, + WatchedEvent event) throws Exception { + // only process watcher event from current ZooKeeper Client session. + if (zk != activeZkClient) { + LOG.info("Ignore watcher event type: " + event.getType() + + " with state:" + event.getState() + " for path:" + + event.getPath() + " from old session"); + return; + } + Event.EventType eventType = event.getType(); LOG.info("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() + " for " + this); @@ -1100,7 +1115,8 @@ private synchronized void createConnection() for (int retries = 0; retries < numRetries && zkClient == null; retries++) { try { - zkClient = getNewZooKeeper(); + activeZkClient = getNewZooKeeper(); + zkClient = activeZkClient; for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); } @@ -1130,7 +1146,7 @@ private synchronized void createConnection() protected synchronized ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); - zk.register(new ForwardingWatcher()); + zk.register(new ForwardingWatcher(zk)); return zk; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 8dc3628..b881043 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -90,19 +90,19 @@ public ZooKeeper getNewZooKeeper() } @Override - public synchronized void processWatchEvent(WatchedEvent event) - throws Exception { + public synchronized void processWatchEvent(ZooKeeper zk, + WatchedEvent event) throws Exception { if (forExpire) { // a hack... couldn't find a way to trigger expired event. WatchedEvent expriredEvent = new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null); - super.processWatchEvent(expriredEvent); + super.processWatchEvent(zk, expriredEvent); forExpire = false; syncBarrier.await(); } else { - super.processWatchEvent(event); + super.processWatchEvent(zk, event); } } } @@ -114,7 +114,7 @@ public void process(WatchedEvent event) { super.process(event); try { if (store != null) { - store.processWatchEvent(event); + store.processWatchEvent(store.activeZkClient, event); } } catch (Throwable t) { LOG.error("Failed to process watcher event " + event + ": "