From 8cb9913fd7bd10802ff1537d7b1124d083cc4ae8 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 12 May 2018 15:31:28 +0800 Subject: [PATCH] HBASE-20569 NPE in RecoverStandbyProcedure.execute --- .../ReplaySyncReplicationWALManager.java | 46 ++++++++++++++++++++-- .../ReplaySyncReplicationWALProcedure.java | 13 +++++- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java index eac5aa4..d7186f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java @@ -17,16 +17,23 @@ */ package org.apache.hadoop.hbase.master.replication; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -56,14 +63,29 @@ public class ReplaySyncReplicationWALManager { private final Path remoteWALDir; - private final Map> availServers = new HashMap<>(); + private final ConcurrentMap> availServers = + new ConcurrentHashMap<>(); - public ReplaySyncReplicationWALManager(MasterServices services) { + private final ConcurrentMap> workers = + new ConcurrentHashMap<>(); + 
+ public ReplaySyncReplicationWALManager(MasterServices services) throws IOException { this.services = services; this.conf = services.getConfiguration(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); + checkReplayingWALDir(); + } + + private void checkReplayingWALDir() throws IOException { + FileStatus[] files = fs.listStatus(remoteWALDir); + for (FileStatus file : files) { + String name = file.getPath().getName(); + if (name.endsWith(REPLAY_SUFFIX)) { + initPeerWorkers(name.substring(0, name.length() - REPLAY_SUFFIX.length())); + } + } } public Path getPeerRemoteWALDir(String peerId) { @@ -125,9 +147,13 @@ public class ReplaySyncReplicationWALManager { public void initPeerWorkers(String peerId) { BlockingQueue servers = new LinkedBlockingQueue<>(); - services.getServerManager().getOnlineServers().keySet() - .forEach(server -> servers.offer(server)); + ConcurrentMap peerWorkers = new ConcurrentHashMap<>(); + services.getServerManager().getOnlineServers().keySet().forEach(server -> { + servers.offer(server); + peerWorkers.put(server, new AtomicBoolean(false)); + }); availServers.put(peerId, servers); + workers.put(peerId, peerWorkers); } public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit) @@ -137,6 +163,18 @@ public class ReplaySyncReplicationWALManager { public void addAvailServer(String peerId, ServerName server) { availServers.get(peerId).offer(server); + workers.get(peerId).get(server).set(false); + } + + /** + * Mark a server as working. Returns true if the server was idle and has now been claimed by the + * caller; returns false if the server is already working on another task. + * @param peerId id of replication peer + * @param server server name of worker + * @return true if this call marked the server as working, false if it was already working 
+ */ + public boolean markWorkingWorker(String peerId, ServerName server) { + return workers.get(peerId).get(server).compareAndSet(false, true); } public String removeWALRootPath(Path path) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java index 8d8a65a..66b8e33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java @@ -125,9 +125,12 @@ public class ReplaySyncReplicationWALProcedure extends Procedure