From aac9b87559ea36966035bef7250db1c47f17c34b Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Wed, 11 Jul 2018 10:20:35 +0800 Subject: [PATCH] HBASE-20855 PeerConfigTracker only support one listener will cause problem when there is a recovered replication queue --- .../hadoop/hbase/replication/ReplicationPeer.java | 6 ++ .../hbase/replication/ReplicationPeerZKImpl.java | 33 ++++++-- .../hbase/replication/BaseReplicationEndpoint.java | 7 ++ .../replication/HBaseReplicationEndpoint.java | 1 + .../HBaseInterClusterReplicationEndpoint.java | 1 + .../regionserver/ReplicationSource.java | 1 + .../replication/TestReplicationConfigTracker.java | 95 ++++++++++++++++++++++ 7 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 200d81c..a0e758f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -81,4 +81,10 @@ public interface ReplicationPeer { * @param listener Listener for config changes, usually a replication endpoint */ void trackPeerConfigChanges(ReplicationPeerConfigListener listener); + + /** + * Remove a listener when it is closed or terminated + * @param listener Listener for config changes, usually a replication endpoint + */ + void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index b79a982..f37277f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.replication; import java.io.Closeable; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -172,11 +175,22 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep @Override public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { if (this.peerConfigTracker != null){ - this.peerConfigTracker.setListener(listener); + this.peerConfigTracker.addListener(listener); } } @Override + public void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.removeListener(listener); + } + } + + public PeerConfigTracker getPeerConfigTracker() { + return this.peerConfigTracker; + } + + @Override public void abort(String why, Throwable e) { LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig + " was aborted for the following reason(s):" + why, e); @@ -275,15 +289,24 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep */ public class PeerConfigTracker extends ZooKeeperNodeTracker { - private ReplicationPeerConfigListener listener; + private Set listeners; public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, Abortable abortable) { super(watcher, peerConfigNode, abortable); + listeners = new HashSet<>(); + } + + public synchronized void addListener(ReplicationPeerConfigListener listener){ + listeners.add(listener); + } + + public Set getListeners(){ + return this.listeners; } - public synchronized void setListener(ReplicationPeerConfigListener listener){ - this.listener = listener; + public synchronized void removeListener(ReplicationPeerConfigListener removeListener) { + listeners.remove(removeListener); } @Override @@ -291,7 +314,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep if (path.equals(node)) { super.nodeCreated(path); ReplicationPeerConfig config = readPeerConfig(); - if (listener != null){ + for (ReplicationPeerConfigListener listener : listeners) { listener.peerConfigUpdated(config); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 71a222a..a3d86b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -111,4 +111,11 @@ public abstract class BaseReplicationEndpoint extends AbstractService return false; } + public void close(){ + if(this.ctx != null) { + ReplicationPeer peer = this.ctx.getReplicationPeer(); + peer.removeListenerOfPeerConfig(this); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 6d3e70e..a9358a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -89,6 +89,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint @Override protected void doStop() { disconnect(); + close(); notifyStopped(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 2ce195c..6c3fc99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -446,6 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi "Aborting to prevent Replication from deadlocking. See HBASE-16081."; abortable.abort(errMsg, new IOException(errMsg)); } + close(); notifyStopped(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d7861a5..a250139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -578,6 +578,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } if (allOtherTaskDone) { + this.source.terminate("recover " + peerClusterZnode + " success"); manager.closeRecoveredQueue(this.source); LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: " + getStats()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java new file mode 100644 index 0000000..1c1fed9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationConfigTracker.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicationConfigTracker extends TestReplicationBase { + private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class); + + @Test + public void testReplicationConfigTracker() throws Exception { + // killing the RS with hbase:meta can result into failed puts until we solve + // IO fencing + int rsToKill1 = utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0; + int otherRs = rsToKill1 == 0 ? 1 : 0; + HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(otherRs); + + Thread listenerTracker = trackListener(utility1, otherRs); + LOG.info("Start loading table"); + utility1.loadTable(htable1, famName, true); + LOG.info("Done loading table"); + utility1.getHBaseCluster().getRegionServer(rsToKill1).abort("Stopping as part of the test"); + utility1.getHBaseCluster().waitOnRegionServer(rsToKill1); + while (utility1.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) { + LOG.info("Waiting on processing of crashed server before proceeding..."); + Threads.sleep(1000); + } + // wait another 5000ms for replication take over old sources. + Threads.sleep(5000); + Assert.assertTrue(!listenerTracker.isAlive()); + ReplicationPeerZKImpl.PeerConfigTracker tracker = getPeerConfigTracker(regionServer); + Assert.assertEquals("size of listener", 1, tracker.getListeners().size()); + } + + private static Thread trackListener(final HBaseTestingUtility utility, final int rs) { + Thread trackListener = new Thread() { + public void run() { + Replication replication = (Replication) utility.getHBaseCluster().getRegionServer(rs) + .getReplicationSourceService(); + ReplicationSourceManager manager = replication.getReplicationManager(); + ReplicationPeerZKImpl replicationPeerZK = + (ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID); + ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker = + replicationPeerZK.getPeerConfigTracker(); + while (peerConfigTracker.getListeners().size() != 2) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + LOG.error("track config failed", e); + } + } + } + }; + trackListener.setDaemon(true); + trackListener.start(); + return trackListener; + } + + private ReplicationPeerZKImpl.PeerConfigTracker getPeerConfigTracker(HRegionServer rs) { + Replication replication = (Replication) rs.getReplicationSourceService(); + ReplicationSourceManager manager = replication.getReplicationManager(); + ReplicationPeerZKImpl replicationPeerZK = + (ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID); + ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker = + replicationPeerZK.getPeerConfigTracker(); + return peerConfigTracker; + } +} -- 2.7.4