From b581c29a1b7edf5391ddbeb67d0312ead060215e Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 12 May 2018 15:31:28 +0800 Subject: [PATCH] HBASE-20569 NPE in RecoverStandbyProcedure.execute --- .../org/apache/hadoop/hbase/master/HMaster.java | 11 +++- .../ReplaySyncReplicationWALManager.java | 46 +++++++++++++-- .../ReplaySyncReplicationWALProcedure.java | 17 +++++- .../hbase/replication/SyncReplicationTestBase.java | 5 +- .../TestSyncReplicationStandbyKillMaster.java | 68 ++++++++++++++++++++++ 5 files changed, 136 insertions(+), 11 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4d3310c..67bc04f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -831,7 +831,6 @@ public class HMaster extends HRegionServer implements MasterServices { initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); - this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); // enable table descriptors cache this.tableDescriptors.setCacheOn(); @@ -897,6 +896,10 @@ public class HMaster extends HRegionServer implements MasterServices { LOG.info(Objects.toString(status)); waitForRegionServers(status); + // Start ReplaySyncReplicationWALManager after region server report to make sure that + // there are available servers to replay WAL + this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this); + if (this.balancer instanceof FavoredNodesPromoter) { favoredNodesManager = new FavoredNodesManager(this); } @@ -1241,7 +1244,11 @@ public class HMaster extends HRegionServer implements MasterServices { this.assignmentManager.stop(); } - stopProcedureExecutor(); + try { + stopProcedureExecutor(); + } catch (Throwable t) { + LOG.error("Failed to stop master procedure executor", t); + } if (this.walManager != null) { this.walManager.stop(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java index eac5aa4..5eb194a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java @@ -19,14 +19,16 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -56,14 +58,29 @@ public class ReplaySyncReplicationWALManager { private final Path remoteWALDir; - private final Map> availServers = new HashMap<>(); + private final ConcurrentMap> availServers = + new ConcurrentHashMap<>(); - public ReplaySyncReplicationWALManager(MasterServices services) { + private final ConcurrentMap> workers = + new ConcurrentHashMap<>(); + + public ReplaySyncReplicationWALManager(MasterServices services) throws IOException { this.services = services; this.conf = services.getConfiguration(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + checkReplayingWALDir(); + } + + private void checkReplayingWALDir() throws IOException { + FileStatus[] files = fs.listStatus(remoteWALDir); + for (FileStatus file : files) { + String name = file.getPath().getName(); + if (name.endsWith(REPLAY_SUFFIX)) { + initPeerWorkers(name.substring(0, name.length() - REPLAY_SUFFIX.length())); + } + } } public Path getPeerRemoteWALDir(String peerId) { @@ -125,9 +142,13 @@ public class ReplaySyncReplicationWALManager { public void initPeerWorkers(String peerId) { BlockingQueue servers = new LinkedBlockingQueue<>(); - services.getServerManager().getOnlineServers().keySet() - .forEach(server -> servers.offer(server)); + ConcurrentMap peerWorkers = new ConcurrentHashMap<>(); + services.getServerManager().getOnlineServers().keySet().forEach(server -> { + servers.offer(server); + peerWorkers.put(server, new AtomicBoolean(false)); + }); availServers.put(peerId, servers); + workers.put(peerId, peerWorkers); } public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) @@ -137,6 +158,19 @@ public class ReplaySyncReplicationWALManager { public void addAvailServer(String peerId, ServerName server) { availServers.get(peerId).offer(server); + workers.get(peerId).get(server).set(false); + } + + /** + * Mark a server working. The server will start working when return true. And return false means + * that the server already worked for other task. + * @param peerId id of replication peer + * @param server server name of worker + * @return true or false. + */ + public boolean markWorkingWorker(String peerId, ServerName server) { + return workers.get(peerId).computeIfAbsent(server, s -> new AtomicBoolean((false))) + .compareAndSet(false, true); } public String removeWALRootPath(Path path) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java index 8d8a65a..a36be6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java @@ -125,9 +125,17 @@ public class ReplaySyncReplicationWALProcedure extends Procedure { + try { + Thread.sleep(2000); + UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test"); + } catch (Exception e) { + LOG.error("Failed to stop master", e); + } + }); + t.start(); + + // Transit standby to DA to replay logs + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (Exception e) { + LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE); + } + + while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID) + != SyncReplicationState.DOWNGRADE_ACTIVE) { + Thread.sleep(1000); + } + verify(UTIL2, 0, 1000); + } +} -- 2.7.4