From 69c9f55b87360ea61d413a8f8072aa9b1525a9a7 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 17 May 2018 17:46:20 +0800
Subject: [PATCH] HBASE-20424 Allow writing WAL to local and remote cluster
concurrently
---
.../src/main/protobuf/MasterProcedure.proto | 2 +-
.../hadoop/hbase/replication/ReplicationUtils.java | 22 ++-
.../replication/RecoverStandbyProcedure.java | 8 +-
.../master/replication/RemovePeerProcedure.java | 5 +-
.../ReplaySyncReplicationWALManager.java | 93 ++++++------
.../TransitPeerSyncReplicationStateProcedure.java | 4 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 3 +-
.../regionserver/ReplicationSourceService.java | 6 +
.../hadoop/hbase/regionserver/SplitLogWorker.java | 168 ++++++++++++++-------
.../regionserver/wal/CombinedAsyncWriter.java | 80 +++-------
.../hbase/regionserver/wal/DualAsyncFSWAL.java | 3 +-
.../replication/regionserver/Replication.java | 5 +
.../regionserver/ReplicationSourceManager.java | 2 +-
.../SyncReplicationPeerInfoProviderImpl.java | 3 +-
.../hbase/wal/SyncReplicationWALProvider.java | 10 +-
.../client/replication/TestReplicationAdmin.java | 2 +-
.../regionserver/wal/TestCombinedAsyncWriter.java | 20 +--
.../TestSyncReplicationRemoveRemoteWAL.java | 7 +-
.../master/TestRecoverStandbyProcedure.java | 4 +-
.../regionserver/TestReplicationSourceManager.java | 5 +-
.../hbase/wal/TestSyncReplicationWALProvider.java | 1 -
21 files changed, 241 insertions(+), 212 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f15cb04..4c08277 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -467,7 +467,7 @@ enum RecoverStandbyState {
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
INIT_WORKERS = 2;
DISPATCH_TASKS = 3;
- REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
+ SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
}
message RecoverStandbyStateData {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 069db7a..fceb142 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -46,6 +46,12 @@ public final class ReplicationUtils {
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
+ public static final String SYNC_WAL_SUFFIX = ".syncrep";
+
+ public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
+
+ public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
+
private ReplicationUtils() {
}
@@ -187,14 +193,26 @@ public final class ReplicationUtils {
return new Path(remoteWALDir).getFileSystem(conf);
}
- public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
- public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
+ public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
/**
* Do the sleeping logic
* @param msg Why we sleep
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index e9e3a97..cb561ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure new ReplaySyncReplicationWALProcedure(peerId,
replaySyncReplicationWALManager.removeWALRootPath(wal)))
.toArray(ReplaySyncReplicationWALProcedure[]::new));
- setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
+ setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE;
- case REMOVE_SYNC_REPLICATION_WALS_DIR:
+ case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
+ replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 7335fe0..254448a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
}
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
- ReplaySyncReplicationWALManager remoteWALManager =
- env.getMasterServices().getReplaySyncReplicationWALManager();
- remoteWALManager.removePeerRemoteWALs(peerId);
- remoteWALManager.removePeerReplayWALDir(peerId);
+ env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index eac5aa4..6d457df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -17,20 +17,20 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.*;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -38,18 +38,16 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
@InterfaceAudience.Private
public class ReplaySyncReplicationWALManager {
private static final Logger LOG =
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
- private static final String REPLAY_SUFFIX = "-replay";
-
private final MasterServices services;
- private final Configuration conf;
-
private final FileSystem fs;
private final Path walRootDir;
@@ -60,69 +58,69 @@ public class ReplaySyncReplicationWALManager {
public ReplaySyncReplicationWALManager(MasterServices services) {
this.services = services;
- this.conf = services.getConfiguration();
this.fs = services.getMasterFileSystem().getWALFileSystem();
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
}
- public Path getPeerRemoteWALDir(String peerId) {
- return new Path(this.remoteWALDir, peerId);
- }
-
- private Path getPeerReplayWALDir(String peerId) {
- return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
- }
-
public void createPeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+ Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
throw new IOException("Unable to mkdir " + peerRemoteWALDir);
}
}
- public void renamePeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
- Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
- if (fs.exists(peerRemoteWALDir)) {
- if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
- throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
- + peerReplayWALDir + " for peer id=" + peerId);
+ private void rename(Path src, Path dst, String peerId) throws IOException {
+ if (fs.exists(src)) {
+ deleteDir(dst, peerId);
+ if (!fs.rename(src, dst)) {
+ throw new IOException("Failed dir from " + src + " to " + dst + " for peer id=" + peerId);
}
- LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
- peerId);
- } else if (!fs.exists(peerReplayWALDir)) {
- throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
- + peerReplayWALDir + " not exist for peer id=" + peerId);
+ LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+ } else if (!fs.exists(dst)) {
+ throw new IOException(
+ "Want to rename from " + src + " to " + dst + ", but they both do not exist");
}
}
+ public void renameToPeerReplayWALDir(String peerId) throws IOException {
+ rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+ rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
public List getReplayWALs(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
- List replayWals = new ArrayList<>();
- RemoteIterator iterator = fs.listFiles(peerReplayWALDir, false);
- while (iterator.hasNext()) {
- replayWals.add(iterator.next().getPath());
- }
- return replayWals;
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+ return Stream
+ .of(fs.listStatus(peerReplayWALDir,
+ p -> p.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)))
+ .map(fs -> fs.getPath()).collect(Collectors.toList());
}
- public void removePeerReplayWALDir(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+ public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
throw new IOException(
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
}
}
- public void removePeerRemoteWALs(String peerId) throws IOException {
- Path remoteWALDir = getPeerRemoteWALDir(peerId);
- if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
- throw new IOException(
- "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
+ private void deleteDir(Path dir, String peerId) throws IOException {
+ if (!fs.delete(dir, true) && fs.exists(dir)) {
+ throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
}
}
+ public void removePeerRemoteWALs(String peerId) throws IOException {
+ deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+ }
+
public void initPeerWorkers(String peerId) {
BlockingQueue servers = new LinkedBlockingQueue<>();
services.getServerManager().getOnlineServers().keySet()
@@ -144,4 +142,9 @@ public class ReplaySyncReplicationWALManager {
// remove the "/" too.
return pathStr.substring(walRootDir.toString().length() + 1);
}
+
+ @VisibleForTesting
+ public Path getRemoteWALDir() {
+ return remoteWALDir;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index ebe7a93..81ee6b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
if (toState == SyncReplicationState.ACTIVE) {
Path remoteWALDirForPeer =
- ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
+ ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
// check whether the remote wal directory is present
if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
.exists(remoteWALDirForPeer)) {
@@ -152,7 +152,7 @@ public class TransitPeerSyncReplicationStateProcedure
throws ProcedureYieldException, IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ab571c6..4831a5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1958,8 +1958,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
if (this.csm != null) {
// SplitLogWorker needs csm. If none, don't start this.
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
- this, walFactory);
+ this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
splitLogWorker.start();
} else {
LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 4529943..09ec477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
+
+ /**
+ * Return the replication peers.
+ */
+ ReplicationPeers getReplicationPeers();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 0046b67..264bf80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -22,22 +22,31 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -66,62 +75,116 @@ public class SplitLogWorker implements Runnable {
Thread worker;
// thread pool which executes recovery work
private SplitLogWorkerCoordination coordination;
- private Configuration conf;
private RegionServerServices server;
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
this.server = server;
- this.conf = conf;
this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
coordination.init(server, conf, splitTaskExecutor, this);
}
- public SplitLogWorker(final Server hserver, final Configuration conf,
- final RegionServerServices server, final LastSequenceId sequenceIdChecker,
- final WALFactory factory) {
- this(hserver, conf, server, new TaskExecutor() {
- @Override
- public Status exec(String filename, CancelableProgressable p) {
- Path walDir;
- FileSystem fs;
- try {
- walDir = FSUtils.getWALRootDir(conf);
- fs = walDir.getFileSystem(conf);
- } catch (IOException e) {
- LOG.warn("could not find root dir or fs", e);
- return Status.RESIGNED;
- }
- // TODO have to correctly figure out when log splitting has been
- // interrupted or has encountered a transient error and when it has
- // encountered a bad non-retry-able persistent error.
- try {
- if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
- fs, conf, p, sequenceIdChecker,
- server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
- return Status.PREEMPTED;
- }
- } catch (InterruptedIOException iioe) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
- return Status.RESIGNED;
- } catch (IOException e) {
- Throwable cause = e.getCause();
- if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
- || cause instanceof ConnectException
- || cause instanceof SocketTimeoutException)) {
- LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
- + "resigning", e);
- return Status.RESIGNED;
- } else if (cause instanceof InterruptedException) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
- return Status.RESIGNED;
- }
- LOG.warn("log splitting of " + filename + " failed, returning error", e);
- return Status.ERR;
- }
- return Status.DONE;
+ public SplitLogWorker(Configuration conf, RegionServerServices server,
+ LastSequenceId sequenceIdChecker, WALFactory factory) {
+ this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
+ }
+
+ // returns whether we need to continue the split work
+ private static boolean processSyncReplicationWAL(String filename, Configuration conf,
+ RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
+ Optional optSyncPeerId =
+ SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
+ if (!optSyncPeerId.isPresent()) {
+ return true;
+ }
+ String peerId = optSyncPeerId.get();
+ ReplicationPeerImpl peer =
+ server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
+ if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
+ return true;
+ }
+ Pair stateAndNewState =
+ peer.getSyncReplicationStateAndNewState();
+ if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
+ // copy the file to remote and overwrite the previous one
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+ Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
+ FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ try (FSDataInputStream in = fs.open(new Path(walDir, filename));
+ FSDataOutputStream out = remoteFs.create(tmpRemoteWAL, true)) {
+ IOUtils.copy(in, out);
+ }
+ if (!remoteFs.rename(tmpRemoteWAL, new Path(remoteWALDirForPeer, filename))) {
+ throw new IOException("Failed to copy " + filename + " to remote " + remoteWALDir);
+ }
+ } else if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY) ||
+ stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
+ // check whether we still need to process this file
+ // actually we only write wal file which name is ended with .syncrep in A state, and after
+ // transiting to a state other than A, we will reopen all the regions so the data in the wal
+ // will be flushed so the wal file will be archived soon. But it is still possible that there
+ // is a server crash when we are transiting from A to S, to simplify the logic of the transit
+ // procedure, here we will also check the remote snapshot directory in state S, so that we do
+ // not need wait until all the wal files with .syncrep suffix to be archived before finishing
+ // the procedure.
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
+ if (fs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
+ // the file has been replayed when the remote cluster was transited from S to DA, the
+ // content will be replicated back to us so give up split it.
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Status splitLog(String filename, CancelableProgressable p, Configuration conf,
+ RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
+ Path walDir;
+ FileSystem fs;
+ try {
+ walDir = FSUtils.getWALRootDir(conf);
+ fs = walDir.getFileSystem(conf);
+ } catch (IOException e) {
+ LOG.warn("could not find root dir or fs", e);
+ return Status.RESIGNED;
+ }
+ try {
+ processSyncReplicationWAL(filename, conf, server, fs, walDir);
+ } catch (IOException e) {
+ LOG.warn("failed to process sync replication wal {}", filename, e);
+ return Status.RESIGNED;
+ }
+ // TODO have to correctly figure out when log splitting has been
+ // interrupted or has encountered a transient error and when it has
+ // encountered a bad non-retry-able persistent error.
+ try {
+ if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
+ p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
+ factory)) {
+ return Status.PREEMPTED;
+ }
+ } catch (InterruptedIOException iioe) {
+ LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
+ return Status.RESIGNED;
+ } catch (IOException e) {
+ Throwable cause = e.getCause();
+ if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException ||
+ cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
+ LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " +
+ "resigning", e);
+ return Status.RESIGNED;
+ } else if (cause instanceof InterruptedException) {
+ LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
+ return Status.RESIGNED;
}
- });
+ LOG.warn("log splitting of " + filename + " failed, returning error", e);
+ return Status.ERR;
+ }
+ return Status.DONE;
}
@Override
@@ -185,6 +248,7 @@ public class SplitLogWorker implements Runnable {
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher
*/
+ @FunctionalInterface
public interface TaskExecutor {
enum Status {
DONE(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
index 8ecfede..ce73fd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
* An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
*/
@InterfaceAudience.Private
-public abstract class CombinedAsyncWriter implements AsyncWriter {
+public class CombinedAsyncWriter implements AsyncWriter {
private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
- protected final ImmutableList writers;
+ private final ImmutableList writers;
- protected CombinedAsyncWriter(ImmutableList writers) {
+ private CombinedAsyncWriter(ImmutableList writers) {
this.writers = writers;
}
@@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter {
}
}
- protected abstract void doSync(CompletableFuture future);
-
- @Override
- public CompletableFuture sync() {
- CompletableFuture future = new CompletableFuture<>();
- doSync(future);
- return future;
- }
-
@Override
public void append(Entry entry) {
writers.forEach(w -> w.append(entry));
}
- public enum Mode {
- SEQUENTIAL, PARALLEL
+ @Override
+ public CompletableFuture sync() {
+ CompletableFuture future = new CompletableFuture<>();
+ AtomicInteger remaining = new AtomicInteger(writers.size());
+ writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (remaining.decrementAndGet() == 0) {
+ future.complete(length);
+ }
+ }));
+ return future;
}
- public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
- ImmutableList ws =
- ImmutableList. builder().add(writer).add(writers).build();
- switch (mode) {
- case SEQUENTIAL:
- return new CombinedAsyncWriter(ws) {
-
- private void doSync(CompletableFuture future, Long length, int index) {
- if (index == writers.size()) {
- future.complete(length);
- return;
- }
- writers.get(index).sync().whenComplete((len, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- doSync(future, len, index + 1);
- });
- }
-
- @Override
- protected void doSync(CompletableFuture future) {
- doSync(future, null, 0);
- }
- };
- case PARALLEL:
- return new CombinedAsyncWriter(ws) {
-
- @Override
- protected void doSync(CompletableFuture future) {
- AtomicInteger remaining = new AtomicInteger(writers.size());
- writers.forEach(w -> w.sync().whenComplete((length, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (remaining.decrementAndGet() == 0) {
- future.complete(length);
- }
- }));
- }
- };
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
+ public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... writers) {
+ return new CombinedAsyncWriter(
+ ImmutableList. builder().add(writer).add(writers).build());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index a98567a..bc10eca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -66,8 +66,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
closeWriter(localWriter);
}
}
- return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
- localWriter);
+ return CombinedAsyncWriter.create(remoteWriter, localWriter);
}
// Allow temporarily skipping the creation of remote writer. When failing to write to the remote
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2199415..b04f0cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
return syncReplicationPeerInfoProvider;
}
+
+ @Override
+ public ReplicationPeers getReplicationPeers() {
+ return replicationPeers;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f25b073..827cfa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -652,7 +652,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals)
throws IOException {
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 75274ea..170441b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
return false;
}
Pair states =
- peer.getSyncReplicationStateAndNewState();
+ peer.getSyncReplicationStateAndNewState();
return checker.test(states.getFirst(), states.getSecond());
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8e82d8b..7d542c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -50,7 +50,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -67,9 +66,6 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
- @VisibleForTesting
- public static final String LOG_SUFFIX = ".syncrep";
-
private final WALProvider provider;
private SyncReplicationPeerInfoProvider peerInfoProvider =
@@ -129,9 +125,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
CommonFSUtils.getWALRootDir(conf),
- ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
+ ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
- conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+ conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX, eventLoopGroup, channelClass);
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@@ -304,7 +300,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
*
*/
public static Optional getSyncReplicationPeerIdFromWALName(String name) {
- if (!name.endsWith(LOG_SUFFIX)) {
+ if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
// fast path to return earlier if the name is not for a sync replication peer.
return Optional.empty();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c6ffeea..6462234 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -1141,7 +1141,7 @@ public class TestReplicationAdmin {
LOG.info("Expected error:", e);
}
TEST_UTIL.getTestFileSystem()
- .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
+ .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index 07aa6a8..f73b4f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,23 +37,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
+ HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter {
@Rule
public final TestName name = new TestName();
- @Parameter
- public CombinedAsyncWriter.Mode mode;
-
- @Parameters(name = "{index}: mode={0}")
- public static List