From ba59b73b1812c27f618ae213b7fc442e2583201b Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 12 May 2018 15:31:28 +0800 Subject: [PATCH] HBASE-20569 NPE in RecoverStandbyProcedure.execute --- .../org/apache/hadoop/hbase/master/HMaster.java | 7 +- .../replication/RecoverStandbyProcedure.java | 5 + .../ReplaySyncReplicationWALManager.java | 92 ++++++++++++++--- .../ReplaySyncReplicationWALProcedure.java | 27 ++++- .../hadoop/hbase/regionserver/HRegionServer.java | 7 +- .../regionserver/ReplicationSource.java | 2 +- .../hbase/replication/SyncReplicationTestBase.java | 11 +- .../TestSyncReplicationStandbyKillMaster.java | 85 +++++++++++++++ .../TestSyncReplicationStandbyKillRS.java | 114 +++++++++++++++++++++ 9 files changed, 317 insertions(+), 33 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 02c3976..7bc6909 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -831,7 +831,6 @@ public class HMaster extends HRegionServer implements MasterServices { initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); - this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); // enable table descriptors cache this.tableDescriptors.setCacheOn(); @@ -897,6 +896,11 @@ public class HMaster extends HRegionServer implements MasterServices { LOG.info(Objects.toString(status)); waitForRegionServers(status); + // Start ReplaySyncReplicationWALManager after region server report to make sure that + // there are available servers to replay WAL + this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); + this.serverManager.registerListener(this.replaySyncReplicationWALManager); + if (this.balancer instanceof FavoredNodesPromoter) { favoredNodesManager = new FavoredNodesManager(this); } @@ -1240,7 +1244,6 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.assignmentManager != null) { this.assignmentManager.stop(); } - stopProcedureExecutor(); if (this.walManager != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index e9e3a97..bf4df87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -47,6 +47,10 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure> availServers = new HashMap<>(); + private final ConcurrentMap> workers = + new ConcurrentHashMap<>(); - public ReplaySyncReplicationWALManager(MasterServices services) { + private final ConcurrentMap> workersState = + new ConcurrentHashMap<>(); + + public ReplaySyncReplicationWALManager(MasterServices services) throws IOException { this.services = services; this.conf = services.getConfiguration(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + checkReplayingWALDir(); + } + + private void checkReplayingWALDir() throws IOException { + FileStatus[] files = fs.listStatus(remoteWALDir); + for (FileStatus file : files) { + String name = file.getPath().getName(); + if (name.endsWith(REPLAY_SUFFIX)) { + initPeerWorkers(name.substring(0, name.length() - REPLAY_SUFFIX.length())); + } + } } public Path getPeerRemoteWALDir(String peerId) { @@ -123,25 +141,65 @@ public class ReplaySyncReplicationWALManager { } } + public String removeWALRootPath(Path path) { + String pathStr = path.toString(); + // remove the "/" too. + return pathStr.substring(walRootDir.toString().length() + 1); + } + public void initPeerWorkers(String peerId) { - BlockingQueue servers = new LinkedBlockingQueue<>(); - services.getServerManager().getOnlineServers().keySet() - .forEach(server -> servers.offer(server)); - availServers.put(peerId, servers); + synchronized (workers) { + workers.put(peerId, new LinkedBlockingQueue<>()); + workersState.put(peerId, new ConcurrentHashMap<>()); + services.getServerManager().getOnlineServers().keySet() + .forEach(server -> addWorker(peerId, server)); + } + } + + public void removePeerWorkers(String peerId) { + synchronized (workers) { + workers.remove(peerId); + workersState.remove(peerId); + } } - public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) + public ServerName getWorker(String peerId, long timeout, TimeUnit unit) throws InterruptedException { - return availServers.get(peerId).poll(timeout, unit); + return workers.get(peerId).poll(timeout, unit); } - public void addAvailServer(String peerId, ServerName server) { - availServers.get(peerId).offer(server); + public void addWorker(String peerId, ServerName server) { + workers.get(peerId).offer(server); + workersState.get(peerId).computeIfAbsent(server, s -> new AtomicBoolean(false)).set(false); } - public String removeWALRootPath(Path path) { - String pathStr = path.toString(); - // remove the "/" too. - return pathStr.substring(walRootDir.toString().length() + 1); + public void removeWorker(String peerId, ServerName server) { + // As the worker already was polled from workers. Don't need remove it from workers. + workersState.get(peerId).remove(server); + } + + /** + * Mark a server working. The server will start working when return true. And return false means + * that the server already worked for other task. + * @param peerId id of replication peer + * @param server server name of worker + * @return true or false. + */ + public boolean markWorkerWorking(String peerId, ServerName server) { + return workersState.get(peerId).computeIfAbsent(server, s -> new AtomicBoolean(false)) + .compareAndSet(false, true); + } + + public void markWorkerNotWorking(String peerId, ServerName server) { + workersState.get(peerId).get(server).set(false); + } + + @Override + public void serverAdded(final ServerName server) { + synchronized (workers) { + workers.values().forEach(servers -> servers.offer(server)); + workersState.values().forEach( + peerWorkers -> peerWorkers.computeIfAbsent(server, s -> new AtomicBoolean(false))); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java index 8d8a65a..fced15c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java @@ -101,14 +101,15 @@ public class ReplaySyncReplicationWALProcedure extends Procedure regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size()); + HRegion region = regions.get(0); util.waitFor(30000, new ExplainingPredicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java new file mode 100644 index 0000000..92f2df7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class); + + @Test + public void testStandbyKillMaster() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, 1000); + UTIL1.shutdownMiniCluster(); + + Thread t = new Thread(() -> { + try { + Thread.sleep(2000); + UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test"); + } catch (Exception e) { + LOG.error("Failed to stop master", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(1000); + } + verify(UTIL2, 0, 1000); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java new file mode 100644 index 0000000..7bb3d16 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class); + + @Test + public void testStandbyKillRegionServer() throws Exception { + MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); + Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID); + assertFalse(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + assertTrue(mfs.getWALFileSystem().exists(remoteWALDir)); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + // Disable async replication and write data, then shutdown + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, 1000); + UTIL1.shutdownMiniCluster(); + + JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread(); + Thread t = new Thread(() -> { + try { + List regionServers = + UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads(); + for (JVMClusterUtil.RegionServerThread rst : regionServers) { + ServerName serverName = rst.getRegionServer().getServerName(); + rst.getRegionServer().stop("Stop RS for test"); + waitForRSShutdownToStartAndFinish(activeMaster, serverName); + JVMClusterUtil.RegionServerThread restarted = UTIL2.getMiniHBaseCluster().startRegionServer(); + restarted.waitForServerOnline(); + } + } catch (Exception e) { + LOG.error("Failed to kill RS", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(1000); + } + verify(UTIL2, 0, 1000); + } + + private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster, + ServerName serverName) throws InterruptedException { + ServerManager sm = activeMaster.getMaster().getServerManager(); + // First wait for it to be in dead list + while (!sm.getDeadServers().isDeadServer(serverName)) { + LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master"); + Thread.sleep(1); + } + LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " + + "finish dead processing"); + while (sm.areDeadServersInProgress()) { + LOG.debug("Server [" + serverName + "] still being processed, waiting"); + Thread.sleep(100); + } + LOG.debug("Server [" + serverName + "] done with server shutdown processing"); + } +} -- 2.7.4