From bf0d43814e27cf3c3faa005e2a0bd2d730476376 Mon Sep 17 00:00:00 2001
From: Guanghao Zhang
Date: Sat, 12 May 2018 15:31:28 +0800
Subject: [PATCH] HBASE-20569 NPE in RecoverStandbyProcedure.execute

---
 .../org/apache/hadoop/hbase/master/HMaster.java    |   7 +-
 .../ReplaySyncReplicationWALManager.java           |  85 ++++++++++++---
 .../ReplaySyncReplicationWALProcedure.java         |  27 ++++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   7 +-
 .../hbase/replication/SyncReplicationTestBase.java |   5 +-
 .../TestSyncReplicationStandbyKillMaster.java      |  85 +++++++++++++++
 .../TestSyncReplicationStandbyKillRS.java          | 114 +++++++++++++++++++++
 7 files changed, 299 insertions(+), 31 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 02c3976..7bc6909 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -831,7 +831,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
-    this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
 
     // enable table descriptors cache
     this.tableDescriptors.setCacheOn();
@@ -897,6 +896,11 @@ public class HMaster extends HRegionServer implements MasterServices {
     LOG.info(Objects.toString(status));
     waitForRegionServers(status);
 
+    // Start ReplaySyncReplicationWALManager after region servers have reported in, to make
+    // sure that there are available servers to replay the WALs
+    this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
+    this.serverManager.registerListener(this.replaySyncReplicationWALManager);
+
     if (this.balancer instanceof FavoredNodesPromoter) {
       favoredNodesManager = new FavoredNodesManager(this);
     }
@@ -1240,7 +1244,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (this.assignmentManager != null) {
       this.assignmentManager.stop();
     }
-
     stopProcedureExecutor();
 
     if (this.walManager != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index eac5aa4..e4bca48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -19,27 +19,30 @@ package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
-public class ReplaySyncReplicationWALManager {
+public class ReplaySyncReplicationWALManager implements ServerListener {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
@@ -56,14 +59,29 @@ public class ReplaySyncReplicationWALManager {
 
   private final Path remoteWALDir;
 
-  private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
+  private final ConcurrentMap<String, BlockingQueue<ServerName>> workers =
+      new ConcurrentHashMap<>();
 
-  public ReplaySyncReplicationWALManager(MasterServices services) {
+  private final ConcurrentMap<String, ConcurrentMap<ServerName, AtomicBoolean>> workersState =
+      new ConcurrentHashMap<>();
+
+  public ReplaySyncReplicationWALManager(MasterServices services) throws IOException {
     this.services = services;
     this.conf = services.getConfiguration();
    this.fs = services.getMasterFileSystem().getWALFileSystem();
     this.walRootDir = services.getMasterFileSystem().getWALRootDir();
     this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    checkReplayingWALDir();
+  }
+
+  private void checkReplayingWALDir() throws IOException {
+    FileStatus[] files = fs.listStatus(remoteWALDir);
+    for (FileStatus file : files) {
+      String name = file.getPath().getName();
+      if (name.endsWith(REPLAY_SUFFIX)) {
+        initPeerWorkers(name.substring(0, name.length() - REPLAY_SUFFIX.length()));
+      }
+    }
   }
 
   public Path getPeerRemoteWALDir(String peerId) {
@@ -123,25 +141,58 @@ public class ReplaySyncReplicationWALManager {
     }
   }
 
+  public String removeWALRootPath(Path path) {
+    String pathStr = path.toString();
+    // remove the "/" too.
+    return pathStr.substring(walRootDir.toString().length() + 1);
+  }
+
   public void initPeerWorkers(String peerId) {
-    BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
-    services.getServerManager().getOnlineServers().keySet()
-        .forEach(server -> servers.offer(server));
-    availServers.put(peerId, servers);
+    synchronized (workers) {
+      workers.put(peerId, new LinkedBlockingQueue<>());
+      workersState.put(peerId, new ConcurrentHashMap<>());
+      services.getServerManager().getOnlineServers().keySet()
+          .forEach(server -> addWorker(peerId, server));
+    }
   }
 
-  public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
+  public ServerName getWorker(String peerId, long timeout, TimeUnit unit)
       throws InterruptedException {
-    return availServers.get(peerId).poll(timeout, unit);
+    return workers.get(peerId).poll(timeout, unit);
   }
 
-  public void addAvailServer(String peerId, ServerName server) {
-    availServers.get(peerId).offer(server);
+  public void addWorker(String peerId, ServerName server) {
+    workers.get(peerId).offer(server);
+    workersState.get(peerId).computeIfAbsent(server, s -> new AtomicBoolean(false)).set(false);
   }
 
-  public String removeWALRootPath(Path path) {
-    String pathStr = path.toString();
-    // remove the "/" too.
-    return pathStr.substring(walRootDir.toString().length() + 1);
+  public void removeWorker(String peerId, ServerName server) {
+    // The worker has already been polled from the workers queue, so it only needs to be
+    // removed from workersState here.
+    workersState.get(peerId).remove(server);
+  }
+
+  /**
+   * Mark a server as working. A return value of true means the caller now owns this worker and
+   * the server can start working; false means the server is already working on another task.
+   * @param peerId id of the replication peer
+   * @param server server name of the worker
+   * @return true if the worker was marked as working, false otherwise
+   */
+  public boolean markWorkerWorking(String peerId, ServerName server) {
+    return workersState.get(peerId).computeIfAbsent(server, s -> new AtomicBoolean(false))
+        .compareAndSet(false, true);
+  }
+
+  public void markWorkerNotWorking(String peerId, ServerName server) {
+    workersState.get(peerId).get(server).set(false);
+  }
+
+  @Override
+  public void serverAdded(final ServerName server) {
+    synchronized (workers) {
+      workers.values().forEach(servers -> servers.offer(server));
+      workersState.values().forEach(
+        peerWorkers -> peerWorkers.computeIfAbsent(server, s -> new AtomicBoolean(false)));
+    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
index 8d8a65a..fced15c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
@@ -101,14 +101,15 @@ public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv> {
+      try {
+        Thread.sleep(2000);
+        UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test");
+      } catch (Exception e) {
+        LOG.error("Failed to stop master", e);
+      }
+    });
+    t.start();
+
+    // Transit the standby cluster to DA (DOWNGRADE_ACTIVE) to replay the logs
+    try {
+      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.DOWNGRADE_ACTIVE);
+    } catch (Exception e) {
+      LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE);
+    }
+
+    while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+        != SyncReplicationState.DOWNGRADE_ACTIVE) {
+      Thread.sleep(1000);
+    }
+    verify(UTIL2, 0, 1000);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
new file mode 100644
index 0000000..731148c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);
+
+  @Test
+  public void testStandbyKillRegionServer() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    // Disable async replication and write data, then shutdown
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 1000);
+    UTIL1.shutdownMiniCluster();
+
+    JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
+    Thread t = new Thread(() -> {
+      try {
+        List<JVMClusterUtil.RegionServerThread> regionServers =
+            UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
+        for (JVMClusterUtil.RegionServerThread rst : regionServers) {
+          ServerName serverName = rst.getRegionServer().getServerName();
+          rst.getRegionServer().stop("Stop RS for test");
+          waitForRSShutdownToStartAndFinish(activeMaster, serverName);
+          JVMClusterUtil.RegionServerThread restarted =
+              UTIL2.getMiniHBaseCluster().startRegionServer();
+          restarted.waitForServerOnline();
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to kill RS", e);
+      }
+    });
+    t.start();
+
+    // Transit the standby cluster to DA (DOWNGRADE_ACTIVE) to replay the logs
+    try {
+      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.DOWNGRADE_ACTIVE);
+    } catch (Exception e) {
+      LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE);
+    }
+
+    while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+        != SyncReplicationState.DOWNGRADE_ACTIVE) {
+      Thread.sleep(1000);
+    }
+    verify(UTIL2, 0, 1000);
+  }
+
+  private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
+      ServerName serverName) throws InterruptedException {
+    ServerManager sm = activeMaster.getMaster().getServerManager();
+    // First wait for it to appear in the dead server list
+    while (!sm.getDeadServers().isDeadServer(serverName)) {
+      LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master");
+      Thread.sleep(1);
+    }
+    LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " +
+        "finish dead processing");
+    while (sm.areDeadServersInProgress()) {
+      LOG.debug("Server [" + serverName + "] still being processed, waiting");
+      Thread.sleep(100);
+    }
+    LOG.debug("Server [" + serverName + "] done with server shutdown processing");
+  }
+}
-- 
2.7.4