position with pb magic prefix
+ * prepended suitable for use as content of an hlog position in a
+ * replication queue.
+ */
+ public static byte[] toByteArray(
+ final long position) {
+ byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
+ .build().toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
+ /**
+ * @param lockOwner
+ * @return Serialized protobuf of lockOwner with pb magic prefix
+ * prepended suitable for use as content of a replication lock during
+ * region server failover.
+ */
+ static byte[] lockToByteArray(
+ final String lockOwner) {
+ byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
+ .toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
+ /**
* @param bytes Content of a peer znode.
* @return ClusterKey parsed from the passed bytes.
* @throws DeserializationException
@@ -476,6 +502,58 @@ public class ReplicationZookeeper implements Closeable {
}
}
+ /**
+ * @param bytes - Content of an HLog position znode.
+ * @return long - The current HLog position.
+ * @throws DeserializationException
+ */
+ public static long parseHLogPositionFrom(
+ final byte[] bytes) throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
+ .newBuilder();
+ ZooKeeperProtos.ReplicationHLogPosition position;
+ try {
+ position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ return position.getPosition();
+ } else {
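+ // non-PB content: legacy znodes store the position as a raw 8-byte long, or are empty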
+ if (bytes.length > 0) {
+ return Bytes.toLong(bytes);
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * @param bytes - Content of a lock znode.
+ * @return String - The owner of the lock.
+ * @throws DeserializationException
+ */
+ static String parseLockOwnerFrom(
+ final byte[] bytes) throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
+ .newBuilder();
+ ZooKeeperProtos.ReplicationLock lock;
+ try {
+ lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ return lock.getLockOwner();
+ } else {
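+ // non-PB content: legacy znodes store the lock owner as a raw string, or are empty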
+ if (bytes.length > 0) {
+ return Bytes.toString(bytes);
+ }
+ return "";
+ }
+ }
+
private boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index f6c1f14..5b85dbd 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -105,6 +105,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String balancerZNode;
// znode containing the lock for the tables
public String tableLockZNode;
+ // znode containing the state of recovering regions
+ public String recoveringRegionsZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL>
- * Check .META. are assigned. If not,
- * assign them.
+ * Check .META. is assigned. If not, assign it.
+ * @param status MonitoredTask
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
- * @return True if meta is healthy, assigned
*/
- boolean assignMeta(MonitoredTask status)
- throws InterruptedException, IOException, KeeperException {
+ private void assignMeta(MonitoredTask status)
+ throws InterruptedException, IOException, KeeperException {
+ // Work on meta region
int assigned = 0;
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
+ boolean beingExpired = false;
- // Work on .META. region. Is it in zk in transition?
status.setStatus("Assigning META region");
- assignmentManager.getRegionStates().createRegionState(
- HRegionInfo.FIRST_META_REGIONINFO);
- boolean rit = this.assignmentManager.
- processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
- ServerName currentMetaServer = null;
- boolean metaRegionLocation = catalogTracker.verifyMetaRegionLocation(timeout);
+
+ assignmentManager.getRegionStates().createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
+ boolean rit = this.assignmentManager
+ .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
+ boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
if (!rit && !metaRegionLocation) {
- currentMetaServer = this.catalogTracker.getMetaLocation();
- splitLogAndExpireIfOnline(currentMetaServer);
- this.assignmentManager.assignMeta();
- enableSSHandWaitForMeta();
+ ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
+ if (currentMetaServer != null) {
+ beingExpired = expireIfOnline(currentMetaServer);
+ }
+ if (beingExpired) {
+ if (this.distributedLogReplay) {
+ Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
+ * @return true if sn is being expired by the function.
* @throws IOException
*/
- private void splitLogAndExpireIfOnline(final ServerName sn)
+ private boolean expireIfOnline(final ServerName sn)
throws IOException {
if (sn == null || !serverManager.isServerOnline(sn)) {
- return;
+ return false;
}
- LOG.info("Forcing splitLog and expire of " + sn);
- fileSystemManager.splitMetaLog(sn);
- fileSystemManager.splitLog(sn);
+ LOG.info("Forcing expire of " + sn);
serverManager.expireServer(sn);
+ return true;
}
@Override
@@ -2211,6 +2235,14 @@ Server {
return this.serverShutdownHandlerEnabled;
}
+ /**
+ * Report whether this master has started initialization and is about to do meta region assignment.
+ * @return true if the master is initializing and is about to assign the META region
+ */
+ public boolean isInitializationStartsMetaRegoinAssignment() {
+ return this.initializationBeforeMetaAssignment;
+ }
+
@Override
public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
throws ServiceException {
@@ -2654,4 +2686,5 @@ Server {
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
}
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3c58a35..4451183 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,26 +36,29 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
@@ -83,6 +88,7 @@ public class MasterFileSystem {
private final Path tempdir;
// create the split log lock
final Lock splitLogLock = new ReentrantLock();
+ final boolean distributedLogReplay;
final boolean distributedLogSplitting;
final SplitLogManager splitLogManager;
private final MasterServices services;
@@ -118,15 +124,13 @@ public class MasterFileSystem {
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
- this.distributedLogSplitting =
- conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+ this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
+ master, services, master.getServerName());
+ this.distributedLogSplitting = conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
if (this.distributedLogSplitting) {
- this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
- master.getConfiguration(), master, services, master.getServerName());
this.splitLogManager.finishInitialization(masterRecovery);
- } else {
- this.splitLogManager = null;
}
+ this.distributedLogReplay = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
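+ // when enabled, WAL edits of a failed server are replayed directly to the newly assigned regions instead of going through recovered.edits files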
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
@@ -212,21 +216,23 @@ public class MasterFileSystem {
}
/**
- * Inspect the log directory to recover any log file without
- * an active region server.
+ * Inspect the log directory to find dead servers which need recovery work
+ * @return A set of ServerNames which aren't running but still have WAL files left in the filesystem
*/
- void splitLogAfterStartup() {
+ Set<ServerName> getFailedServersFromLogFolders() {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
- HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+ HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+
+ Set<ServerName> serverNames = new HashSet<ServerName>();
Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
+
do {
if (master.isStopped()) {
- LOG.warn("Master stopped while splitting logs");
+ LOG.warn("Master stopped while trying to get failed servers.");
break;
}
- List<ServerName> serverNames = new ArrayList<ServerName>();
try {
- if (!this.fs.exists(logsDirPath)) return;
+ if (!this.fs.exists(logsDirPath)) return serverNames;
FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
// Get online servers after getting log folders to avoid log folder deletion of newly
// checked in region servers . see HBASE-5916
@@ -235,7 +241,7 @@ public class MasterFileSystem {
if (logFolders == null || logFolders.length == 0) {
LOG.debug("No log files to split, proceeding...");
- return;
+ return serverNames;
}
for (FileStatus status : logFolders) {
String sn = status.getPath().getName();
@@ -249,23 +255,19 @@ public class MasterFileSystem {
+ "to a known region server, splitting");
serverNames.add(serverName);
} else {
- LOG.info("Log folder " + status.getPath()
- + " belongs to an existing region server");
+ LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
}
}
- splitLog(serverNames, META_FILTER);
- splitLog(serverNames, NON_META_FILTER);
retrySplitting = false;
} catch (IOException ioe) {
- LOG.warn("Failed splitting of " + serverNames, ioe);
+ LOG.warn("Failed getting failed servers to be recovered.", ioe);
if (!checkFileSystem()) {
LOG.warn("Bad Filesystem, exiting");
Runtime.getRuntime().halt(1);
}
try {
if (retrySplitting) {
- Thread.sleep(conf.getInt(
- "hbase.hlog.split.failure.retry.interval", 30 * 1000));
+ Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
}
} catch (InterruptedException e) {
LOG.warn("Interrupted, aborting since cannot return w/o splitting");
@@ -275,10 +277,12 @@ public class MasterFileSystem {
}
}
} while (retrySplitting);
+
+ return serverNames;
}
public void splitLog(final ServerName serverName) throws IOException {
- List<ServerName> serverNames = new ArrayList<ServerName>();
+ Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
splitLog(serverNames);
}
@@ -290,7 +294,7 @@ public class MasterFileSystem {
*/
public void splitMetaLog(final ServerName serverName) throws IOException {
long splitTime = 0, splitLogSize = 0;
- List<ServerName> serverNames = new ArrayList<ServerName>();
+ Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
List<Path> logDirs = getLogDirs(serverNames);
if (logDirs.isEmpty()) {
@@ -299,14 +303,14 @@ public class MasterFileSystem {
}
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
- splitLogSize = splitLogManager.splitLogDistributed(logDirs, META_FILTER);
+ splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, META_FILTER);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
if (this.metricsMaster != null) {
this.metricsMaster.addSplit(splitTime, splitLogSize);
}
}
- private List<Path> getLogDirs(final List<ServerName> serverNames) throws IOException {
+ private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
for (ServerName serverName: serverNames) {
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
@@ -327,7 +331,55 @@ public class MasterFileSystem {
return logDirs;
}
- public void splitLog(final List<ServerName> serverNames) throws IOException {
+ /**
+ * Mark regions as recovering when distributedLogReplay is set to true
+ * @param serverNames Set of ServerNames whose WALs will be replayed in order to recover the
+ * changes contained in them
+ * @throws IOException
+ */
+ public void prepareDistributedLogReplay(Set<ServerName> serverNames) throws IOException {
+ if (!this.distributedLogReplay) {
+ return;
+ }
+ this.splitLogManager.prepareDistributedLogReplay(serverNames);
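+ // the servers are tracked as in-flight work items so that recovering-region znodes are not cleaned up before their replay tasks are installed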
+ // mark regions in recovering state
+ for (ServerName serverName : serverNames) {
+ NavigableMap<HRegionInfo, Result> regions = this.getServerUserRegions(serverName);
+ if (regions == null) {
+ continue;
+ }
+ try {
+ this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet());
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Mark meta regions as recovering when distributedLogReplay is set to true. This method is used
+ * when {@link #getServerUserRegions(ServerName)} can't be used because the meta RS may be down.
+ * @param serverName
+ * @param regions
+ * @throws IOException
+ */
+ public void prepareMetaLogReplay(ServerName serverName, Set<HRegionInfo> regions)
+ throws IOException {
+ if (!this.distributedLogReplay || (regions == null)) {
+ return;
+ }
+ Set<ServerName> tmpServerNames = new HashSet<ServerName>();
+ tmpServerNames.add(serverName);
+ this.splitLogManager.prepareDistributedLogReplay(tmpServerNames);
+ // mark regions in recovering state
+ try {
+ this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void splitLog(final Set<ServerName> serverNames) throws IOException {
splitLog(serverNames, NON_META_FILTER);
}
@@ -338,7 +390,7 @@ public class MasterFileSystem {
* @param filter
* @throws IOException
*/
- public void splitLog(final List<ServerName> serverNames, PathFilter filter) throws IOException {
+ public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = getLogDirs(serverNames);
@@ -350,7 +402,7 @@ public class MasterFileSystem {
if (distributedLogSplitting) {
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
- splitLogSize = splitLogManager.splitLogDistributed(logDirs,filter);
+ splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
} else {
for(Path logDir: logDirs){
@@ -648,4 +700,18 @@ public class MasterFileSystem {
this.services.getTableDescriptors().add(htd);
return htd;
}
+
+ private NavigableMap<HRegionInfo, Result> getServerUserRegions(ServerName serverName)
+ throws IOException {
+ if (!this.master.isStopped()) {
+ try {
+ this.master.getCatalogTracker().waitForMeta();
+ return MetaReader.getServerUserRegions(this.master.getCatalogTracker(), serverName);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted", e);
+ }
+ }
+ return null;
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 2f5d02b..3563d6c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -27,9 +27,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -37,19 +37,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
-import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
+import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
+import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -154,21 +154,21 @@ public class ServerManager {
private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
/**
- * Set of region servers which are dead and submitted to ServerShutdownHandler to
- * process but not fully processed immediately.
+ * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
+ * fully processed immediately.
*
- * If one server died before assignment manager finished the failover cleanup, the server
- * will be added to this set and will be processed through calling
+ * If one server died before assignment manager finished the failover cleanup, the server will be
+ * added to this set and will be processed through calling
* {@link ServerManager#processQueuedDeadServers()} by assignment manager.
*
- * For all the region servers in this set, HLog split is already completed.
+ * The Boolean value indicates whether log split is needed inside ServerShutdownHandler
*
- * ServerShutdownHandler processes a dead server submitted to the handler after
- * the handler is enabled. It may not be able to complete the processing because meta
- * is not yet online or master is currently in startup mode. In this case, the dead
- * server will be parked in this set temporarily.
+ * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
+ * enabled. It may not be able to complete the processing because meta is not yet online or master
+ * is currently in startup mode. In this case, the dead server will be parked in this set
+ * temporarily.
*/
- private Set<ServerName> requeuedDeadServers = new HashSet<ServerName>();
+ private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
/**
* Constructor.
@@ -497,6 +497,10 @@ public class ServerManager {
}
public synchronized void processDeadServer(final ServerName serverName) {
+ this.processDeadServer(serverName, false);
+ }
+
+ public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
// When assignment manager is cleaning up the zookeeper nodes and rebuilding the
// in-memory region states, region servers could be down. Meta table can and
// should be re-assigned, log splitting can be done too. However, it is better to
@@ -506,13 +510,14 @@ public class ServerManager {
// the handler threads and meta table could not be re-assigned in case
// the corresponding server is down. So we queue them up here instead.
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
- requeuedDeadServers.add(serverName);
+ requeuedDeadServers.put(serverName, shouldSplitHlog);
return;
}
this.deadservers.add(serverName);
- this.services.getExecutorService().submit(new ServerShutdownHandler(
- this.master, this.services, this.deadservers, serverName, false));
+ this.services.getExecutorService().submit(
+ new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
+ shouldSplitHlog));
}
/**
@@ -525,18 +530,20 @@ public class ServerManager {
}
Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
while (serverIterator.hasNext()) {
- expireServer(serverIterator.next());
+ ServerName tmpServerName = serverIterator.next();
+ expireServer(tmpServerName);
serverIterator.remove();
+ requeuedDeadServers.remove(tmpServerName);
}
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
LOG.info("AssignmentManager hasn't finished failover cleanup");
}
- serverIterator = requeuedDeadServers.iterator();
- while (serverIterator.hasNext()) {
- processDeadServer(serverIterator.next());
- serverIterator.remove();
+
+ for(ServerName tmpServerName : requeuedDeadServers.keySet()){
+ processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
}
+ requeuedDeadServers.clear();
}
/*
@@ -822,6 +829,14 @@ public class ServerManager {
return new HashSet<ServerName>(this.queuedDeadServers);
}
+ /**
+ * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
+ * splitlog need flag.
+ */
+ Map<ServerName, Boolean> getRequeuedDeadServers() {
+ return Collections.unmodifiableMap(this.requeuedDeadServers);
+ }
+
public boolean isServerOnline(ServerName serverName) {
return serverName != null && onlineServers.containsKey(serverName);
}
@@ -835,7 +850,7 @@ public class ServerManager {
public synchronized boolean isServerDead(ServerName serverName) {
return serverName == null || deadservers.isDeadServer(serverName)
|| queuedDeadServers.contains(serverName)
- || requeuedDeadServers.contains(serverName);
+ || requeuedDeadServers.containsKey(serverName);
}
public void shutdownCluster() {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index aa0b507..f5bee8f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,16 +44,20 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
@@ -119,6 +124,17 @@ public class SplitLogManager extends ZooKeeperListener {
private long lastNodeCreateTime = Long.MAX_VALUE;
public boolean ignoreZKDeleteForTesting = false;
+ /**
+ * In distributedLogReplay mode, we need to touch both the splitlog and recovering-regions znodes
+ * in one operation, so this lock is used to guard such cases.
+ */
+ protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
+
+ private final Set<ServerName> inflightWorkItems = Collections
+ .synchronizedSet(new HashSet<ServerName>());
+
+ final boolean distributedLogReplay;
+
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
@@ -186,6 +202,7 @@ public class SplitLogManager extends ZooKeeperListener {
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
+ this.distributedLogReplay = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
}
public void finishInitialization(boolean masterRecovery) {
@@ -245,7 +262,22 @@ public class SplitLogManager extends ZooKeeperListener {
* @return cumulative size of the logfiles split
*/
public long splitLogDistributed(final List<Path> logDirs) throws IOException {
- return splitLogDistributed(logDirs, null);
+ if (logDirs.isEmpty()) {
+ return 0;
+ }
+ Set<ServerName> serverNames = new HashSet<ServerName>();
+ for (Path logDir : logDirs) {
+ try {
+ ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
+ if (serverName != null) {
+ serverNames.add(serverName);
+ }
+ } catch (IllegalArgumentException e) {
+ // ignore invalid format error.
+ LOG.warn("Cannot parse server name from " + logDir);
+ }
+ }
+ return splitLogDistributed(serverNames, logDirs, null);
}
/**
@@ -259,8 +291,8 @@ public class SplitLogManager extends ZooKeeperListener {
* @throws IOException If there was an error while splitting any log file
* @return cumulative size of the logfiles split
*/
- public long splitLogDistributed(final List<Path> logDirs, PathFilter filter)
- throws IOException {
+ public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
+ PathFilter filter) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDirs);
FileStatus[] logfiles = getFileList(logDirs, filter);
@@ -283,7 +315,11 @@ public class SplitLogManager extends ZooKeeperListener {
throw new IOException("duplicate log split scheduled for " + lf.getPath());
}
}
+ this.clearLogReplayInFlightWorkItems(serverNames);
waitForSplittingCompletion(batch, status);
+ // mark recovering regions up
+ this.removeRecoveringRegionsFromZK(serverNames);
+
if (batch.done != batch.installed) {
batch.isDead = true;
SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
@@ -410,6 +446,89 @@ public class SplitLogManager extends ZooKeeperListener {
return count;
}
+ /**
+ * Removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
+ * region server hosting the region can allow reads on the recovered region
+ * @param serverNames servers which are just recovered
+ */
+ private void removeRecoveringRegionsFromZK(final Set<ServerName> serverNames) {
+
+ if (!this.distributedLogReplay) {
+ // the function is only used in WALEdit direct replay mode
+ return;
+ }
+
+ int count = 0;
+ Set<String> recoveredServerNameSet = new HashSet<String>();
+ if (serverNames != null) {
+ for (ServerName tmpServerName : serverNames) {
+ recoveredServerNameSet.add(tmpServerName.getServerName());
+ }
+ }
+
+ try {
+ this.recoveringRegionLock.lock();
+
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null) {
+ for (String t : tasks) {
+ if (!ZKSplitLog.isRescanNode(watcher, t)) {
+ count++;
+ }
+ }
+ }
+ if (count == 0 && this.inflightWorkItems.isEmpty()) {
+ // no splitting work items left
+ deleteRecoveringRegionZNodes(null);
+ } else if (!recoveredServerNameSet.isEmpty()) {
+ // remove recovering regions which don't have any failed RS associated with them
+ List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+ if (regions != null) {
+ for (String region : regions) {
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+ List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
+ if (failedServers == null || failedServers.isEmpty()) {
+ ZKUtil.deleteNode(watcher, nodePath);
+ LOG.debug("znode " + nodePath + " is deleted.");
+ return;
+ }
+ if (recoveredServerNameSet.containsAll(failedServers)) {
+ ZKUtil.deleteNodeRecursively(watcher, nodePath);
+ LOG.debug("znode " + nodePath + " with its children are deleted.");
+ } else {
+ for (String failedServer : failedServers) {
+ if (recoveredServerNameSet.contains(failedServer)) {
+ String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
+ ZKUtil.deleteNode(watcher, tmpPath);
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (KeeperException ke) {
+ LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception", ke);
+ } finally {
+ this.recoveringRegionLock.unlock();
+ }
+ }
+
+ private void deleteRecoveringRegionZNodes(List<String> regions) {
+ try {
+ if (regions == null) {
+ // remove all children under /hbase/recovering-regions
+ ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
+ } else {
+ for (String curRegion : regions) {
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
+ ZKUtil.deleteNodeRecursively(watcher, nodePath);
+ }
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
+ }
+ }
+
private void setDone(String path, TerminationStatus status) {
Task task = tasks.get(path);
if (task == null) {
@@ -860,9 +979,113 @@ public class SplitLogManager extends ZooKeeperListener {
}
/**
- * Keeps track of the batch of tasks submitted together by a caller in
- * splitLogDistributed(). Clients threads use this object to wait for all
- * their tasks to be done.
+ * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
+ * all regions of the passed-in region server
+ * @param serverName the name of a region server
+ * @param userRegions user regions assigned to the region server
+ */
+ void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
+ throws KeeperException {
+ if (userRegions == null || !this.distributedLogReplay) {
+ return;
+ }
+
+ for (HRegionInfo region : userRegions) {
+ String regionEncodeName = region.getEncodedName();
+ long retries = this.zkretries;
+
+ do {
+ String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
+ long lastRecordedFlushedSequenceId = -1;
+ try {
+ long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
+ regionEncodeName.getBytes());
+
+ /*
+ * znode layout:
+ * .../region_id[last known flushed sequence id]/failed server[last known
+ * flushed sequence id for the server]
+ */
+ byte[] data = ZKUtil.getData(this.watcher, nodePath);
+ if(data == null) {
+ ZKUtil.createSetData(this.watcher, nodePath,
+ ReplicationZookeeper.toByteArray(lastSequenceId));
+ } else {
+ lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+ if (lastRecordedFlushedSequenceId < lastSequenceId) {
+ // update last flushed sequence id in the region level
+ ZKUtil.setData(this.watcher, nodePath,
+ ReplicationZookeeper.toByteArray(lastSequenceId));
+ }
+ }
+ // go one level deeper with server name
+ nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
+ if (lastSequenceId <= lastRecordedFlushedSequenceId) {
+ // the newly assigned RS failed even before any flush to the region
+ lastSequenceId = -1;
+ }
+ ZKUtil.createSetData(this.watcher, nodePath,
+ ReplicationZookeeper.toByteArray(lastSequenceId));
+
+ // break retry loop
+ break;
+ } catch (KeeperException e) {
+ // ignore ZooKeeper exceptions inside retry loop
+ if (retries <= 1) {
+ throw e;
+ }
+ // wait a little bit for retry
+ try {
+ Thread.sleep(20);
+ } catch (Exception ignoreE) {
+ // ignore
+ }
+ }
+ } while ((--retries) > 0 && (!this.stopper.isStopped()));
+ }
+ }
+
+ /**
+ * This function guards against the situation where a job has been submitted but not yet processed
+ * when we start to open recovering regions. Otherwise, we could prematurely open regions that are
+ * just marked as recovering.
+ */
+ public void prepareDistributedLogReplay(Set<ServerName> serverNames) {
+ if (this.distributedLogReplay && serverNames != null) {
+ // only track in-flight work items in WALEdits replay mode.
+ try {
+ this.recoveringRegionLock.lock();
+ this.inflightWorkItems.addAll(serverNames);
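+ // entries are removed again in clearLogReplayInFlightWorkItems() once the corresponding split tasks are installed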
+ } finally {
+ this.recoveringRegionLock.unlock();
+ }
+ }
+ }
+
+ public void clearLogReplayInFlightWorkItems(Set<ServerName> serverNames) {
+ if (this.distributedLogReplay && serverNames != null) {
+ this.inflightWorkItems.removeAll(serverNames);
+ }
+ }
+
+ /**
+ * @param bytes - Content of a failed region server or recovering region znode.
+ * @return long - The last flushed sequence Id for the region server
+ */
+ public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
+ long lastRecordedFlushedSequenceId = -1l;
+ try {
+ lastRecordedFlushedSequenceId = ReplicationZookeeper.parseHLogPositionFrom(bytes);
+ } catch (DeserializationException e) {
+ lastRecordedFlushedSequenceId = -1l;
+ LOG.warn("Can't parse last flushed sequence Id", e);
+ }
+ return lastRecordedFlushedSequenceId;
+ }
+
+ /**
+ * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
+ * Clients threads use this object to wait for all their tasks to be done.
*
* All access is synchronized.
*/
@@ -945,18 +1168,14 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.info("dead splitlog worker " + workerName);
}
- void handleDeadWorkers(List<ServerName> serverNames) {
- List<ServerName> workerNames = new ArrayList<ServerName>(serverNames.size());
- for (ServerName serverName : serverNames) {
- workerNames.add(serverName);
- }
+ void handleDeadWorkers(Set<ServerName> serverNames) {
synchronized (deadWorkersLock) {
if (deadWorkers == null) {
deadWorkers = new HashSet<ServerName>(100);
}
- deadWorkers.addAll(workerNames);
+ deadWorkers.addAll(serverNames);
}
- LOG.info("dead splitlog workers " + workerNames);
+ LOG.info("dead splitlog workers " + serverNames);
}
/**
@@ -1053,6 +1272,11 @@ public class SplitLogManager extends ZooKeeperListener {
}
failedDeletions.removeAll(tmpPaths);
}
+
+ // Garbage collect left-over /hbase/recovering-regions/... znode
+ if (tot == 0 && inflightWorkItems.size() == 0 && tasks.size() == 0) {
+ removeRecoveringRegionsFromZK(null);
+ }
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
index 2956047..7070e7d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.zookeeper.KeeperException;
@@ -47,30 +50,53 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
@Override
public void process() throws IOException {
boolean gotException = true;
- try{
- try {
- LOG.info("Splitting META logs for " + serverName);
- if (this.shouldSplitHlog) {
+ try {
+ LOG.info("Splitting META logs for " + serverName);
+ AssignmentManager am = this.services.getAssignmentManager();
+ if (this.shouldSplitHlog) {
+ if(this.distributedLogReplay) {
+ Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
+ if (isCarryingMeta()) {
+ regions.add(HRegionInfo.FIRST_META_REGIONINFO);
+ }
+ this.services.getMasterFileSystem().prepareMetaLogReplay(serverName, regions);
+ } else {
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
- } catch (IOException ioe) {
- this.services.getExecutorService().submit(this);
- this.deadServers.add(serverName);
- throw new IOException("failed log splitting for " +
- serverName + ", will retry", ioe);
}
// Assign meta if we were carrying it.
// Check again: region may be assigned to other where because of RIT
// timeout
- if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
+ if (am.isCarryingMeta(serverName)) {
LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
- this.services.getAssignmentManager().regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
+ am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
verifyAndAssignMetaWithRetries();
} else {
LOG.info("META has been assigned to otherwhere, skip assigning.");
}
-
+
+ try {
+ if (this.shouldSplitHlog && this.distributedLogReplay) {
+ if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
+ regionAssignmentWaitTimeout)) {
+ throw new IOException("Region " + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()
+ + " didn't complete assignment in time");
+ }
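+ // META is open again (in recovering state), so splitting its log now replays the edits directly into the new META region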
+ this.services.getMasterFileSystem().splitMetaLog(serverName);
+ }
+ } catch (Exception ex) {
+ if (ex instanceof IOException) {
+ // typecast to SSH so that we make sure that it is the SSH instance that
+ // gets submitted as opposed to MSSH or some other derived instance of SSH
+ this.services.getExecutorService().submit((ServerShutdownHandler) this);
+ this.deadServers.add(serverName);
+ throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
+ } else {
+ throw new IOException(ex);
+ }
+ }
+
gotException = false;
} finally {
if (gotException){
@@ -78,6 +104,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
this.deadServers.finish(serverName);
}
}
+
super.process();
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 2705d90..b0979f1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -56,6 +59,8 @@ public class ServerShutdownHandler extends EventHandler {
protected final MasterServices services;
protected final DeadServer deadServers;
protected final boolean shouldSplitHlog; // whether to split HLog or not
+ protected final boolean distributedLogReplay;
+ protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
@@ -76,6 +81,10 @@ public class ServerShutdownHandler extends EventHandler {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitHlog = shouldSplitHlog;
+ this.distributedLogReplay =
+ server.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
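+ // upper bound on how long to wait for a recovering region to be reassigned before its WAL edits are replayed (distributed log replay mode only)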
+ this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
+ HConstants.REGION_ASSIGNMENT_TIME_OUT, 15000);
}
@Override
@@ -107,21 +116,7 @@ public class ServerShutdownHandler extends EventHandler {
public void process() throws IOException {
final ServerName serverName = this.serverName;
try {
- try {
- if (this.shouldSplitHlog) {
- LOG.info("Splitting logs for " + serverName);
- this.services.getMasterFileSystem().splitLog(serverName);
- } else {
- LOG.info("Skipping log splitting for " + serverName);
- }
- } catch (IOException ioe) {
- //typecast to SSH so that we make sure that it is the SSH instance that
- //gets submitted as opposed to MSSH or some other derived instance of SSH
- this.services.getExecutorService().submit((ServerShutdownHandler)this);
- this.deadServers.add(serverName);
- throw new IOException("failed log splitting for " +
- serverName + ", will retry", ioe);
- }
+
// We don't want worker thread in the MetaServerShutdownHandler
// executor pool to block by waiting availability of .META.
// Otherwise, it could run into the following issue:
@@ -145,7 +140,7 @@ public class ServerShutdownHandler extends EventHandler {
// the dead server for further processing too.
if (isCarryingMeta() // .META.
|| !services.getAssignmentManager().isFailoverCleanupDone()) {
- this.services.getServerManager().processDeadServer(serverName);
+ this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
return;
}
@@ -183,6 +178,27 @@ public class ServerShutdownHandler extends EventHandler {
throw new IOException("Server is stopped");
}
+ try {
+ if (this.shouldSplitHlog) {
+ LOG.info("Splitting logs for " + serverName + " before assignment.");
+ if(this.distributedLogReplay){
+ Set<ServerName> serverNames = new HashSet<ServerName>();
+ serverNames.add(serverName);
+ this.services.getMasterFileSystem().prepareDistributedLogReplay(serverNames);
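+ // regions of the dead server are only marked as recovering here; the actual WAL replay is triggered after they are reassigned below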
+ } else {
+ this.services.getMasterFileSystem().splitLog(serverName);
+ }
+ } else {
+ LOG.info("Skipping log splitting for " + serverName);
+ }
+ } catch (IOException ioe) {
+ // typecast to SSH so that we make sure that it is the SSH instance that
+ // gets submitted as opposed to MSSH or some other derived instance of SSH
+ this.services.getExecutorService().submit((ServerShutdownHandler) this);
+ this.deadServers.add(serverName);
+ throw new IOException("failed log splitting for " + serverName + ", will retry", ioe);
+ }
+
// Clean out anything in regions in transition. Being conservative and
// doing after log splitting. Could do some states before -- OPENING?
// OFFLINE? -- and then others after like CLOSING that depend on log
@@ -258,15 +274,40 @@ public class ServerShutdownHandler extends EventHandler {
}
}
}
+
try {
am.assign(toAssignRegions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);
}
+
+ try {
+ if (this.shouldSplitHlog && this.distributedLogReplay) {
+ // wait for region assignment completes
+ for (HRegionInfo hri : toAssignRegions) {
+ if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
+ throw new IOException("Region " + hri.getEncodedName()
+ + " didn't complete assignment in time");
+ }
+ }
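+ // all recovering regions are open again, so splitting the logs now replays their edits directly to those regions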
+ this.services.getMasterFileSystem().splitLog(serverName);
+ }
+ } catch (Exception ex) {
+ if (ex instanceof IOException) {
+ // typecast to SSH so that we make sure that it is the SSH instance that
+ // gets submitted as opposed to MSSH or some other derived instance of SSH
+ this.services.getExecutorService().submit((ServerShutdownHandler) this);
+ this.deadServers.add(serverName);
+ throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
+ } else {
+ throw new IOException(ex);
+ }
+ }
} finally {
this.deadServers.finish(serverName);
}
+
LOG.info("Finished processing of shutdown of " + serverName);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b70143d..19a182f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -206,6 +208,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -258,6 +261,10 @@ public class HRegionServer implements ClientProtocol,
// catalog tracker
protected CatalogTracker catalogTracker;
+ // Watch if a region is out of recovering state from ZooKeeper
+ @SuppressWarnings("unused")
+ private RecoveringRegionWatcher recoveringRegionWatcher;
+
/**
* Go here to get table descriptors.
*/
@@ -280,6 +287,13 @@ public class HRegionServer implements ClientProtocol,
protected final Map<String, HRegion> onlineRegions =
new ConcurrentHashMap<String, HRegion>();
+ /**
+ * Set of regions currently in recovering state, which means they can accept writes (edits from a
+ * previously failed region server) but not reads. A recovering region is also an online region.
+ */
+ protected final Set<String> recoveringRegions = Collections
+ .synchronizedSet(new HashSet<String>());
+
// Leases
protected Leases leases;
@@ -440,6 +454,9 @@ public class HRegionServer implements ClientProtocol,
/** Handle all the snapshot requests to this server */
RegionServerSnapshotManager snapshotManager;
+
+ // configuration setting on whether to replay WAL edits directly to another RS
+ private final boolean distributedLogReplay;
// Table level lock manager for locking for region operations
private TableLockManager tableLockManager;
@@ -534,6 +551,8 @@ public class HRegionServer implements ClientProtocol,
}
};
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+
+ this.distributedLogReplay = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
}
/**
@@ -644,6 +663,9 @@ public class HRegionServer implements ClientProtocol,
}
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
new ServerName(isa.getHostName(), isa.getPort(), startcode));
+
+ // register watcher for recovering regions
+ this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
/**
@@ -1448,8 +1470,7 @@ public class HRegionServer implements ClientProtocol,
this.rpcServer.start();
// Create the log splitting worker and start it
- this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
- this.getConfiguration(), this.getServerName(), this);
+ this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
splitLogWorker.start();
}
@@ -1810,17 +1831,43 @@ public class HRegionServer implements ClientProtocol,
}
@Override
- public long getLastSequenceId(byte[] region) {
+ public long getLastSequenceId(String regionServerName, byte[] region) {
Long lastFlushedSequenceId = -1l;
- try {
- GetLastFlushedSequenceIdRequest req =
- RequestConverter.buildGetLastFlushedSequenceIdRequest(region);
- lastFlushedSequenceId = hbaseMaster.getLastFlushedSequenceId(null, req)
- .getLastFlushedSequenceId();
- } catch (ServiceException e) {
- lastFlushedSequenceId = -1l;
- LOG.warn("Unable to connect to the master to check " +
- "the last flushed sequence id", e);
+
+ if (!this.distributedLogReplay) {
+ try {
+ GetLastFlushedSequenceIdRequest req = RequestConverter
+ .buildGetLastFlushedSequenceIdRequest(region);
+ lastFlushedSequenceId = hbaseMaster.getLastFlushedSequenceId(null, req)
+ .getLastFlushedSequenceId();
+ } catch (ServiceException e) {
+ lastFlushedSequenceId = -1l;
+ LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
+ }
+ } else {
+ if (regionServerName.isEmpty()) {
+ return lastFlushedSequenceId;
+ }
+ // When SplitLogWorker recovers a region by directly replaying unflushed WAL edits, the
+ // last flushed sequence Id changes as the newly assigned RS flushes writes to the region.
+ // If that newly assigned RS fails again (a chained RS failure scenario), the last flushed
+ // sequence Id is only valid for that particular RS instance and changes again when yet
+ // another newly assigned RS flushes the region.
+ // Therefore, in this mode we need to fetch last sequence Ids from ZK, where we keep a
+ // history of the last flushed sequence Id for each failed RS instance.
+ String encodedRegionName = Bytes.toString(region);
+ String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, encodedRegionName);
+ nodePath = ZKUtil.joinZNode(nodePath, regionServerName);
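+ // i.e. /hbase/recovering-regions/<encodedRegionName>/<failedServerName>, whose data is the last flushed sequence id recorded for that server instance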
+ try {
+ byte[] data = ZKUtil.getData(getZooKeeper(), nodePath);
+ if (data != null) {
+ lastFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Cannot get lastFlushedSequenceId from ZooKeeper for server=" + regionServerName
+ + "; region=" + encodedRegionName, e);
+ }
}
return lastFlushedSequenceId;
}
@@ -1894,6 +1941,10 @@ public class HRegionServer implements ClientProtocol,
return this.stopping;
}
+ public Set<String> getRecoveringRegions() {
+ return this.recoveringRegions;
+ }
+
/**
*
* @return the configuration
@@ -2575,10 +2626,15 @@ public class HRegionServer implements ClientProtocol,
try {
requestCount.increment();
HRegion region = getRegion(request.getRegion());
+
+ // check if current region is in recovering phase
+ checkRegionIsInRecovering(region);
+
GetResponse.Builder builder = GetResponse.newBuilder();
ClientProtos.Get get = request.getGet();
Boolean existence = null;
Result r = null;
+
if (request.getClosestRowBefore()) {
if (get.getColumnCount() != 1) {
throw new DoNotRetryIOException(
@@ -2924,6 +2980,9 @@ public class HRegionServer implements ClientProtocol,
}
if (!done) {
+ // check if current region is in recovering phase
+ this.checkRegionIsInRecovering(region);
+
long maxResultSize = scanner.getMaxResultSize();
if (maxResultSize <= 0) {
maxResultSize = maxScannerResultSize;
@@ -3375,6 +3434,10 @@ public class HRegionServer implements ClientProtocol,
removeFromMovedRegions(region.getEncodedName());
if (previous == null) {
+ // check if the region to be opened is marked in recovering state in ZK
+ if (isRegionMarkedRecoveringInZK(region.getEncodedName())) {
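+ // reads to this region will be rejected until the recovering znode is removed and the region is taken off this set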
+ this.recoveringRegions.add(region.getEncodedName());
+ }
// If there is no action in progress, we can submit a specific handler.
// Need to pass the expected version in the constructor.
if (region.isMetaRegion()) {
@@ -3388,6 +3451,9 @@ public class HRegionServer implements ClientProtocol,
builder.addOpeningState(RegionOpeningState.OPENED);
+ } catch (KeeperException zooKeeperEx) {
+ LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
+ throw new ServiceException(zooKeeperEx);
} catch (IOException ie) {
LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
if (isBulkAssign) {
@@ -3998,4 +4064,37 @@ public class HRegionServer implements ClientProtocol,
public CompactSplitThread getCompactSplitThread() {
return this.compactSplitThread;
}
+
+ /**
+ * Check if /hbase/recovering-regions/<regionEncodedName> exists. Returns true if it exists
+ * and sets a watcher as well.
+ * @param regionEncodedName region encoded name
+ * @return true when /hbase/recovering-regions/<regionEncodedName> exists
+ * @throws KeeperException
+ */
+ private boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException {
+ boolean result = false;
+ String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, regionEncodedName);
+
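+ // getDataAndWatch also leaves a watch on the znode so this region server is notified when the node is removed at the end of recovery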
+ byte[] node = ZKUtil.getDataAndWatch(this.zooKeeper, nodePath);
+ if (node != null) {
+ result = true;
+ }
+
+ return result;
+ }
+
+ /**
+ * Throws RegionInRecoveryException when a region is in recovering state, to reject read requests
+ * @param region
+ * @throws RegionInRecoveryException
+ */
+ private void checkRegionIsInRecovering(HRegion region) throws RegionInRecoveryException {
+ // check if current region is in recovering phase
+ if (!this.recoveringRegions.isEmpty()
+ && this.recoveringRegions.contains(region.getRegionInfo().getEncodedName())) {
+ throw new RegionInRecoveryException(region.getRegionInfo().getRegionNameAsString()
+ + " is recovering");
+ }
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
index 9c5aac6..9fc3b1b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
@@ -26,8 +26,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface LastSequenceId {
/**
- * @param regionname
- * @return Last flushed sequence Id for regionname
+ * @param regionServerName Server name which previously served the region identified by regionName
+ * @param regionName Encoded region name
+ * @return Last flushed sequence Id for regionName or -1 if it can't be determined
*/
- public long getLastSequenceId(byte[] regionname);
+ public long getLastSequenceId(String regionServerName, byte[] regionName);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 8ebfd1a..49bf4fa 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -29,10 +31,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -70,6 +72,7 @@ import org.apache.zookeeper.data.Stat;
@InterfaceAudience.Private
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
+ private static final int checkInterval = 10000; // 10 seconds
Thread worker;
private final ServerName serverName;
@@ -83,20 +86,30 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
private final int report_period;
+ private HRegionServer server = null;
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
- ServerName serverName, TaskExecutor splitTaskExecutor) {
+ HRegionServer server, TaskExecutor splitTaskExecutor) {
+ super(watcher);
+ this.server = server;
+ this.serverName = server.getServerName();
+ this.splitTaskExecutor = splitTaskExecutor;
+ report_period = conf.getInt("hbase.splitlog.report.period",
+ conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
+ }
+
+ public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
+ TaskExecutor splitTaskExecutor) {
super(watcher);
this.serverName = serverName;
this.splitTaskExecutor = splitTaskExecutor;
report_period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout",
- SplitLogManager.DEFAULT_TIMEOUT) / 2);
+ conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
}
- public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
- final ServerName serverName, final LastSequenceId sequenceIdChecker) {
- this(watcher, conf, serverName, new TaskExecutor () {
+ public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, HRegionServer server,
+ final LastSequenceId sequenceIdChecker) {
+ this(watcher, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, CancelableProgressable p) {
Path rootdir;
@@ -204,7 +217,36 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq) {
try {
- taskReadyLock.wait();
+ taskReadyLock.wait(checkInterval);
+ if (paths.isEmpty() && this.server != null) {
+ // check to see if we have stale recovering regions in our internal memory state
+ Set recoveringRegions = this.server.getRecoveringRegions();
+ if (!recoveringRegions.isEmpty()) {
+ // Make a local copy to prevent ConcurrentModificationException when other threads
+ // modify recoveringRegions
+ List tmpCopy = new ArrayList(recoveringRegions);
+ for (String region : tmpCopy) {
+ String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
+ try {
+ if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
+ recoveringRegions.remove(region);
+ LOG.debug("Mark recovering region:" + region + " up.");
+ } else {
+ // This check is a defensive (and normally redundant) mechanism that keeps our
+ // internal RS memory state from holding stale recovering regions when
+ // zookeeper (the source of truth) says otherwise. We stop at the first region
+ // that is still marked recovering because in the normal case there should be
+ // none at all, so checking the first one is good enough.
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore zookeeper error
+ LOG.debug("Got a zookeeper when trying to open a recovering region", e);
+ break;
+ }
+ }
+ }
+ }
} catch (InterruptedException e) {
LOG.info("SplitLogWorker interrupted while waiting for task," +
" exiting: " + e.toString() + (exitWorker ? "" :
@@ -214,6 +256,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
}
}
+
}
}
@@ -463,9 +506,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
}
-
-
-
@Override
public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time attemptToOwnTask()
@@ -510,7 +550,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
return childrenPaths;
}
-
@Override
public void nodeChildrenChanged(String path) {
if(path.equals(watcher.splitLogZNode)) {
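
The loop above prunes stale entries by probing /hbase/recovering-regions in ZooKeeper. A sketch of just that probe, pulled out as a standalone helper; it assumes the watcher's recoveringRegionsZNode is already set up as in the ZooKeeperWatcher change, and the helper class itself is not part of the patch.

    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.zookeeper.KeeperException;

    // Sketch only: the existence probe performed for each region still listed as recovering.
    class RecoveringRegionProbe {
      static boolean isStillRecovering(ZooKeeperWatcher watcher, String encodedRegionName)
          throws KeeperException {
        String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, encodedRegionName);
        // checkExists returns the znode version (>= 0) when present, -1 when absent
        return ZKUtil.checkExists(watcher, nodePath) != -1;
      }
    }
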
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 032e2cf..3f6dbb3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -34,10 +34,13 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -51,8 +54,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -115,6 +125,20 @@ public class HLogSplitter {
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
+ final boolean distributedLogReplay;
+
+ /**
+ * Thread pool used to construct the HTable instances that replay WAL edits to the newly
+ * assigned region servers. The regions those WAL edits belong to are in recovering mode
+ * and can't accept reads.
+ */
+ private final ExecutorService threadPoolForLogReplay;
+
+ // Number of writer threads
+ private final int numWriterThreads;
+
+ // Minimum batch size when replaying WAL edits
+ private final int minBatchSize;
+
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
* hbase.hlog.splitter.impl property to derived the instance
@@ -171,7 +195,24 @@ public class HLogSplitter {
entryBuffers = new EntryBuffers(
conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
- outputSink = new OutputSink();
+
+ this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
+ this.distributedLogReplay = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+
+ if (this.distributedLogReplay) {
+ this.numWriterThreads = conf.getInt("hbase.regionserver.wal.logreplay.writer.threads", 3);
+ this.threadPoolForLogReplay = new ThreadPoolExecutor(this.numWriterThreads, conf.getInt(
+ "hbase.htable.threads.max", Integer.MAX_VALUE), conf.getLong(
+ "hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
+ new SynchronousQueue(), Threads.newDaemonThreadFactory("hbase-recover"));
+ ((ThreadPoolExecutor) this.threadPoolForLogReplay).allowCoreThreadTimeOut(true);
+
+ outputSink = new LogReplayOutputSink(numWriterThreads, this.threadPoolForLogReplay);
+ } else {
+ this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ outputSink = new LogSplittingOutputSink(numWriterThreads);
+ this.threadPoolForLogReplay = null;
+ }
}
/**
@@ -260,26 +301,26 @@ public class HLogSplitter {
}
/**
- * Splits the HLog edits in the given list of logfiles (that are a mix of edits
- * on multiple regions) by region and then splits them per region directories,
- * in batches of (hbase.hlog.split.batch.size)
+ * Splits or replays the HLog edits in the given list of logfiles (that are a mix of edits on
+ * multiple regions) by region, then either splits them into per-region directories or, when
+ * distributedLogReplay is true, replays them, in batches.
*
- * This process is split into multiple threads. In the main thread, we loop
- * through the logs to be split. For each log, we:
+ * This process is split into multiple threads. In the main thread, we loop through the logs to be
+ * split. For each log, we:
*
- * - Recover it (take and drop HDFS lease) to ensure no other process can write
- * - Read each edit (see {@link #parseHLog}
- * - Mark as "processed" or "corrupt" depending on outcome
+ * - Recover it (take and drop HDFS lease) to ensure no other process can write
+ * - Read each edit (see {@link #parseHLog}
+ * - Mark as "processed" or "corrupt" depending on outcome
*
*
- * Each edit is passed into the EntryBuffers instance, which takes care of
- * memory accounting and splitting the edits by region.
+ * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
+ * splitting the edits by region.
*
- * The OutputSink object then manages N other WriterThreads which pull chunks
- * of edits from EntryBuffers and write them to the output region directories.
+ * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
+ * EntryBuffers and either write them to recovered.edits files or replay them directly to the
+ * newly assigned region servers.
*
- * After the process is complete, the log files are archived to a separate
- * directory.
+ * After the process is complete, the log files are archived to a separate directory.
*/
private List splitLog(final FileStatus[] logfiles, CountDownLatch latch)
throws IOException {
@@ -452,18 +493,19 @@ public class HLogSplitter {
outputSinkStarted = true;
// Report progress every so many edits and/or files opened (opening a file
// takes a bit of time).
- Map lastFlushedSequenceIds =
- new TreeMap(Bytes.BYTES_COMPARATOR);
+ Map lastFlushedSequenceIds = new TreeMap();
Entry entry;
-
- while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+ Long lastFlushedSequenceId = -1L;
+ ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
+ String serverNameStr = (serverName == null) ? "" : serverName.getServerName();
+ while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
- Long lastFlushedSequenceId = -1l;
+ String key = Bytes.toString(region);
if (sequenceIdChecker != null) {
- lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
+ lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
if (lastFlushedSequenceId == null) {
- lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
- lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+ lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(serverNameStr, region);
+ lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
}
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -472,12 +514,13 @@ public class HLogSplitter {
}
entryBuffers.appendEntry(entry);
editsCount++;
+ int moreWritersFromLastCheck = outputSink.getNumOpenWriters() - numOpenedFilesLastCheck;
// If sufficient edits have passed, check if we should report progress.
if (editsCount % interval == 0
- || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
- numOpenedFilesLastCheck = outputSink.logWriters.size();
- String countsStr = (editsCount - editsSkipped) +
- " edits, skipped " + editsSkipped + " edits.";
+ || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
+ numOpenedFilesLastCheck = outputSink.getNumOpenWriters();
+ String countsStr = (editsCount - editsSkipped) + " edits, skipped " + editsSkipped
+ + " edits.";
status.setStatus("Split " + countsStr);
if (reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -502,9 +545,8 @@ public class HLogSplitter {
progress_failed = outputSink.finishWritingAndClose() == null;
}
String msg = "Processed " + editsCount + " edits across "
- + outputSink.getOutputCounts().size() + " regions; log file="
- + logPath + " is corrupted = " + isCorrupted + " progress failed = "
- + progress_failed;
+ + outputSink.getRecoveredRegions().size() + " regions; log file=" + logPath
+ + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
LOG.info(msg);
status.markComplete(msg);
}
@@ -880,20 +922,33 @@ public class HLogSplitter {
totalBuffered += incrHeap;
while (totalBuffered > maxHeapUsage && thrown.get() == null) {
LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
- dataAvailable.wait(3000);
+ dataAvailable.wait(2000);
}
dataAvailable.notifyAll();
}
checkForErrors();
}
+ /*
+ * Backward compatible with existing code
+ */
synchronized RegionEntryBuffer getChunkToWrite() {
- long biggestSize=0;
- byte[] biggestBufferKey=null;
+ return getChunkToWrite(false);
+ }
+
+ /**
+ * @param ignoreCurrentlyWritingCheck when true, edits from the same region may be written by
+ * multiple threads. It is set to true when WAL edits are replayed directly.
+ * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+ */
+ synchronized RegionEntryBuffer getChunkToWrite(boolean ignoreCurrentlyWritingCheck) {
+ long biggestSize = 0;
+ byte[] biggestBufferKey = null;
for (Map.Entry entry : buffers.entrySet()) {
long size = entry.getValue().heapSize();
- if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+ if (size > biggestSize
+ && (ignoreCurrentlyWritingCheck || !currentlyWriting.contains(entry.getKey()))) {
biggestSize = size;
biggestBufferKey = entry.getKey();
}
@@ -909,8 +964,7 @@ public class HLogSplitter {
void doneWriting(RegionEntryBuffer buffer) {
synchronized (this) {
- boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
- assert removed;
+ currentlyWriting.remove(buffer.encodedRegionName);
}
long size = buffer.heapSize();
@@ -985,13 +1039,15 @@ public class HLogSplitter {
private void doRun() throws IOException {
LOG.debug("Writer thread " + this + ": starting");
while (true) {
- RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+ RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(distributedLogReplay);
if (buffer == null) {
// No data currently available, wait on some more to show up
synchronized (dataAvailable) {
- if (shouldStop) return;
+ if (shouldStop && !outputSink.flush()) {
+ return;
+ }
try {
- dataAvailable.wait(1000);
+ dataAvailable.wait(500);
} catch (InterruptedException ie) {
if (!shouldStop) {
throw new RuntimeException(ie);
@@ -1012,39 +1068,7 @@ public class HLogSplitter {
private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
- List entries = buffer.entryBuffer;
- if (entries.isEmpty()) {
- LOG.warn(this.getName() + " got an empty buffer, skipping");
- return;
- }
-
- WriterAndPath wap = null;
-
- long startTime = System.nanoTime();
- try {
- int editsCount = 0;
-
- for (Entry logEntry : entries) {
- if (wap == null) {
- wap = outputSink.getWriterAndPath(logEntry);
- if (wap == null) {
- // getWriterAndPath decided we don't need to write these edits
- // Message was already logged
- return;
- }
- }
- wap.w.append(logEntry);
- outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
- editsCount++;
- }
- // Pass along summary statistics
- wap.incrementEdits(editsCount);
- wap.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.fatal(this.getName() + " Got while writing log entry to log", e);
- throw e;
- }
+ outputSink.append(buffer);
}
void finish() {
@@ -1055,28 +1079,6 @@ public class HLogSplitter {
}
}
- private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
- FileSystem fs, Configuration conf)
- throws IOException {
- Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
- if (regionedits == null) {
- return null;
- }
- if (fs.exists(regionedits)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + regionedits + ", length="
- + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
- }
- }
- Writer w = createWriter(fs, regionedits, conf);
- LOG.debug("Creating writer path=" + regionedits + " region="
- + Bytes.toStringBinary(region));
- return (new WriterAndPath(regionedits, w));
- }
-
Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
List components = new ArrayList(10);
do {
@@ -1109,35 +1111,33 @@ public class HLogSplitter {
}
/**
- * Class that manages the output streams from the log splitting process.
+ * The following class is an abstraction that provides a common interface for both the existing
+ * recovered.edits file sink and the region server WAL edits replay sink.
*/
- class OutputSink {
- private final Map logWriters = Collections.synchronizedMap(
- new TreeMap(Bytes.BYTES_COMPARATOR));
- private final Map regionMaximumEditLogSeqNum = Collections
+ abstract class OutputSink {
+
+ protected Map writers = Collections
+ .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
+
+ protected final Map regionMaximumEditLogSeqNum = Collections
.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
- private final List writerThreads = Lists.newArrayList();
+
+ protected final List writerThreads = Lists.newArrayList();
/* Set of regions which we've decided should not output edits */
- private final Set blacklistedRegions = Collections.synchronizedSet(
- new TreeSet(Bytes.BYTES_COMPARATOR));
+ protected final Set blacklistedRegions = Collections
+ .synchronizedSet(new TreeSet(Bytes.BYTES_COMPARATOR));
- private boolean closeAndCleanCompleted = false;
+ protected boolean closeAndCleanCompleted = false;
- private boolean logWritersClosed = false;
+ protected boolean writersClosed = false;
- private final int numThreads;
+ protected final int numThreads;
- private CancelableProgressable reporter = null;
+ protected CancelableProgressable reporter = null;
- public OutputSink() {
- // More threads could potentially write faster at the expense
- // of causing more disk seeks as the logs are split.
- // 3. After a certain setting (probably around 3) the
- // process will be bound on the reader in the current
- // implementation anyway.
- numThreads = conf.getInt(
- "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ public OutputSink(int numWriters) {
+ numThreads = numWriters;
}
void setReporter(CancelableProgressable reporter) {
@@ -1145,8 +1145,7 @@ public class HLogSplitter {
}
/**
- * Start the threads that will pump data from the entryBuffers
- * to the output files.
+ * Start the threads that will pump data from the entryBuffers to the output files.
*/
synchronized void startWriterThreads() {
for (int i = 0; i < numThreads; i++) {
@@ -1158,66 +1157,143 @@ public class HLogSplitter {
/**
*
- * @return null if failed to report progress
+ * Update region's maximum edit log SeqNum.
+ */
+ void updateRegionMaximumEditLogSeqNum(Entry entry) {
+ synchronized (regionMaximumEditLogSeqNum) {
+ Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
+ .getEncodedRegionName());
+ if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+ regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
+ .getLogSeqNum());
+ }
+ }
+ }
+
+ Long getRegionMaximumEditLogSeqNum(byte[] region) {
+ return regionMaximumEditLogSeqNum.get(region);
+ }
+
+ /**
+ * @return the number of currently opened writers
+ */
+ int getNumOpenWriters() {
+ return this.writers.size();
+ }
+
+ /**
+ * Wait for the writer threads to finish writing all buffered data to the sink
+ * @return true when there is no error
* @throws IOException
*/
- List finishWritingAndClose() throws IOException {
+ protected boolean finishWriting() throws IOException {
LOG.info("Waiting for split writer threads to finish");
boolean progress_failed = false;
- try {
- for (WriterThread t : writerThreads) {
- t.finish();
- }
- for (WriterThread t : writerThreads) {
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- try {
- t.join();
- } catch (InterruptedException ie) {
- IOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
- checkForErrors();
+ for (WriterThread t : writerThreads) {
+ t.finish();
+ }
+ for (WriterThread t : writerThreads) {
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
}
- LOG.info("Split writers finished");
- if (progress_failed) {
- return null;
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
}
- return closeStreams();
+ checkForErrors();
+ }
+ LOG.info("Split writers finished");
+ return (!progress_failed);
+ }
+
+ abstract List finishWritingAndClose() throws IOException;
+
+ /**
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ abstract Map getOutputCounts();
+
+ /**
+ * @return the set of regions we've recovered
+ */
+ abstract Set getRecoveredRegions();
+
+ /**
+ * @param buffer A buffer of WAL edit entries for a single region
+ * @throws IOException
+ */
+ abstract void append(RegionEntryBuffer buffer) throws IOException;
+
+ /**
+ * WriterThreads call this function to flush any remaining internally buffered edits before close
+ * @return true when the underlying sink had something left to flush
+ */
+ protected boolean flush() throws IOException {
+ return false;
+ }
+ }
+
+ /**
+ * Class that manages the output streams from the log splitting process.
+ */
+ class LogSplittingOutputSink extends OutputSink {
+
+ public LogSplittingOutputSink(int numWriters) {
+ // More threads could potentially write faster at the expense
+ // of causing more disk seeks as the logs are split.
+ // 3. After a certain setting (probably around 3) the
+ // process will be bound on the reader in the current
+ // implementation anyway.
+ super(numWriters);
+ }
+
+ /**
+ * @return null if failed to report progress
+ * @throws IOException
+ */
+ @Override
+ List finishWritingAndClose() throws IOException {
+ boolean isSuccessful = false;
+ List result = null;
+ try {
+ isSuccessful = finishWriting();
} finally {
+ result = close();
List thrown = closeLogWriters(null);
if (thrown != null && !thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
}
+ return (isSuccessful) ? result : null;
}
/**
* Close all of the output streams.
* @return the list of paths written.
*/
- private List closeStreams() throws IOException {
+ private List close() throws IOException {
Preconditions.checkState(!closeAndCleanCompleted);
final List paths = new ArrayList();
final List thrown = Lists.newArrayList();
- ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(
- numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
- private int count = 1;
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "split-log-closeStream-" + count++);
- return t;
- }
- });
+ ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
+ TimeUnit.SECONDS, new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "split-log-closeStream-" + count++);
+ return t;
+ }
+ });
CompletionService completionService = new ExecutorCompletionService(
closeThreadPool);
- for (final Map.Entry logWritersEntry : logWriters
- .entrySet()) {
+ for (final Map.Entry writersEntry : writers.entrySet()) {
completionService.submit(new Callable() {
public Void call() throws Exception {
- WriterAndPath wap = logWritersEntry.getValue();
+ WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
try {
wap.w.close();
} catch (IOException ioe) {
@@ -1225,15 +1301,25 @@ public class HLogSplitter {
thrown.add(ioe);
return null;
}
- LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
- + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms)");
+
+ if (wap.editsWritten == 0) {
+ // just remove the empty recovered.edits file
+ if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+ LOG.warn("Failed deleting empty " + wap.p);
+ throw new IOException("Failed deleting empty " + wap.p);
+ }
+ return null;
+ }
+
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
- regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
+ regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
try {
if (!dst.equals(wap.p) && fs.exists(dst)) {
LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + dst + ", length=" + fs.getFileStatus(dst).getLen());
+ + "result of a previous failed split attempt. Deleting " + dst + ", length="
+ + fs.getFileStatus(dst).getLen());
if (!fs.delete(dst, false)) {
LOG.warn("Failed deleting of old " + dst);
throw new IOException("Failed deleting of old " + dst);
@@ -1244,8 +1330,7 @@ public class HLogSplitter {
// TestHLogSplit#testThreading is an example.
if (fs.exists(wap.p)) {
if (!fs.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to "
- + dst);
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.debug("Rename " + wap.p + " to " + dst);
}
@@ -1262,7 +1347,7 @@ public class HLogSplitter {
boolean progress_failed = false;
try {
- for (int i = 0, n = logWriters.size(); i < n; i++) {
+ for (int i = 0, n = this.writers.size(); i < n; i++) {
Future future = completionService.take();
future.get();
if (!progress_failed && reporter != null && !reporter.progress()) {
@@ -1282,7 +1367,7 @@ public class HLogSplitter {
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
- logWritersClosed = true;
+ writersClosed = true;
closeAndCleanCompleted = true;
if (progress_failed) {
return null;
@@ -1290,9 +1375,8 @@ public class HLogSplitter {
return paths;
}
- private List closeLogWriters(List thrown)
- throws IOException {
- if (!logWritersClosed) {
+ private List closeLogWriters(List thrown) throws IOException {
+ if (!writersClosed) {
if (thrown == null) {
thrown = Lists.newArrayList();
}
@@ -1311,36 +1395,35 @@ public class HLogSplitter {
}
}
} finally {
- synchronized (logWriters) {
- for (WriterAndPath wap : logWriters.values()) {
+ synchronized (writers) {
+ WriterAndPath wap = null;
+ for (SinkWriter tmpWAP : writers.values()) {
try {
+ wap = (WriterAndPath) tmpWAP;
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
thrown.add(ioe);
continue;
}
- LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
- + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms)");
}
}
- logWritersClosed = true;
+ writersClosed = true;
}
}
return thrown;
}
/**
- * Get a writer and path for a log starting at the given entry.
- *
- * This function is threadsafe so long as multiple threads are always
- * acting on different regions.
- *
+ * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+ * long as multiple threads are always acting on different regions.
* @return null if this region shouldn't output any logs
*/
- WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+ private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
byte region[] = entry.getKey().getEncodedRegionName();
- WriterAndPath ret = logWriters.get(region);
+ WriterAndPath ret = (WriterAndPath) writers.get(region);
if (ret != null) {
return ret;
}
@@ -1354,75 +1437,450 @@ public class HLogSplitter {
blacklistedRegions.add(region);
return null;
}
- logWriters.put(region, ret);
+ writers.put(region, ret);
return ret;
}
- /**
- * Update region's maximum edit log SeqNum.
- */
- void updateRegionMaximumEditLogSeqNum(Entry entry) {
- synchronized (regionMaximumEditLogSeqNum) {
- Long currentMaxSeqNum=regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName());
- if (currentMaxSeqNum == null
- || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
- regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
- entry.getKey().getLogSeqNum());
+ private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
+ Configuration conf) throws IOException {
+ Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+ if (regionedits == null) {
+ return null;
+ }
+ if (fs.exists(regionedits)) {
+ LOG.warn("Found old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ + fs.getFileStatus(regionedits).getLen());
+ if (!fs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old " + regionedits);
}
}
-
+ Writer w = createWriter(fs, regionedits, conf);
+ LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
+ return (new WriterAndPath(regionedits, w));
}
- Long getRegionMaximumEditLogSeqNum(byte[] region) {
- return regionMaximumEditLogSeqNum.get(region);
+ void append(RegionEntryBuffer buffer) throws IOException {
+ List entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+
+ WriterAndPath wap = null;
+
+ long startTime = System.nanoTime();
+ try {
+ int editsCount = 0;
+
+ for (Entry logEntry : entries) {
+ if (wap == null) {
+ wap = getWriterAndPath(logEntry);
+ if (wap == null) {
+ // getWriterAndPath decided we don't need to write these edits
+ return;
+ }
+ }
+ wap.w.append(logEntry);
+ outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
+ editsCount++;
+ }
+ // Pass along summary statistics
+ wap.incrementEdits(editsCount);
+ wap.incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.fatal(" Got while writing log entry to log", e);
+ throw e;
+ }
}
/**
- * @return a map from encoded region ID to the number of edits written out
- * for that region.
+ * @return a map from encoded region ID to the number of edits written out for that region.
*/
- private Map getOutputCounts() {
- TreeMap ret = new TreeMap(
- Bytes.BYTES_COMPARATOR);
- synchronized (logWriters) {
- for (Map.Entry entry : logWriters.entrySet()) {
+ Map getOutputCounts() {
+ TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR);
+ synchronized (writers) {
+ for (Map.Entry entry : writers.entrySet()) {
ret.put(entry.getKey(), entry.getValue().editsWritten);
}
}
return ret;
}
- }
- /**
- * Private data structure that wraps a Writer and its Path,
- * also collecting statistics about the data written to this
- * output.
- */
- private final static class WriterAndPath {
- final Path p;
- final Writer w;
+ Set getRecoveredRegions() {
+ return writers.keySet();
+ }
+ }
+ private abstract static class SinkWriter {
/* Count of edits written to this path */
long editsWritten = 0;
/* Number of nanos spent writing to this log */
long nanosSpent = 0;
+ void incrementEdits(int edits) {
+ editsWritten += edits;
+ }
+
+ void incrementNanoTime(long nanos) {
+ nanosSpent += nanos;
+ }
+ }
+
+ /**
+ * Private data structure that wraps a Writer and its Path, also collecting statistics about the
+ * data written to this output.
+ */
+ private final static class WriterAndPath extends SinkWriter {
+ final Path p;
+ final Writer w;
+
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
+ }
- void incrementEdits(int edits) {
- editsWritten += edits;
+ /**
+ * Class that manages replaying WAL edits directly to the region servers that the failed-over
+ * regions have been reassigned to
+ */
+ class LogReplayOutputSink extends OutputSink {
+ private static final double BUFFER_THRESHOLD = 0.35;
+ private static final String KEY_DELIMITER = "#";
+
+ private final Set recoveredRegions = Collections.synchronizedSet(new TreeSet(
+ Bytes.BYTES_COMPARATOR));
+ private final Map writers =
+ new ConcurrentHashMap();
+
+ private final ExecutorService sharedThreadPool;
+
+ private Map tableNameToHTableMap;
+ /**
+ * Map key -> value layout:
+ *
+ * hostname:port#tableName -> Queue of Row edits destined for that server and table
+ */
+ private Map> serverToBufferQueueMap;
+ private List thrown;
+
+ public LogReplayOutputSink(int numWriters, ExecutorService pool) {
+ super(numWriters);
+ this.sharedThreadPool = pool;
+ this.tableNameToHTableMap = Collections.synchronizedMap(new TreeMap(
+ Bytes.BYTES_COMPARATOR));
+ this.serverToBufferQueueMap = new ConcurrentHashMap>();
+ this.thrown = new ArrayList();
}
- void incrementNanoTime(long nanos) {
- nanosSpent += nanos;
+ void append(RegionEntryBuffer buffer) throws IOException {
+ List entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+
+ // store regions we have recovered so far
+ if (!recoveredRegions.contains(buffer.encodedRegionName)) {
+ recoveredRegions.add(buffer.encodedRegionName);
+ }
+
+ // group entries by region servers
+ for (HLog.Entry entry : entries) {
+ WALEdit edit = entry.getEdit();
+ byte[] table = entry.getKey().getTablename();
+ HTable htable = this.getHTable(table);
+
+ Put put = null;
+ Delete del = null;
+ KeyValue lastKV = null;
+ HRegionLocation loc = null;
+ Row preRow = null;
+ Row lastAddedRow = null; // not strictly needed here; kept to stay conservative
+ String preKey = null;
+ List kvs = edit.getKeyValues();
+
+ for (KeyValue kv : kvs) {
+ if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+ if (preRow != null) {
+ synchronized (serverToBufferQueueMap) {
+ List queue = serverToBufferQueueMap.get(preKey);
+ if (queue == null) {
+ queue = Collections.synchronizedList(new ArrayList());
+ serverToBufferQueueMap.put(preKey, queue);
+ }
+ queue.add(preRow);
+ lastAddedRow = preRow;
+ }
+ }
+ loc = htable.getRegionLocation(kv.getRow(), false);
+ if (kv.isDelete()) {
+ del = new Delete(kv.getRow());
+ del.setClusterId(entry.getKey().getClusterId());
+ preRow = del;
+ } else {
+ put = new Put(kv.getRow());
+ put.setClusterId(entry.getKey().getClusterId());
+ preRow = put;
+ }
+ if (loc == null) {
+ throw new IOException("Can't locate location for row:" + Bytes.toString(kv.getRow())
+ + " of table:" + Bytes.toString(table));
+ }
+ preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
+ }
+ if (kv.isDelete()) {
+ del.addDeleteMarker(kv);
+ } else {
+ put.add(kv);
+ }
+ lastKV = kv;
+ }
+
+ // add the last row
+ if (preRow != null && lastAddedRow != preRow) {
+ synchronized (serverToBufferQueueMap) {
+ List queue = serverToBufferQueueMap.get(preKey);
+ if (queue == null) {
+ queue = Collections.synchronizedList(new ArrayList());
+ serverToBufferQueueMap.put(preKey, queue);
+ }
+ queue.add(preRow);
+ }
+ }
+ }
+
+ // process workitems
+ String maxLocKey = null;
+ int maxSize = 0;
+ List maxQueue = null;
+ synchronized (this.serverToBufferQueueMap) {
+ for (String key : this.serverToBufferQueueMap.keySet()) {
+ List curQueue = this.serverToBufferQueueMap.get(key);
+ if (curQueue.size() > maxSize) {
+ maxSize = curQueue.size();
+ maxQueue = curQueue;
+ maxLocKey = key;
+ }
+ }
+ if (maxSize < minBatchSize
+ && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
+ // buffer more to process
+ return;
+ } else if (maxSize > 0) {
+ this.serverToBufferQueueMap.remove(maxLocKey);
+ }
+ }
+
+ if (maxSize > 0) {
+ processWorkItems(maxLocKey, maxQueue);
+ }
+ }
+
+ private void processWorkItems(String key, List actions) throws IOException {
+ RegionServerWriter rsw = null;
+
+ long startTime = System.nanoTime();
+ try {
+ rsw = getRegionServerWriter(key);
+ rsw.sink.replayEntries(actions);
+
+ // Pass along summary statistics
+ rsw.incrementEdits(actions.size());
+ rsw.incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.fatal(" Got while writing log entry to log", e);
+ throw e;
+ }
+ }
+
+ @Override
+ protected boolean flush() throws IOException {
+ String curLoc = null;
+ int curSize = 0;
+ List curQueue = null;
+ synchronized (this.serverToBufferQueueMap) {
+ for (String locationKey : this.serverToBufferQueueMap.keySet()) {
+ curQueue = this.serverToBufferQueueMap.get(locationKey);
+ if (!curQueue.isEmpty()) {
+ curSize = curQueue.size();
+ curLoc = locationKey;
+ break;
+ }
+ }
+ if (curSize > 0) {
+ this.serverToBufferQueueMap.remove(curLoc);
+ }
+ }
+
+ if (curSize > 0) {
+ this.processWorkItems(curLoc, curQueue);
+ dataAvailable.notifyAll();
+ return true;
+ }
+ return false;
+ }
+
+ public void addWriterError(Throwable t) {
+ thrown.add(t);
+ }
+
+ @Override
+ List finishWritingAndClose() throws IOException {
+ List result = new ArrayList();
+ try {
+ if (!finishWriting()) {
+ return null;
+ }
+ // return an empty list to keep the interface consistent with the recovered.edits code path
+ return result;
+ } finally {
+ List thrown = closeRegionServerWriters();
+ if (thrown != null && !thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ }
+ }
+
+ private List closeRegionServerWriters() throws IOException {
+ List result = null;
+ if (!writersClosed) {
+ result = Lists.newArrayList();
+ try {
+ for (WriterThread t : writerThreads) {
+ while (t.isAlive()) {
+ t.shouldStop = true;
+ t.interrupt();
+ try {
+ t.join(10);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ }
+ }
+ }
+ } finally {
+ synchronized (writers) {
+ for (String locationKey : writers.keySet()) {
+ RegionServerWriter tmpW = writers.get(locationKey);
+ try {
+ tmpW.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
+ result.add(ioe);
+ }
+ }
+ }
+
+ // close tables
+ synchronized (this.tableNameToHTableMap) {
+ for (byte[] tableName : this.tableNameToHTableMap.keySet()) {
+ HTable htable = this.tableNameToHTableMap.get(tableName);
+ try {
+ htable.close();
+ } catch (IOException ioe) {
+ result.add(ioe);
+ }
+ }
+ }
+ this.sharedThreadPool.shutdown();
+ writersClosed = true;
+ }
+ }
+ return result;
+ }
+
+ Map getOutputCounts() {
+ TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR);
+ synchronized (writers) {
+ for (Map.Entry entry : writers.entrySet()) {
+ ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+ }
+ }
+ return ret;
+ }
+
+ Set getRecoveredRegions() {
+ return this.recoveredRegions;
+ }
+
+ private String getTableFromLocationStr(String loc) {
+ /**
+ * the location key is in the format hostname:port#tableName
+ */
+ String[] splits = loc.split(KEY_DELIMITER);
+ if (splits.length != 2) {
+ return "";
+ }
+ return splits[1];
+ }
+
+ /**
+ * Get the RegionServerWriter for the given location key (hostname:port#tableName), creating it
+ * if it doesn't exist yet. This function is threadsafe.
+ * @return the writer for the given location
+ */
+ private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
+ RegionServerWriter ret = writers.get(loc);
+ if (ret != null) {
+ return ret;
+ }
+
+ String tableName = getTableFromLocationStr(loc);
+ if (tableName.isEmpty()) {
+ throw new IOException("Found invalid location string:" + loc);
+ }
+ HTable table = this.tableNameToHTableMap.get(Bytes.toBytes(tableName));
+
+ synchronized (writers) {
+ ret = writers.get(loc);
+ if (ret == null) {
+ ret = new RegionServerWriter(conf, table);
+ writers.put(loc, ret);
+ }
+ }
+ return ret;
+ }
+
+ private HTable getHTable(final byte[] table) throws IOException {
+ HTable htable = this.tableNameToHTableMap.get(table);
+ if (htable == null) {
+ synchronized (this.tableNameToHTableMap) {
+ htable = this.tableNameToHTableMap.get(table);
+ if (htable == null) {
+ htable = new HTable(conf, table, this.sharedThreadPool);
+ // htable.setRegionCachePrefetch(table, true);
+ htable.setAutoFlush(false, true);
+ this.tableNameToHTableMap.put(table, htable);
+ }
+ }
+ }
+ return htable;
+ }
+ }
+
+ /**
+ * Private data structure that wraps a replay sink for a receiving RS and collects statistics
+ * about the data written to that newly assigned RS.
+ */
+ private final static class RegionServerWriter extends SinkWriter {
+ final WALEditsReplaySink sink;
+
+ RegionServerWriter(final Configuration conf, final HTable table) throws IOException {
+ this.sink = new WALEditsReplaySink(conf, table);
+ }
+
+ void close() throws IOException {
}
}
static class CorruptedLogFileException extends Exception {
private static final long serialVersionUID = 1L;
+
CorruptedLogFileException(String s) {
super(s);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index b2cd2f6..b262b08 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -28,15 +28,17 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
@@ -237,6 +239,37 @@ public class HLogUtil {
}
/**
+ * This function returns the region server name from a log file path, which is in either the
+ * format hdfs:///hbase/.logs/-splitting/... or hdfs:///hbase/.logs//...
+ * @param logFile path of the HLog file
+ * @return the parsed server name, or null if the passed in logFile isn't a valid HLog file path
+ */
+ public static ServerName getServerNameFromHLogDirectoryName(Path logFile) {
+ Path logDir = logFile.getParent();
+ String logDirName = logDir.getName();
+ if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+ logDir = logFile;
+ logDirName = logDir.getName();
+ }
+ ServerName serverName = null;
+ if (logDirName.endsWith(HLog.SPLITTING_EXT)) {
+ logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length());
+ }
+ try {
+ serverName = ServerName.parseServerName(logDirName);
+ } catch (IllegalArgumentException ex) {
+ serverName = null;
+ LOG.warn("Invalid log file path=" + logFile, ex);
+ }
+ if (serverName != null && serverName.getStartcode() < 0) {
+ LOG.warn("Invalid log file path=" + logFile);
+ return null;
+ }
+ return serverName;
+ }
+
+ /**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
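
A quick usage sketch for the new helper; the sample path and server name are illustrative, and only the "-splitting" suffix handling comes from the method above.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;

    // Sketch: resolving the original region server from a WAL path under a "-splitting" directory.
    class HLogUtilExample {
      static ServerName demo() {
        Path log = new Path(
            "hdfs://nn/hbase/.logs/host1.example.com,60020,1364161234567-splitting/wal.1");
        // Returns the ServerName parsed from the directory name, or null if it can't be parsed.
        return HLogUtil.getServerNameFromHLogDirectoryName(log);
      }
    }
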
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java
new file mode 100644
index 0000000..4fdb4ae
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class used to push numbers about the WAL into the metrics subsystem. This will take a single
+ * function call and turn it into multiple manipulations of the hadoop metrics system.
+ */
+@InterfaceAudience.Private
+public class MetricsWALEditsReplay {
+ static final Log LOG = LogFactory.getLog(MetricsWALEditsReplay.class);
+
+ private final MetricsWALSource source;
+
+ public MetricsWALEditsReplay() {
+ source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+ }
+
+ public void finishSync(long time) {
+ source.incrementSyncTime(time);
+ }
+
+ public void finishAppend(long time, long size) {
+
+ source.incrementAppendCount();
+ source.incrementAppendTime(time);
+ source.incrementAppendSize(size);
+
+ if (time > 1000) {
+ source.incrementSlowAppendCount();
+ LOG.warn(String.format("%s took %d ms appending an edit to hlog; len~=%s", Thread
+ .currentThread().getName(), time, StringUtils.humanReadableInt(size)));
+ }
+ }
+}
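
A small usage sketch for the new metrics class; the elapsed-time bookkeeping and the edit size are placeholders for illustration.

    import org.apache.hadoop.hbase.regionserver.wal.MetricsWALEditsReplay;

    // Sketch: reporting an append through the new metrics class.
    class ReplayMetricsExample {
      static void report(long elapsedMillis, long editSizeBytes) {
        MetricsWALEditsReplay metrics = new MetricsWALEditsReplay();
        // Appends slower than one second are additionally counted as slow appends and logged.
        metrics.finishAppend(elapsedMillis, editSizeBytes);
      }
    }
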
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
new file mode 100644
index 0000000..c4ce6f5
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * This class is responsible for replaying the edits coming from a failed region server.
+ *
+ * This class uses the native HBase client in order to replay WAL entries.
+ *
+ */
+@InterfaceAudience.Private
+public class WALEditsReplaySink {
+
+ private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+
+ private final Configuration conf;
+ private final HTable table;
+ private final MetricsWALEditsReplay metrics;
+ private final AtomicLong totalReplayedEdits = new AtomicLong();
+
+ /**
+ * Create a sink for replaying WAL log entries
+ * @param conf configuration object
+ * @param table an HTable instance managed by the caller
+ * @throws IOException if the sink cannot be created
+ */
+ public WALEditsReplaySink(Configuration conf, HTable table)
+ throws IOException {
+ this.conf = HBaseConfiguration.create(conf);
+ this.metrics = new MetricsWALEditsReplay();
+ this.table = table;
+ }
+
+ /**
+ * Replay a list of actions for the same region directly against the newly assigned region server
+ * @param actions the mutations to replay
+ * @throws IOException
+ */
+ public void replayEntries(List actions) throws IOException {
+ if (actions.size() == 0) {
+ return;
+ }
+
+ try {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ table.batch(actions);
+ long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime
+ + "(ms)!");
+ /**
+ * TODO: Add more metrics.
+ */
+ this.totalReplayedEdits.addAndGet(actions.size());
+ } catch (InterruptedException ix) {
+ throw new InterruptedIOException("Interrupted when replaying wal edits.");
+ }
+ }
+
+ /**
+ * Get a string representation of this sink's metrics
+ * @return string with the total replayed edits count
+ */
+ public String getStats() {
+ return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
+ + this.totalReplayedEdits;
+ }
+}
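
A usage sketch for the sink; the row and column values are made up, and the HTable is assumed to be managed by the caller, as the constructor javadoc requires.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: pushing a single-Put batch through the replay sink.
    class ReplaySinkExample {
      static void demo(Configuration conf, HTable table) throws Exception {
        WALEditsReplaySink sink = new WALEditsReplaySink(conf, table);
        List<Row> actions = new ArrayList<Row>();
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("family"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        actions.add(put);
        sink.replayEntries(actions); // delegates to HTable.batch() under the hood
      }
    }
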
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
new file mode 100644
index 0000000..ea43338
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+
+/**
+ * Watcher used to get notified when a recovering region comes out of the recovering state
+ */
+@InterfaceAudience.Private
+public class RecoveringRegionWatcher extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
+
+ private HRegionServer server;
+
+ /**
+ * Construct a ZooKeeper event listener.
+ */
+ public RecoveringRegionWatcher(ZooKeeperWatcher watcher, HRegionServer server) {
+ super(watcher);
+ watcher.registerListener(this);
+ this.server = server;
+ }
+
+ /**
+ * Called when a node has been deleted
+ * @param path full path of the deleted node
+ */
+ public void nodeDeleted(String path) {
+ if (this.server.isStopped() || this.server.isStopping()) {
+ return;
+ }
+
+ String parentPath = path.substring(0, path.lastIndexOf('/'));
+ if (!this.watcher.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
+ return;
+ }
+
+ String regionName = path.substring(parentPath.length() + 1);
+ if (!this.server.getRecoveringRegions().remove(regionName)) {
+ LOG.info("Region:" + regionName + " isn't marked as recovering inside current RS.");
+ } else {
+ LOG.info(path + " znode deleted. Region: " + regionName + " completes recovery.");
+ }
+ }
+}
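
A sketch of how a region server could hook the watcher up; the wiring point is an assumption, since the constructor already registers the listener with the passed-in ZooKeeperWatcher.

    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;

    // Sketch: installing the watcher; once created, deletions under recoveringRegionsZNode remove
    // the corresponding region from the server's recovering-regions set.
    class RecoveringWatcherWiring {
      static void install(HRegionServer regionServer) {
        new RecoveringRegionWatcher(regionServer.getZooKeeper(), regionServer);
      }
    }
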
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index d1e8832..9c837d3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -18,15 +18,24 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.apache.hadoop.hbase.SplitLogCounters.*;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -42,7 +51,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
@@ -56,9 +74,11 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -89,9 +109,14 @@ public class TestDistributedLogSplitting {
HBaseTestingUtility TEST_UTIL;
private void startCluster(int num_rs) throws Exception{
+ conf = HBaseConfiguration.create();
+ startCluster(num_rs, conf);
+ }
+
+ private void startCluster(int num_rs, Configuration inConf) throws Exception {
SplitLogCounters.resetCounters();
LOG.info("Starting cluster");
- conf = HBaseConfiguration.create();
+ this.conf = inConf;
conf.getLong("hbase.splitlog.max.resubmit", 0);
// Make the failure test faster
conf.setInt("zookeeper.recovery.retry", 0);
@@ -111,13 +136,20 @@ public class TestDistributedLogSplitting {
@After
public void after() throws Exception {
+ for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
+ mt.getMaster().abort("closing...", new Exception("Trace info"));
+ }
+
TEST_UTIL.shutdownMiniCluster();
}
@Test (timeout=300000)
public void testRecoveredEdits() throws Exception {
LOG.info("testRecoveredEdits");
- startCluster(NUM_RS);
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ startCluster(NUM_RS, curConf);
+
final int NUM_LOG_LINES = 1000;
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
// turn off load balancing to prevent regions from moving around otherwise
@@ -150,8 +182,7 @@ public class TestDistributedLogSplitting {
it.remove();
}
}
- makeHLog(hrs.getWAL(), regions, "table",
- NUM_LOG_LINES, 100);
+ makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
slm.splitLogDistributed(logDir);
@@ -172,6 +203,422 @@ public class TestDistributedLogSplitting {
assertEquals(NUM_LOG_LINES, count);
}
+ @Test(timeout = 300000)
+ public void testLogReplayWithNonMetaRSDown() throws Exception {
+ LOG.info("testLogReplayWithNonMetaRSDown");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ startCluster(NUM_RS, curConf);
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_LOG_LINES = 1000;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+ HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+ List<HRegionInfo> regions = null;
+ HRegionServer hrs = null;
+ for (int i = 0; i < NUM_RS; i++) {
+ boolean isCarryingMeta = false;
+ hrs = rsts.get(i).getRegionServer();
+ regions = ProtobufUtil.getOnlineRegions(hrs);
+ for (HRegionInfo region : regions) {
+ if (region.isMetaRegion()) {
+ isCarryingMeta = true;
+ break;
+ }
+ }
+ if (isCarryingMeta) {
+ continue;
+ }
+ break;
+ }
+
+ LOG.info("#regions = " + regions.size());
+ Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegionInfo region = it.next();
+ if (region.isMetaTable()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+ // abort the RS and verify that the edits are recovered
+ this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
+ ht.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testLogReplayWithMetaRSDown() throws Exception {
+ LOG.info("testRecoveredEditsReplayWithMetaRSDown");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ startCluster(NUM_RS, curConf);
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_LOG_LINES = 1000;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+ HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+ List<HRegionInfo> regions = null;
+ HRegionServer hrs = null;
+ for (int i = 0; i < NUM_RS; i++) {
+ boolean isCarryingMeta = false;
+ hrs = rsts.get(i).getRegionServer();
+ regions = ProtobufUtil.getOnlineRegions(hrs);
+ for (HRegionInfo region : regions) {
+ if (region.isMetaRegion()) {
+ isCarryingMeta = true;
+ break;
+ }
+ }
+ if (!isCarryingMeta) {
+ continue;
+ }
+ break;
+ }
+
+ LOG.info("#regions = " + regions.size());
+ Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegionInfo region = it.next();
+ if (region.isMetaTable()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+ this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
+ ht.close();
+ }
+
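+ // Aborts the given region server, waits for the abort to complete, for all user regions to
+ // come back online and for the recovering-regions znodes in ZK to be cleared, then verifies
+ // that every logged edit has been replayed.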
+ private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
+ final int numRegions, final int numofLines) throws Exception {
+ final MiniHBaseCluster tmpCluster = this.cluster;
+
+ // abort RS
+ LOG.info("Aborting region server: " + hrs.getServerName());
+ hrs.abort("testing");
+
+ // wait for the abort to complete
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+ }
+ });
+
+ // wait for regions to come online
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
+ }
+ });
+
+ // wait until all regions are fully recovered
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+ zkw.recoveringRegionsZNode, false);
+ return (recoveringRegions != null && recoveringRegions.size() == 0);
+ }
+ });
+
+ assertEquals(numofLines, TEST_UTIL.countRows(ht));
+ }
+
+ @Test(timeout = 300000)
+ public void testMasterStartsUpWithLogSplittingWork() throws Exception {
+ LOG.info("testMasterStartsUpWithLogSplittingWork");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
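+ // lower the region server check-in threshold so a restarted master can finish startup
+ // while one region server is still down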
+ curConf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
+ startCluster(NUM_RS, curConf);
+
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_LOG_LINES = 1000;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+ HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+ List<HRegionInfo> regions = null;
+ HRegionServer hrs = null;
+ for (int i = 0; i < NUM_RS; i++) {
+ boolean isCarryingMeta = false;
+ hrs = rsts.get(i).getRegionServer();
+ regions = ProtobufUtil.getOnlineRegions(hrs);
+ for (HRegionInfo region : regions) {
+ if (region.isMetaRegion()) {
+ isCarryingMeta = true;
+ break;
+ }
+ }
+ if (isCarryingMeta) {
+ continue;
+ }
+ break;
+ }
+
+ LOG.info("#regions = " + regions.size());
+ Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegionInfo region = it.next();
+ if (region.isMetaTable()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+ // abort master
+ abortMaster(cluster);
+
+ // abort RS
+ int numRS = cluster.getLiveRegionServerThreads().size();
+ LOG.info("Aborting region server: " + hrs.getServerName());
+ hrs.abort("testing");
+
+ // wait for the abort to complete
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+ }
+ });
+
+ Thread.sleep(2000);
+ LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
+
+ startMasterAndWaitUntilLogSplit(cluster);
+
+ // wait for all regions to come back online
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
+ }
+ });
+
+ LOG.info("Current Open Regions After Master Node Starts Up:"
+ + getAllOnlineRegions(cluster).size());
+
+ assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+
+ ht.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testMasterStartsUpWithLogReplayWork() throws Exception {
+ LOG.info("testMasterStartsUpWithLogReplayWork");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
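+ // lower the region server check-in threshold so a restarted master can finish startup
+ // while one region server is still down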
+ curConf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
+ startCluster(NUM_RS, curConf);
+
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_LOG_LINES = 1000;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+ HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+ List<HRegionInfo> regions = null;
+ HRegionServer hrs = null;
+ for (int i = 0; i < NUM_RS; i++) {
+ boolean isCarryingMeta = false;
+ hrs = rsts.get(i).getRegionServer();
+ regions = ProtobufUtil.getOnlineRegions(hrs);
+ for (HRegionInfo region : regions) {
+ if (region.isMetaRegion()) {
+ isCarryingMeta = true;
+ break;
+ }
+ }
+ if (isCarryingMeta) {
+ continue;
+ }
+ break;
+ }
+
+ LOG.info("#regions = " + regions.size());
+ Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegionInfo region = it.next();
+ if (region.isMetaTable()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+ // abort master
+ abortMaster(cluster);
+
+ // abort RS
+ int numRS = cluster.getLiveRegionServerThreads().size();
+ LOG.info("Aborting region server: " + hrs.getServerName());
+ hrs.abort("testing");
+
+ // wait for the RS to die
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+ }
+ });
+
+ Thread.sleep(2000);
+ LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
+
+ startMasterAndWaitUntilLogSplit(cluster);
+
+ // wait until all regions are fully recovered
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+ zkw.recoveringRegionsZNode, false);
+ return (recoveringRegions != null && recoveringRegions.size() == 0);
+ }
+ });
+
+ LOG.info("Current Open Regions After Master Node Starts Up:"
+ + getAllOnlineRegions(cluster).size());
+
+ assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+
+ ht.close();
+ }
+
+
+ @Test(timeout = 300000)
+ public void testLogReplayTwoSequentialRSDown() throws Exception {
+ LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ startCluster(NUM_RS, curConf);
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_LOG_LINES = 1000;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+ HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+ List<HRegionInfo> regions = null;
+ HRegionServer hrs1 = rsts.get(0).getRegionServer();
+ regions = ProtobufUtil.getOnlineRegions(hrs1);
+
+ makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+ // abort RS1
+ LOG.info("Aborting region server: " + hrs1.getServerName());
+ hrs1.abort("testing");
+
+ // wait for the abort to complete
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+ }
+ });
+
+ // wait for regions to come online
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
+ }
+ });
+
+ // abort second region server
+ rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs2 = rsts.get(0).getRegionServer();
+ LOG.info("Aborting one more region server: " + hrs2.getServerName());
+ hrs2.abort("testing");
+
+ // wait for the abort to complete
+ TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
+ }
+ });
+
+ // wait for regions to come online
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
+ }
+ });
+
+ // wait until all regions are fully recovered
+ TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+ zkw.recoveringRegionsZNode, false);
+ return (recoveringRegions != null && recoveringRegions.size() == 0);
+ }
+ });
+
+ assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+ ht.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testMarkRegionsRecoveringInZK() throws Exception {
+ LOG.info("testMarkRegionsRecoveringInZK");
+ Configuration curConf = HBaseConfiguration.create();
+ curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ startCluster(NUM_RS, curConf);
+ master.balanceSwitch(false);
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
+ HTable ht = installTable(zkw, "table", "family", 40);
+ final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+
+ final HRegionServer hrs = rsts.get(0).getRegionServer();
+ List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
+ HRegionInfo region = regions.get(0);
+ Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+ regionSet.add(region);
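+ // mark the same region as recovering under two different server names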
+ slm.markRegionsRecoveringInZK(rsts.get(0).getRegionServer().getServerName(), regionSet);
+ slm.markRegionsRecoveringInZK(rsts.get(1).getRegionServer().getServerName(), regionSet);
+
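+ // each markRegionsRecoveringInZK call adds a child znode (named after the failed server)
+ // under the region's recovering-regions node, so two children are expected here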
+ List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
+ ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
+
+ assertEquals(2, recoveringRegions.size());
+
+ // wait for the SplitLogWorker to clear them, since there are no WAL files recorded in ZK
+ TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (hrs.getRecoveringRegions().size() == 0);
+ }
+ });
+ ht.close();
+ }
+
/**
* The original intention of this test was to force an abort of a region
* server and to make sure that the failure path in the region servers is
@@ -197,8 +644,9 @@ public class TestDistributedLogSplitting {
installTable(new ZooKeeperWatcher(conf, "table-creation", null),
"table", "family", 40);
- makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table",
- NUM_LOG_LINES, 100);
+
+ makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
+ 100);
new Thread() {
public void run() {
@@ -400,9 +848,8 @@ public class TestDistributedLogSplitting {
}
}
- public void makeHLog(HLog log,
- List<HRegionInfo> hris, String tname,
- int num_edits, int edit_size) throws IOException {
+ public void makeHLog(HLog log, List<HRegionInfo> hris, String tname, String fname, int num_edits,
+ int edit_size) throws IOException {
// remove root and meta region
hris.remove(HRegionInfo.ROOT_REGIONINFO);
@@ -411,29 +858,33 @@ public class TestDistributedLogSplitting {
HTableDescriptor htd = new HTableDescriptor(tname);
byte[] value = new byte[edit_size];
for (int i = 0; i < edit_size; i++) {
- value[i] = (byte)('a' + (i % 26));
+ value[i] = (byte) ('a' + (i % 26));
}
int n = hris.size();
int[] counts = new int[n];
- int j = 0;
if (n > 0) {
for (int i = 0; i < num_edits; i += 1) {
WALEdit e = new WALEdit();
- byte [] row = Bytes.toBytes("r" + Integer.toString(i));
- byte [] family = Bytes.toBytes("f");
- byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
- e.add(new KeyValue(row, family, qualifier,
- System.currentTimeMillis(), value));
- j++;
- log.append(hris.get(j % n), table, e, System.currentTimeMillis(), htd);
- counts[j % n] += 1;
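+ // generate rows inside each region's key range so that, when the edits are replayed, they
+ // are applied to the region they were originally written against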
+ HRegionInfo curRegionInfo = hris.get(i % n);
+ byte[] startRow = curRegionInfo.getStartKey();
+ if (startRow == null || startRow.length == 0) {
+ startRow = new byte[] { 0, 0, 0, 0, 1 };
+ }
+ byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
+ // use the last 5 bytes because HBaseTestingUtility.createMultiRegions() uses 5-byte row keys
+ row = Arrays.copyOfRange(row, 3, 8);
+ byte[] family = Bytes.toBytes(fname);
+ byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+ e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
+ log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd);
+ counts[i % n] += 1;
}
}
log.sync();
log.close();
for (int i = 0; i < n; i++) {
- LOG.info("region " + hris.get(i).getRegionNameAsString() +
- " has " + counts[i] + " edits");
+ LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
}
return;
}
@@ -493,4 +944,27 @@ public class TestDistributedLogSplitting {
assertTrue(false);
}
+ private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
+ for (MasterThread mt : cluster.getLiveMasterThreads()) {
+ if (mt.getMaster().isActiveMaster()) {
+ mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+ mt.join();
+ break;
+ }
+ }
+ LOG.debug("Master is aborted");
+ }
+
+ private void startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
+ throws IOException, InterruptedException {
+ cluster.startMaster();
+ HMaster master = cluster.getMaster();
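+ // block until the new master is initialized and has finished processing all dead servers,
+ // i.e. log splitting for the aborted region server is done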
+ while (!master.isInitialized()) {
+ Thread.sleep(100);
+ }
+ ServerManager serverManager = master.getServerManager();
+ while (serverManager.areDeadServersInProgress()) {
+ Thread.sleep(100);
+ }
+ }
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index e0bb724..37cac60 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -783,6 +783,7 @@ public class TestMasterFailover {
while (master.getServerManager().areDeadServersInProgress()) {
Thread.sleep(10);
}
+
// Failover should be completed, now wait for no RIT
log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
index 52091bd..9449f53 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
@@ -101,23 +101,6 @@ public class TestRSKilledWhenMasterInitializing {
KeeperException, InterruptedException {
super(conf);
}
-
- @Override
- protected void splitLogAfterStartup(MasterFileSystem mfs) {
- super.splitLogAfterStartup(mfs);
- logSplit = true;
- // If "TestingMaster.sleep" is set, sleep after log split.
- if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
- int duration = getConfiguration().getInt(
- "TestingMaster.sleep.duration", 0);
- Threads.sleep(duration);
- }
- }
-
-
- public boolean isLogSplitAfterStartup() {
- return logSplit;
- }
}
@Test(timeout = 120000)
@@ -163,7 +146,7 @@ public class TestRSKilledWhenMasterInitializing {
/* NO.1 .META. region correctness */
// First abort master
abortMaster(cluster);
- TestingMaster master = startMasterAndWaitUntilLogSplit(cluster);
+ TestingMaster master = startMasterAndWaitTillMetaRegionAssignment(cluster);
// Second kill meta server
int metaServerNum = cluster.getServerWithMeta();
@@ -216,14 +199,12 @@ public class TestRSKilledWhenMasterInitializing {
LOG.debug("Master is aborted");
}
- private TestingMaster startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
+ private TestingMaster startMasterAndWaitTillMetaRegionAssignment(MiniHBaseCluster cluster)
throws IOException, InterruptedException {
TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
- while (!master.isLogSplitAfterStartup()) {
+ while (!master.isInitializationStartsMetaRegoinAssignment()) {
Thread.sleep(100);
}
- LOG.debug("splitted:" + master.isLogSplitAfterStartup() + ",initialized:"
- + master.isInitialized());
return master;
}
@@ -232,7 +213,9 @@ public class TestRSKilledWhenMasterInitializing {
while (!master.isInitialized()) {
Thread.sleep(100);
}
+ while (master.getServerManager().areDeadServersInProgress()) {
+ Thread.sleep(100);
+ }
LOG.debug("master isInitialized");
}
-
}