diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 555d8f2..3e83a79 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -223,6 +223,13 @@ public enum EventType {
    * Master is processing recovery of regions found in ZK RIT
    */
   M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS),
+  /**
+   * Master controlled events to be executed on the master.
+   *
+   * M_LOG_REPLAY
+   * Master is processing log replay of a failed region server
+   */
+  M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS),
 
   /**
    * RS controlled events to be executed on the RS.
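
Note: M_LOG_REPLAY (74) is bound to the new M_LOG_REPLAY_OPS executor added in the next hunk, so any handler constructed with this event type runs on the master's dedicated log-replay thread pool. A minimal sketch of the dispatch path, using only classes that appear in this patch (exact signatures illustrative, not authoritative):

    // submitting a handler tagged with EventType.M_LOG_REPLAY routes it to
    // the thread pool registered for ExecutorType.M_LOG_REPLAY_OPS
    EventHandler handler =
        new LogReplayHandler(server, services, deadServers, serverName);
    services.getExecutorService().submit(handler);
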
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index dd53657..ccea4ff 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -35,6 +35,7 @@ public enum ExecutorType {
   MASTER_TABLE_OPERATIONS (4),
   MASTER_RS_SHUTDOWN (5),
   MASTER_META_SERVER_OPERATIONS (6),
+  M_LOG_REPLAY_OPS (7),
 
   // RegionServer executor services
   RS_OPEN_REGION (20),
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
index f5835ac..13493ec 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
 import org.junit.After;
 import org.junit.Before;
@@ -43,8 +44,18 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationT
   @Before
   public void setUp() throws Exception {
     util= getTestingUtil(null);
+    Configuration conf = util.getConfiguration();
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 25);
+    if (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG)) {
+      // when distributedLogReplay is enabled, we need to make sure the rpc timeout and retries
+      // are small enough so that the replay can complete before ChaosMonkey kills another
+      // region server
+      conf.setInt("hbase.regionserver.handler.count", 20);
+      conf.setInt("hbase.log.replay.retries.number", 2);
+      conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
+    }
     if(!util.isDistributedCluster()) {
-      // In MiniCluster mode, we increase number of RS a little bit to speed the test
       NUM_SLAVES_BASE = 5;
     }
     super.setUp(NUM_SLAVES_BASE);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5d4303b..8d12cfb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1096,9 +1096,11 @@ MasterServices, Server {
     this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
       conf.getInt("hbase.master.executor.closeregion.threads", 5));
     this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
-      conf.getInt("hbase.master.executor.serverops.threads", 3));
+      conf.getInt("hbase.master.executor.serverops.threads", 5));
     this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.serverops.threads", 5));
+    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
 
     // We depend on there being only one instance of this executor running
     // at a time.  To do concurrency, would need fencing of enable/disable of
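
Note: the new pool is sized by "hbase.master.executor.logreplayops.threads" (default 10), and the serverops default grows from 3 to 5 since SSH handlers may now wait on replay work. A hedged tuning sketch using only configuration keys that this patch introduces or touches (values illustrative, not recommendations):

    Configuration conf = HBaseConfiguration.create();
    // allow more LogReplayHandler instances to run concurrently on the master
    conf.setInt("hbase.master.executor.logreplayops.threads", 16);
    // bound per-batch replay retries and rpc time on the region server side
    conf.setInt("hbase.log.replay.retries.number", 4);
    conf.setInt("hbase.log.replay.rpc.timeout", 30000); // milliseconds
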
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 4771580..b3a0ae7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -319,21 +319,33 @@ public class MasterFileSystem {
 
   private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
     List<Path> logDirs = new ArrayList<Path>();
-    for (ServerName serverName: serverNames) {
-      Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
-      Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
-      // Rename the directory so a rogue RS doesn't create more HLogs
-      if (fs.exists(logDir)) {
-        if (!this.fs.rename(logDir, splitDir)) {
-          throw new IOException("Failed fs.rename for log split: " + logDir);
+    boolean needReleaseLock = false;
+    if (!this.services.isInitialized()) {
+      // during master initialization, multiple places could try to split the same wal
+      this.splitLogLock.lock();
+      needReleaseLock = true;
+    }
+    try {
+      for (ServerName serverName : serverNames) {
+        Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
+        Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+        // Rename the directory so a rogue RS doesn't create more HLogs
+        if (fs.exists(logDir)) {
+          if (!this.fs.rename(logDir, splitDir)) {
+            throw new IOException("Failed fs.rename for log split: " + logDir);
+          }
+          logDir = splitDir;
+          LOG.debug("Renamed region directory: " + splitDir);
+        } else if (!fs.exists(splitDir)) {
+          LOG.info("Log dir for server " + serverName + " does not exist");
+          continue;
         }
-        logDir = splitDir;
-        LOG.debug("Renamed region directory: " + splitDir);
-      } else if (!fs.exists(splitDir)) {
-        LOG.info("Log dir for server " + serverName + " does not exist");
-        continue;
+        logDirs.add(splitDir);
+      }
+    } finally {
+      if (needReleaseLock) {
+        this.splitLogLock.unlock();
       }
-      logDirs.add(splitDir);
     }
     return logDirs;
   }
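
Note: getLogDirs() now serializes callers only while the master is still initializing, when several code paths can race to rename the same WAL directory. Untangled from the interleaved hunks above, this is the standard conditional lock/try-finally idiom (a reduced sketch, not patch code):

    boolean needReleaseLock = false;
    if (!services.isInitialized()) { // only contended during master startup
      splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      // rename each logDir to logDir + HLog.SPLITTING_EXT and collect the split dirs
    } finally {
      if (needReleaseLock) {
        splitLogLock.unlock(); // never unlock a lock this thread did not take
      }
    }
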
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.DeadServer;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * Handle the logReplay work from SSH. A separate handler is used so that SSH is not blocked
+ * while re-assigning regions from dead servers; otherwise, the available SSH handlers could all
+ * be tied up by logReplay work (from {@link MasterFileSystem#splitLog(ServerName)}). During
+ * logReplay, if a receiving RS (say A) fails again, regions on A can't be assigned to another
+ * live RS, which prevents the log replay from completing, because replaying WAL edits depends
+ * on the receiving RS being live.
+ */
+@InterfaceAudience.Private
+public class LogReplayHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(LogReplayHandler.class);
+  private final ServerName serverName;
+  protected final Server master;
+  protected final MasterServices services;
+  protected final DeadServer deadServers;
+
+  public LogReplayHandler(final Server server, final MasterServices services,
+      final DeadServer deadServers, final ServerName serverName) {
+    super(server, EventType.M_LOG_REPLAY);
+    this.master = server;
+    this.services = services;
+    this.deadServers = deadServers;
+    this.serverName = serverName;
+    this.deadServers.add(serverName);
+  }
+
+  @Override
+  public String toString() {
+    String name = serverName.toString();
+    return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
+  }
+
+  @Override
+  public void process() throws IOException {
+    try {
+      if (this.master != null && this.master.isStopped()) {
+        // we're exiting ...
+        return;
+      }
+      this.services.getMasterFileSystem().splitLog(serverName);
+    } catch (Exception ex) {
+      if (ex instanceof IOException) {
+        // resubmit log replay work when failed
+        this.services.getExecutorService().submit(this);
+        this.deadServers.add(serverName);
+        throw new IOException("failed log replay for " + serverName + ", will retry", ex);
+      } else {
+        throw new IOException(ex);
+      }
+    } finally {
+      this.deadServers.finish(serverName);
+    }
+    // logReplay is the last step of SSH, so log a line here to mark completion of the shutdown
+    LOG.info("Finished processing of shutdown of " + serverName);
+  }
+}
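
Note: the retry story for LogReplayHandler lives entirely in process(): the constructor marks the server dead, the finally block always calls finish(), and on an IOException the handler resubmits itself and re-adds the server before rethrowing, so each failed attempt schedules a fresh run. Reduced to its skeleton (a sketch of the pattern, assuming the executor keeps accepting resubmissions):

    public void process() throws IOException {
      try {
        services.getMasterFileSystem().splitLog(serverName); // may fail
      } catch (IOException ex) {
        // resubmit a fresh attempt; re-add the server since the finally
        // block below will call finish() for this failed attempt
        services.getExecutorService().submit(this);
        deadServers.add(serverName);
        throw ex; // surface the failure so it gets logged
      } finally {
        deadServers.finish(serverName);
      }
    }
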
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index b3a2685..d399ef6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -115,6 +115,7 @@ public class ServerShutdownHandler extends EventHandler {
 
   @Override
   public void process() throws IOException {
+    boolean hasLogReplayWork = false;
     final ServerName serverName = this.serverName;
     try {
 
@@ -288,7 +289,10 @@ public class ServerShutdownHandler extends EventHandler {
               + " didn't complete assignment in time");
         }
       }
-      this.services.getMasterFileSystem().splitLog(serverName);
+      // submit logReplay work
+      this.services.getExecutorService().submit(
+        new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
+      hasLogReplayWork = true;
     }
 
   } catch (Exception ex) {
@@ -301,7 +305,9 @@ public class ServerShutdownHandler extends EventHandler {
       this.deadServers.finish(serverName);
     }
 
-    LOG.info("Finished processing of shutdown of " + serverName);
+    if (!hasLogReplayWork) {
+      LOG.info("Finished processing of shutdown of " + serverName);
+    }
   }
 
   private void resubmit(final ServerName serverName, IOException ex) throws IOException {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f288d88..de0a66b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
@@ -1575,8 +1576,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     // quite a while inside HConnection layer. The worker won't be available for other
     // tasks even after current task is preempted after a split task times out.
     Configuration sinkConf = HBaseConfiguration.create(conf);
-    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
+    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
+    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
     this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
     splitLogWorker.start();
@@ -3976,11 +3979,21 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           case SUCCESS:
             break;
         }
+        if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+          // in replay mode, we only need to capture the first error because we will retry the
+          // whole batch when an error happens
+          break;
+        }
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
       for (int i = 0; i < mutations.size(); i++) {
         builder.setResult(i, result);
+        if (isReplay) {
+          // in replay mode, we only need to capture the first error because we will retry the
+          // whole batch when an error happens
+          break;
+        }
       }
     }
     long after = EnvironmentEdgeManager.currentTimeMillis();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a2676fb..5abca00 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -171,9 +171,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
     this.watcher.registerListener(this);
     // initialize a new connection for splitlogworker configuration
     HConnectionManager.getConnection(conf);
-    int res;
     // wait for master to create the splitLogZnode
-    res = -1;
+    int res = -1;
     while (res == -1 && !exitWorker) {
       try {
         res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
@@ -386,12 +385,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
       case RESIGNED:
         if (exitWorker) {
           LOG.info("task execution interrupted because worker is exiting " + path);
-          endTask(new SplitLogTask.Resigned(this.serverName),
-            SplitLogCounters.tot_wkr_task_resigned);
-        } else {
-          SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-          LOG.info("task execution interrupted via zk by manager " + path);
         }
+        endTask(new SplitLogTask.Resigned(this.serverName),
+          SplitLogCounters.tot_wkr_task_resigned);
         break;
       }
     } finally {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index bbac87f..e037584 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -546,7 +546,6 @@ public class HLogSplitter {
       lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
       if (lastFlushedSequenceId == null) {
         if (this.distributedLogReplay) {
-          lastFlushedSequenceId = -1L;
           RegionStoreSequenceIds ids =
             SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
           if (ids != null) {
@@ -555,11 +554,10 @@ public class HLogSplitter {
         } else if (sequenceIdChecker != null) {
           lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
         }
-        if (lastFlushedSequenceId != null && lastFlushedSequenceId >= 0) {
-          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
-        } else {
+        if (lastFlushedSequenceId == null) {
           lastFlushedSequenceId = -1L;
         }
+        lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
       }
       if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
         editsSkipped++;
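
Note: the two HLogSplitter hunks interleave removals and additions, which obscures the resulting control flow. Linearized, the sequence-id lookup reads as follows (a sketch; the assignment from ids sits in the unchanged gap between the hunks and is paraphrased here):

    Long lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
    if (lastFlushedSequenceId == null) {
      if (this.distributedLogReplay) {
        // ask ZK for the last sequence id the failed server flushed for this region
        RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(
            this.watcher, failedServerName, key);
        if (ids != null) {
          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
        }
      } else if (sequenceIdChecker != null) {
        lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
      }
      if (lastFlushedSequenceId == null) {
        lastFlushedSequenceId = -1L; // unknown: do not skip any edits
      }
      // cache the answer (including the -1 sentinel) for later entries
      lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
    }
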
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 7434be3..37f8dcc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.ServerCallable;
 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -60,6 +62,7 @@ import com.google.protobuf.ServiceException;
 public class WALEditsReplaySink {
 
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+  private static final int MAX_BATCH_SIZE = 3000;
 
   private final Configuration conf;
   private final HConnection conn;
@@ -67,6 +70,7 @@ public class WALEditsReplaySink {
   private final MetricsWALEditsReplay metrics;
   private final AtomicLong totalReplayedEdits = new AtomicLong();
   private final boolean skipErrors;
+  private final int replayTimeout;
 
   /**
    * Create a sink for WAL log entries replay
@@ -83,6 +87,8 @@ public class WALEditsReplaySink {
     this.tableName = tableName;
     this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
       HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
+    // timeout for a single replay operation; defaults to 60 seconds
+    this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
   }
 
   /**
@@ -121,7 +127,18 @@ public class WALEditsReplaySink {
 
     // replaying edits by region
     for (HRegionInfo curRegion : actionsByRegion.keySet()) {
-      replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
+      List<Action<Row>> allActions = actionsByRegion.get(curRegion);
+      // send edits in chunks
+      int totalActions = allActions.size();
+      int replayedActions = 0;
+      int curBatchSize = 0;
+      for (; replayedActions < totalActions;) {
+        curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
+            : (totalActions - replayedActions);
+        replayEdits(loc, curRegion, allActions.subList(replayedActions,
+          replayedActions + curBatchSize));
+        replayedActions += curBatchSize;
+      }
     }
 
     long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -173,7 +190,7 @@ public class WALEditsReplaySink {
     ReplayServerCallable(final HConnection connection, final byte [] tableName,
         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
         final List<Action<Row>> actions) {
-      super(connection, tableName, null);
+      super(connection, tableName, null, replayTimeout);
       this.actions = actions;
       this.regionInfo = regionInfo;
       this.location = regionLoc;
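
Note: the replay path now sends at most MAX_BATCH_SIZE (3000) actions per replay RPC so a single oversized call cannot outlive replayTimeout. The chunking loop above is equivalent to this simpler stepped form (a sketch against the same allActions/replayEdits names from the patch):

    for (int from = 0; from < allActions.size(); from += MAX_BATCH_SIZE) {
      int to = Math.min(from + MAX_BATCH_SIZE, allActions.size());
      // each subList view becomes one bounded replay RPC for the region
      replayEdits(loc, curRegion, allActions.subList(from, to));
    }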