diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index f34cf2e..99f7ee4 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -76,6 +76,8 @@ public enum EventType { M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing shutdown of a RS M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.). M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing recovery of regions found in ZK RIT + // Master is processing log replay of failed region server + M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS), // RS controlled events to be executed on the RS RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index dd53657..ccea4ff 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -35,6 +35,7 @@ public enum ExecutorType { MASTER_TABLE_OPERATIONS (4), MASTER_RS_SHUTDOWN (5), MASTER_META_SERVER_OPERATIONS (6), + M_LOG_REPLAY_OPS (7), // RegionServer executor services RS_OPEN_REGION (20), diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index fe73c44..9a452fd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1072,9 +1072,11 @@ MasterServices, Server { 
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 3)); + conf.getInt("hbase.master.executor.serverops.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); + this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, + conf.getInt("hbase.master.executor.logreplayops.threads", 15)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java new file mode 100644 index 0000000..e51ad6c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java @@ -0,0 +1,88 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.master.handler; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.master.DeadServer; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; + +/** + * Handles logReplay work on behalf of SSH. A separate handler is used so that SSH is not blocked + * while re-assigning regions from dead servers; otherwise, the available SSH handlers could be + * blocked by logReplay work (from {@link MasterFileSystem#splitLog(ServerName)}). In that case, + * if a receiving RS (say A) fails during logReplay, regions on A cannot be assigned to another + * live RS, so log replay cannot finish: replaying WAL edits needs the receiving RS to be live. + */ +@InterfaceAudience.Private +public class LogReplayHandler extends EventHandler { + private static final Log LOG = LogFactory.getLog(LogReplayHandler.class); + private final ServerName serverName; + protected final Server master; + protected final MasterServices services; + protected final DeadServer deadServers; + + public LogReplayHandler(final Server server, final MasterServices services, + final DeadServer deadServers, final ServerName serverName) { + super(server, EventType.M_LOG_REPLAY); + this.master = server; + this.services = services; + this.deadServers = deadServers; + this.serverName = serverName; + this.deadServers.add(serverName); + } + + @Override + public String toString() { + String name = serverName.toString(); + return getClass().getSimpleName() + "-" + name + "-" + getSeqid(); + } + + @Override + public void process() throws IOException { + try { + if (this.master != null &&
this.master.isStopped()) { + // we're exiting ... + return; + } + this.services.getMasterFileSystem().splitLog(serverName); + } catch (Exception ex) { + if (ex instanceof IOException) { + // resubmit log replay work when failed + this.services.getExecutorService().submit((LogReplayHandler) this); + this.deadServers.add(serverName); + throw new IOException("failed log replay for " + serverName + ", will retry", ex); + } else { + throw new IOException(ex); + } + } finally { + this.deadServers.finish(serverName); + } + // logReplay is the last step of SSH so log a line to indicate that + LOG.info("Finished processing of shutdown of " + serverName); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index b3a2685..d399ef6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -115,6 +115,7 @@ public class ServerShutdownHandler extends EventHandler { @Override public void process() throws IOException { + boolean hasLogReplayWork = false; final ServerName serverName = this.serverName; try { @@ -288,7 +289,10 @@ public class ServerShutdownHandler extends EventHandler { + " didn't complete assignment in time"); } } - this.services.getMasterFileSystem().splitLog(serverName); + // submit logReplay work + this.services.getExecutorService().submit( + new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName)); + hasLogReplayWork = true; } } catch (Exception ex) { if (ex instanceof IOException) { @@ -301,7 +305,9 @@ public class ServerShutdownHandler extends EventHandler { this.deadServers.finish(serverName); } - LOG.info("Finished processing of shutdown of " + serverName); + if (!hasLogReplayWork) { + LOG.info("Finished processing of shutdown of " + 
serverName); + } } private void resubmit(final ServerName serverName, IOException ex) throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f1f09d1..a131415 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1572,6 +1572,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2); + sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2); sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); splitLogWorker.start();