diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 555d8f2..3e83a79 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -223,6 +223,13 @@ public enum EventType {
* Master is processing recovery of regions found in ZK RIT
*/
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS),
+ /**
+ * Master controlled events to be executed on the master.
+ *
+ * M_LOG_REPLAY
+ * Master is processing log replay of a failed region server
+ */
+ M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS),
/**
* RS controlled events to be executed on the RS.
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index dd53657..ccea4ff 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -35,6 +35,7 @@ public enum ExecutorType {
MASTER_TABLE_OPERATIONS (4),
MASTER_RS_SHUTDOWN (5),
MASTER_META_SERVER_OPERATIONS (6),
+ M_LOG_REPLAY_OPS (7),
// RegionServer executor services
RS_OPEN_REGION (20),
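
For context on how the two new enum values connect: each EventType declares the ExecutorType it runs on, and the master's executor service dispatches a submitted handler to the pool registered for that type. A simplified sketch of that dispatch, assuming accessor names close to the real ones (getEventType, getExecutorServiceType, getExecutorName):

// Simplified dispatch sketch: an EventHandler tagged M_LOG_REPLAY lands on the
// pool registered for M_LOG_REPLAY_OPS. Field and method names are assumptions.
public void submit(final EventHandler eh) {
  ExecutorType type = eh.getEventType().getExecutorServiceType();
  Executor pool = this.executorMap.get(type.getExecutorName(this.servername));
  if (pool == null) {
    throw new RuntimeException("No executor registered for " + type);
  }
  pool.submit(eh);
}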
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
index f5835ac..13493ec 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.junit.After;
import org.junit.Before;
@@ -43,8 +44,18 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationT
@Before
public void setUp() throws Exception {
util= getTestingUtil(null);
+ Configuration conf = util.getConfiguration();
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 25);
+ if (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+ HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG)) {
+ // when distributedLogReplay is enabled, we need to make sure that rpc timeout & retries are
+ // small enough so that the replay can complete before ChaosMonkey kills another region
+ // server
+ conf.setInt("hbase.regionserver.handler.count", 20);
+ conf.setInt("hbase.log.replay.retries.number", 2);
+ conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
+ }
if(!util.isDistributedCluster()) {
- // In MiniCluster mode, we increase number of RS a little bit to speed the test
NUM_SLAVES_BASE = 5;
}
super.setUp(NUM_SLAVES_BASE);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5d4303b..8d12cfb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1096,9 +1096,11 @@ MasterServices, Server {
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
- conf.getInt("hbase.master.executor.serverops.threads", 3));
+ conf.getInt("hbase.master.executor.serverops.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
+ this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+ conf.getInt("hbase.master.executor.logreplayops.threads", 10));
// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 4771580..b3a0ae7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -319,21 +319,33 @@ public class MasterFileSystem {
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
- for (ServerName serverName: serverNames) {
- Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
- Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
- // Rename the directory so a rogue RS doesn't create more HLogs
- if (fs.exists(logDir)) {
- if (!this.fs.rename(logDir, splitDir)) {
- throw new IOException("Failed fs.rename for log split: " + logDir);
+ boolean needReleaseLock = false;
+ if (!this.services.isInitialized()) {
+ // during master initialization, we could have multiple places splitting the same WAL
+ this.splitLogLock.lock();
+ needReleaseLock = true;
+ }
+ try {
+ for (ServerName serverName : serverNames) {
+ Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
+ Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+ // Rename the directory so a rogue RS doesn't create more HLogs
+ if (fs.exists(logDir)) {
+ if (!this.fs.rename(logDir, splitDir)) {
+ throw new IOException("Failed fs.rename for log split: " + logDir);
+ }
+ logDir = splitDir;
+ LOG.debug("Renamed region directory: " + splitDir);
+ } else if (!fs.exists(splitDir)) {
+ LOG.info("Log dir for server " + serverName + " does not exist");
+ continue;
}
- logDir = splitDir;
- LOG.debug("Renamed region directory: " + splitDir);
- } else if (!fs.exists(splitDir)) {
- LOG.info("Log dir for server " + serverName + " does not exist");
- continue;
+ logDirs.add(splitDir);
+ }
+ } finally {
+ if (needReleaseLock) {
+ this.splitLogLock.unlock();
}
- logDirs.add(splitDir);
}
return logDirs;
}
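
The new locking in getLogDirs only takes splitLogLock while the master is still initializing, since that is the only window in which two code paths can race to rename the same WAL directory. A minimal standalone sketch of the same pattern, with hypothetical names:

import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the "lock only during initialization" pattern above.
// The lock and flag names mirror MasterFileSystem, but this is illustrative.
class LogDirCollector {
  private final ReentrantLock splitLogLock = new ReentrantLock();
  private volatile boolean initialized;

  void collect(Runnable renameLogDirs) {
    boolean needReleaseLock = false;
    if (!initialized) {
      // Before initialization finishes, several code paths may try to split
      // the same WAL, so serialize the rename/collect critical section.
      splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      renameLogDirs.run();
    } finally {
      if (needReleaseLock) {
        splitLogLock.unlock(); // never unlock a lock we did not take
      }
    }
  }
}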
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java
new file mode 100644
index 0000000..e51ad6c
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.DeadServer;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * Handle logReplay work from SSH. A separate handler is used so that SSH is not blocked while
+ * re-assigning regions from dead servers; otherwise, available SSH handlers could be tied up by
+ * logReplay work (from {@link MasterFileSystem#splitLog(ServerName)}). During logReplay, if a
+ * receiving RS (say A) fails again, regions on A cannot be assigned to another live RS, which
+ * prevents the log replay from completing, because WAL edit replay depends on the receiving RS
+ * being live.
+ */
+@InterfaceAudience.Private
+public class LogReplayHandler extends EventHandler {
+ private static final Log LOG = LogFactory.getLog(LogReplayHandler.class);
+ private final ServerName serverName;
+ protected final Server master;
+ protected final MasterServices services;
+ protected final DeadServer deadServers;
+
+ public LogReplayHandler(final Server server, final MasterServices services,
+ final DeadServer deadServers, final ServerName serverName) {
+ super(server, EventType.M_LOG_REPLAY);
+ this.master = server;
+ this.services = services;
+ this.deadServers = deadServers;
+ this.serverName = serverName;
+ this.deadServers.add(serverName);
+ }
+
+ @Override
+ public String toString() {
+ String name = serverName.toString();
+ return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
+ }
+
+ @Override
+ public void process() throws IOException {
+ try {
+ if (this.master != null && this.master.isStopped()) {
+ // we're exiting ...
+ return;
+ }
+ this.services.getMasterFileSystem().splitLog(serverName);
+ } catch (Exception ex) {
+ if (ex instanceof IOException) {
+ // resubmit log replay work when failed
+ this.services.getExecutorService().submit((LogReplayHandler) this);
+ this.deadServers.add(serverName);
+ throw new IOException("failed log replay for " + serverName + ", will retry", ex);
+ } else {
+ throw new IOException(ex);
+ }
+ } finally {
+ this.deadServers.finish(serverName);
+ }
+ // logReplay is the last step of SSH so log a line to indicate that
+ LOG.info("Finished processing of shutdown of " + serverName);
+ }
+}
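
A hedged sketch of the hand-off this handler enables; it mirrors the ServerShutdownHandler change in the next hunk, with the distributedLogReplay flag and surrounding fields assumed:

// Sketch: SSH submits replay work asynchronously instead of splitting inline,
// so its own handler thread is freed up for region re-assignment.
boolean hasLogReplayWork = false;
if (distributedLogReplay) {
  // LogReplayHandler re-adds the server to deadServers and resubmits itself on
  // IOException, giving replay its own retry loop independent of SSH.
  this.services.getExecutorService().submit(
      new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
  hasLogReplayWork = true;
} else {
  this.services.getMasterFileSystem().splitLog(serverName);
}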
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index b3a2685..d399ef6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -115,6 +115,7 @@ public class ServerShutdownHandler extends EventHandler {
@Override
public void process() throws IOException {
+ boolean hasLogReplayWork = false;
final ServerName serverName = this.serverName;
try {
@@ -288,7 +289,10 @@ public class ServerShutdownHandler extends EventHandler {
+ " didn't complete assignment in time");
}
}
- this.services.getMasterFileSystem().splitLog(serverName);
+ // submit logReplay work
+ this.services.getExecutorService().submit(
+ new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
+ hasLogReplayWork = true;
}
} catch (Exception ex) {
if (ex instanceof IOException) {
@@ -301,7 +305,9 @@ public class ServerShutdownHandler extends EventHandler {
this.deadServers.finish(serverName);
}
- LOG.info("Finished processing of shutdown of " + serverName);
+ if (!hasLogReplayWork) {
+ LOG.info("Finished processing of shutdown of " + serverName);
+ }
}
private void resubmit(final ServerName serverName, IOException ex) throws IOException {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f288d88..de0a66b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
@@ -1575,8 +1576,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// quite a while inside HConnection layer. The worker won't be available for other
// tasks even after current task is preempted after a split task times out.
Configuration sinkConf = HBaseConfiguration.create(conf);
- sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
+ sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
+ sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
splitLogWorker.start();
@@ -3976,11 +3979,21 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
case SUCCESS:
break;
}
+ if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+ // in replay mode, we only need to capture the first error because we will retry the whole
+ // batch when an error happens
+ break;
+ }
}
} catch (IOException ie) {
ActionResult result = ResponseConverter.buildActionResult(ie);
for (int i = 0; i < mutations.size(); i++) {
builder.setResult(i, result);
+ if (isReplay) {
+ // in replay mode, we only need to capture the first error because we will retry the whole
+ // batch when an error happens
+ break;
+ }
}
}
long after = EnvironmentEdgeManager.currentTimeMillis();
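
Both early breaks above exist because replay-mode errors are handled at batch granularity: the sink re-sends the entire chunk, so recording anything past the first failure is wasted work. A compact sketch of the pattern, with hypothetical names:

// Sketch: when the caller retries the whole batch on any failure (replay mode),
// stop at the first non-success status. Names here are hypothetical.
int firstFailed = -1;
for (int i = 0; i < statuses.length; i++) {
  if (statuses[i] != OperationStatusCode.SUCCESS) {
    firstFailed = i; // only the first error matters
    break;           // skip classifying the rest of the batch
  }
}
if (firstFailed >= 0) {
  throw new IOException("replay batch failed at index " + firstFailed);
}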
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a2676fb..5abca00 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -171,9 +171,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
this.watcher.registerListener(this);
// initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
- int res;
// wait for master to create the splitLogZnode
- res = -1;
+ int res = -1;
while (res == -1 && !exitWorker) {
try {
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
@@ -386,12 +385,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
case RESIGNED:
if (exitWorker) {
LOG.info("task execution interrupted because worker is exiting " + path);
- endTask(new SplitLogTask.Resigned(this.serverName),
- SplitLogCounters.tot_wkr_task_resigned);
- } else {
- SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
- LOG.info("task execution interrupted via zk by manager " + path);
}
+ endTask(new SplitLogTask.Resigned(this.serverName),
+ SplitLogCounters.tot_wkr_task_resigned);
break;
}
} finally {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index bbac87f..e037584 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -546,7 +546,6 @@ public class HLogSplitter {
lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) {
- lastFlushedSequenceId = -1L;
RegionStoreSequenceIds ids =
SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
if (ids != null) {
@@ -555,11 +554,10 @@ public class HLogSplitter {
} else if (sequenceIdChecker != null) {
lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
}
- if (lastFlushedSequenceId != null && lastFlushedSequenceId >= 0) {
- lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
- } else {
+ if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L;
}
+ lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
editsSkipped++;
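
The reworked block resolves the last flushed sequence id in a fixed order — the ZK-published RegionStoreSequenceIds when distributed log replay is on, else the sequenceIdChecker, else -1 — and now caches whichever value it lands on. A minimal sketch of that resolution, assuming a Map<String, Long> cache keyed the same way as lastFlushedSequenceIds:

// Sketch of the fallback order above; the two parameters stand in for the
// ZK lookup and sequenceIdChecker results. Names are illustrative.
Long resolveLastFlushedSequenceId(String key, Long fromZk, Long fromChecker,
    Map<String, Long> cache) {
  Long id = cache.get(key);
  if (id == null) {
    id = (fromZk != null) ? fromZk : fromChecker;
    if (id == null) {
      id = -1L; // nothing recorded: replay every edit for this region
    }
    cache.put(key, id); // the patch now caches the -1 default as well
  }
  return id;
}
// Any WAL entry with logSeqNum <= id was already flushed and is skipped.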
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 7434be3..37f8dcc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -60,6 +62,7 @@ import com.google.protobuf.ServiceException;
public class WALEditsReplaySink {
private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+ private static final int MAX_BATCH_SIZE = 3000;
private final Configuration conf;
private final HConnection conn;
@@ -67,6 +70,7 @@ public class WALEditsReplaySink {
private final MetricsWALEditsReplay metrics;
private final AtomicLong totalReplayedEdits = new AtomicLong();
private final boolean skipErrors;
+ private final int replayTimeout;
/**
* Create a sink for WAL log entries replay
@@ -83,6 +87,8 @@ public class WALEditsReplaySink {
this.tableName = tableName;
this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
+ // timeout for a single replay operation; defaults to 60 seconds
+ this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
}
/**
@@ -121,7 +127,18 @@ public class WALEditsReplaySink {
// replaying edits by region
for (HRegionInfo curRegion : actionsByRegion.keySet()) {
- replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
+ List<Action<Row>> allActions = actionsByRegion.get(curRegion);
+ // send edits in chunks
+ int totalActions = allActions.size();
+ int replayedActions = 0;
+ int curBatchSize = 0;
+ for (; replayedActions < totalActions;) {
+ curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
+ : (totalActions - replayedActions);
+ replayEdits(loc, curRegion, allActions.subList(replayedActions,
+ replayedActions + curBatchSize));
+ replayedActions += curBatchSize;
+ }
}
long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -173,7 +190,7 @@ public class WALEditsReplaySink {
ReplayServerCallable(final HConnection connection, final byte [] tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Action<Row>> actions) {
- super(connection, tableName, null);
+ super(connection, tableName, null, replayTimeout);
this.actions = actions;
this.regionInfo = regionInfo;
this.location = regionLoc;
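
To make the chunking arithmetic concrete: with MAX_BATCH_SIZE = 3000, a region carrying 7000 queued edits is replayed as sub-lists [0,3000), [3000,6000), [6000,7000), each sent through one ReplayServerCallable. A self-contained sketch of the same loop, outside the HBase classes:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the chunked-replay loop in replayEntries above.
public class ChunkedReplay {
  private static final int MAX_BATCH_SIZE = 3000; // matches the patch

  static void replayInChunks(List<Integer> allActions) {
    int totalActions = allActions.size();
    int replayedActions = 0;
    while (replayedActions < totalActions) {
      int curBatchSize = Math.min(MAX_BATCH_SIZE, totalActions - replayedActions);
      // subList is a view; the callable sends this chunk as one replay RPC
      List<Integer> chunk =
          allActions.subList(replayedActions, replayedActions + curBatchSize);
      System.out.println("replaying " + chunk.size() + " edits");
      replayedActions += curBatchSize;
    }
  }

  public static void main(String[] args) {
    replayInChunks(new ArrayList<>(Collections.nCopies(7000, 0)));
  }
}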