diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 99fa7ae..4beca71 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -694,6 +694,9 @@ public final class HConstants { public static final String DISTRIBUTED_LOG_SPLITTING_KEY = "hbase.master.distributed.log.splitting"; + /** Conf key that enables unflushed WAL edits directly being replayed to region servers */ + public static final String REPLAY_WAL_EDITS_KEY = "hbase.master.replay.wal.edits"; + /** * The name of the configuration parameter that specifies * the number of bytes in a newly created checksum chunk. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java index 1c226f7..43921fa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java @@ -18,11 +18,6 @@ */ package org.apache.hadoop.hbase.master; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; - import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -33,6 +28,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + /** * Class to hold dead servers list and utility querying dead server list. 
*/ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 9ccff6f..4d7d23d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,7 +98,6 @@ import org.apache.hadoop.hbase.master.handler.ModifyTableHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; -import org.apache.hadoop.hbase.master.handler.TableEventHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -698,14 +698,58 @@ Server { this.assignmentManager.startTimeOutMonitor(); } - // TODO: Should do this in background rather than block master startup - status.setStatus("Splitting logs after master startup"); - splitLogAfterStartup(this.fileSystemManager); + // get a list for previously failed RS which need recovery work + List failedServers = getFailedServersFromLogFolders(this.fileSystemManager); + ServerName preRootServer = this.catalogTracker.getRootLocation(); + + if (!this.isInReplayWALEditsMode() && preRootServer != null + && failedServers.contains(preRootServer)) { + // create recovered edits file for root server before assignment + // Note: we can't remove preRootServer from failedServers list because it may also host + // some user regions + this.fileSystemManager.splitMetaLog(preRootServer); + } + // Make sure root assigned before proceeding. + if (!assignRoot(status)) return; + if (this.isInReplayWALEditsMode() && preRootServer != null + && failedServers.contains(preRootServer)) { + // replay WAL edits mode need new _ROOT_ RS is assigned firstly + this.fileSystemManager.splitMetaLog(preRootServer); + } + + // log splitting for .META. server + ServerName preMetaServer = this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); + if (preMetaServer != null && failedServers.contains(preMetaServer)) { + if (!this.isInReplayWALEditsMode()) { + // create recovered edits file for .META. server + // Note: we can't remove preMetaServer from failedServers list because it may also host + // some user regions + this.fileSystemManager.splitMetaLog(preMetaServer); + } else { + Set regions = new HashSet(); + regions.add(HRegionInfo.FIRST_META_REGIONINFO); + this.fileSystemManager.startSubmitMetaLogSplitJob(preMetaServer, regions); + } + } + // Make sure meta assigned before proceeding. + if (!assignMeta(status, preRootServer)) return; + if (this.isInReplayWALEditsMode() && preMetaServer != null + && failedServers.contains(preMetaServer)) { + // replay WAL edits mode need new .META. RS is assigned firstly + this.fileSystemManager.splitMetaLog(preMetaServer); + } - // Make sure root and meta assigned before proceeding. 
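// Illustrative sketch (not part of the patch): the per-catalog-region ordering that the startup
// code above implements. In the default mode the dead server's catalog WAL is split into
// recovered.edits files before reassignment; in replay-WAL-edits mode the region is assigned
// first, so that a live region server exists to receive the replayed edits, and the WAL is only
// split afterwards. recoverCatalogRegion() is a hypothetical condensation of that logic for
// -ROOT-; the .META. path additionally marks the region recovering via startSubmitMetaLogSplitJob.
private void recoverCatalogRegion(ServerName failedServer, MonitoredTask status)
    throws IOException, InterruptedException, KeeperException {
  if (!this.isInReplayWALEditsMode()) {
    this.fileSystemManager.splitMetaLog(failedServer); // write recovered.edits, then assign
    if (!assignRoot(status)) return;
  } else {
    if (!assignRoot(status)) return;                   // assign first: a new RS must be online
    this.fileSystemManager.splitMetaLog(failedServer); // edits are replayed to that RS
  }
}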
- if (!assignRootAndMeta(status)) return; enableServerShutdownHandler(); + status.setStatus("Submit log splitting work"); + // Master has recovered ROOT and META region servers and we put + // other failed region servers in a queue to be handled later + if(failedServers!=null) { + for(ServerName curServer : failedServers) { + this.serverManager.processDeadServer(curServer, true); + } + } + // Update meta with new PB serialization if required. i.e migrate all HRI // to PB serialization in meta and update the status in ROOT. This must happen // before we assign all user regions or else the assignment will fail. @@ -763,11 +807,11 @@ Server { } /** - * Override to change master's splitLogAfterStartup. Used testing + * Get a list of servers which need recovery work * @param mfs */ - protected void splitLogAfterStartup(final MasterFileSystem mfs) { - mfs.splitLogAfterStartup(); + protected List getFailedServersFromLogFolders(final MasterFileSystem mfs) { + return mfs.getFailedServersFromLogFolders(); } /** @@ -798,14 +842,13 @@ Server { } /** - * Check -ROOT- and .META. are assigned. If not, - * assign them. + * Check -ROOT- is assigned. If not, assign them. * @throws InterruptedException * @throws IOException * @throws KeeperException * @return True if root and meta are healthy, assigned */ - boolean assignRootAndMeta(MonitoredTask status) + boolean assignRoot(MonitoredTask status) throws InterruptedException, IOException, KeeperException { int assigned = 0; long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); @@ -820,13 +863,22 @@ Server { boolean rootRegionLocation = catalogTracker.verifyRootRegionLocation(timeout); if (!rit && !rootRegionLocation) { currentRootServer = this.catalogTracker.getRootLocation(); - splitLogAndExpireIfOnline(currentRootServer); + boolean beingExpired = this.expireIfOnline(currentRootServer); + if (beingExpired) { + if (!this.isInReplayWALEditsMode()) { + this.fileSystemManager.splitMetaLog(currentRootServer); + } + } this.assignmentManager.assignRoot(); // Make sure a -ROOT- location is set. if (!isRootLocation()) return false; // This guarantees that the transition assigning -ROOT- has completed this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO); assigned++; + if (beingExpired && this.fileSystemManager.isInReplayWALEditsMode()) { + // In Replay WAL Mode, we need the new root server online + this.fileSystemManager.splitMetaLog(currentRootServer); + } } else if (rit && !rootRegionLocation) { // Make sure a -ROOT- location is set. if (!isRootLocation()) return false; @@ -846,36 +898,63 @@ Server { LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit + ", location=" + catalogTracker.getRootLocation()); + status.setStatus("ROOT assigned."); + return true; + } + + /** + * Check .META. are assigned. If not, assign it. + * @throws InterruptedException + * @throws IOException + * @throws KeeperException + * @return True if root and meta are healthy, assigned + */ + boolean assignMeta(MonitoredTask status, ServerName previousRootServer) + throws InterruptedException, IOException, KeeperException { // Work on meta region + int assigned = 0; + long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); + boolean beingExpired = false; + status.setStatus("Assigning META region"); - assignmentManager.getRegionStates().createRegionState( - HRegionInfo.FIRST_META_REGIONINFO); - rit = this.assignmentManager. 
- processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); + assignmentManager.getRegionStates().createRegionState(HRegionInfo.FIRST_META_REGIONINFO); + boolean rit = this.assignmentManager + .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout); if (!rit && !metaRegionLocation) { - ServerName currentMetaServer = - this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); - if (currentMetaServer != null - && !currentMetaServer.equals(currentRootServer)) { - splitLogAndExpireIfOnline(currentMetaServer); + ServerName currentMetaServer = this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); + if (currentMetaServer != null && !currentMetaServer.equals(previousRootServer)) { + beingExpired = expireIfOnline(currentMetaServer); + } + if (beingExpired) { + if (this.isInReplayWALEditsMode()) { + Set regions = new HashSet(); + regions.add(HRegionInfo.FIRST_META_REGIONINFO); + this.fileSystemManager.startSubmitMetaLogSplitJob(currentMetaServer, regions); + } else { + this.fileSystemManager.splitMetaLog(currentMetaServer); + } } assignmentManager.assignMeta(); enableSSHandWaitForMeta(); assigned++; + if (beingExpired && this.fileSystemManager.isInReplayWALEditsMode()) { + // In Replay WAL Mode, we need the new .META. server online + this.fileSystemManager.splitMetaLog(currentMetaServer); + } } else if (rit && !metaRegionLocation) { // Wait until META region added to region server onlineRegions. See HBASE-5875. enableSSHandWaitForMeta(); assigned++; } else { - // Region already assigned. We didn't assign it. Add to in-memory state. + // Region already assigned. We didn't assign it. Add to in-memory state. this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO, this.catalogTracker.getMetaLocation()); } enableCatalogTables(Bytes.toString(HConstants.META_TABLE_NAME)); - LOG.info(".META. assigned=" + assigned + ", rit=" + rit + - ", location=" + catalogTracker.getMetaLocation()); - status.setStatus("META and ROOT assigned."); + LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location=" + + catalogTracker.getMetaLocation()); + status.setStatus("META assigned."); return true; } @@ -947,20 +1026,18 @@ Server { } /** - * Split a server's log and expire it if we find it is one of the online - * servers. + * Expire a server if we find it is one of the online servers. * @param sn ServerName to check. 
* @throws IOException */ - private void splitLogAndExpireIfOnline(final ServerName sn) + private boolean expireIfOnline(final ServerName sn) throws IOException { if (sn == null || !serverManager.isServerOnline(sn)) { - return; + return false; } - LOG.info("Forcing splitLog and expire of " + sn); - fileSystemManager.splitMetaLog(sn); - fileSystemManager.splitLog(sn); + LOG.info("Forcing expire of " + sn); serverManager.expireServer(sn); + return true; } @Override @@ -2423,4 +2500,8 @@ Server { String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation); } + + private boolean isInReplayWALEditsMode() { + return conf.getBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index c9e21ec..295a7d5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -44,6 +45,8 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -54,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.zookeeper.KeeperException; /** * This class abstracts a bunch of operations the HMaster needs to interact with @@ -81,6 +85,7 @@ public class MasterFileSystem { private final Path rootdir; // create the split log lock final Lock splitLogLock = new ReentrantLock(); + final boolean replayWALEdits; final boolean distributedLogSplitting; final SplitLogManager splitLogManager; private final MasterServices services; @@ -117,15 +122,13 @@ public class MasterFileSystem { conf.set("fs.defaultFS", fsUri); // make sure the fs has the same conf fs.setConf(conf); - this.distributedLogSplitting = - conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true); + this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(), + master, services, master.getServerName()); + this.distributedLogSplitting = conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true); if (this.distributedLogSplitting) { - this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, services, master.getServerName()); this.splitLogManager.finishInitialization(masterRecovery); - } else { - this.splitLogManager = null; } + this.replayWALEdits = conf.getBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); // setup the filesystem variable // set up the archived logs path this.oldLogDir = createInitialFileSystemLayout(); @@ -201,69 +204,53 @@ public class MasterFileSystem { } /** - 
* Inspect the log directory to recover any log file without - * an active region server. + * Inspect the log directory to find dead servers which need recovery work */ - void splitLogAfterStartup() { - boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", - HLog.SPLIT_SKIP_ERRORS_DEFAULT); + List getFailedServersFromLogFolders() { + List serverNames = new ArrayList(); Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME); - do { - if (master.isStopped()) { - LOG.warn("Master stopped while splitting logs"); - break; + + if (master.isStopped()) { + LOG.warn("Master stopped while splitting logs"); + return serverNames; + } + + try { + if (!this.fs.exists(logsDirPath)) return serverNames; + FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); + // Get online servers after getting log folders to avoid log folder deletion of newly + // checked in region servers . see HBASE-5916 + Set onlineServers = ((HMaster) master).getServerManager().getOnlineServers() + .keySet(); + + if (logFolders == null || logFolders.length == 0) { + LOG.debug("No log files to split, proceeding..."); + return serverNames; } - List serverNames = new ArrayList(); - try { - if (!this.fs.exists(logsDirPath)) return; - FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); - // Get online servers after getting log folders to avoid log folder deletion of newly - // checked in region servers . see HBASE-5916 - Set onlineServers = ((HMaster) master).getServerManager().getOnlineServers() - .keySet(); - - if (logFolders == null || logFolders.length == 0) { - LOG.debug("No log files to split, proceeding..."); - return; + for (FileStatus status : logFolders) { + String sn = status.getPath().getName(); + // truncate splitting suffix if present (for ServerName parsing) + if (sn.endsWith(HLog.SPLITTING_EXT)) { + sn = sn.substring(0, sn.length() - HLog.SPLITTING_EXT.length()); } - for (FileStatus status : logFolders) { - String sn = status.getPath().getName(); - // truncate splitting suffix if present (for ServerName parsing) - if (sn.endsWith(HLog.SPLITTING_EXT)) { - sn = sn.substring(0, sn.length() - HLog.SPLITTING_EXT.length()); - } - ServerName serverName = ServerName.parseServerName(sn); - if (!onlineServers.contains(serverName)) { - LOG.info("Log folder " + status.getPath() + " doesn't belong " - + "to a known region server, splitting"); - serverNames.add(serverName); - } else { - LOG.info("Log folder " + status.getPath() - + " belongs to an existing region server"); - } - } - splitLog(serverNames, META_FILTER); - splitLog(serverNames, NON_META_FILTER); - retrySplitting = false; - } catch (IOException ioe) { - LOG.warn("Failed splitting of " + serverNames, ioe); - if (!checkFileSystem()) { - LOG.warn("Bad Filesystem, exiting"); - Runtime.getRuntime().halt(1); - } - try { - if (retrySplitting) { - Thread.sleep(conf.getInt( - "hbase.hlog.split.failure.retry.interval", 30 * 1000)); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted, aborting since cannot return w/o splitting"); - Thread.currentThread().interrupt(); - retrySplitting = false; - Runtime.getRuntime().halt(1); + ServerName serverName = ServerName.parseServerName(sn); + if (!onlineServers.contains(serverName)) { + LOG.info("Log folder " + status.getPath() + " doesn't belong " + + "to a known region server, splitting"); + serverNames.add(serverName); + } else { + LOG.info("Log folder " + status.getPath() + " belongs to an existing region server"); } } - } while (retrySplitting); + 
} catch (IOException ioe) { + LOG.warn("Failed splitting of " + serverNames, ioe); + if (!checkFileSystem()) { + LOG.warn("Bad Filesystem, exiting"); + Runtime.getRuntime().halt(1); + } + } + + return serverNames; } public void splitLog(final ServerName serverName) throws IOException { @@ -288,7 +275,7 @@ public class MasterFileSystem { } splitLogManager.handleDeadWorkers(serverNames); splitTime = EnvironmentEdgeManager.currentTimeMillis(); - splitLogSize = splitLogManager.splitLogDistributed(logDirs, META_FILTER); + splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, META_FILTER); splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; if (this.metricsMaster != null) { this.metricsMaster.addSplit(splitTime, splitLogSize); @@ -316,6 +303,56 @@ public class MasterFileSystem { return logDirs; } + public boolean isInReplayWALEditsMode() { + return this.replayWALEdits; + } + + /** + * Mark regions in recovering state when replayWALEdits are set true + * @param serverNames + * @throws IOException + */ + public void startSubmitLogSplitJob(List serverNames) throws IOException { + if (!this.replayWALEdits) { + return; + } + this.splitLogManager.startSubmitLogSplitJob(serverNames); + // mark regions in recovering state + for (ServerName serverName : serverNames) { + NavigableMap regions = this.getServerUserRegions(serverName); + if (regions == null) { + continue; + } + try { + this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet()); + } catch (KeeperException e) { + throw new IOException(e); + } + } + } + + /** + * Mark meta regions in recovering state when replayWALEdits are set true. The function is used + * when {@link #getServerUserRegions(ServerName)} can't be used in case meta RS is down. + * @param serverNames + * @throws IOException + */ + public void startSubmitMetaLogSplitJob(ServerName serverName, Set regions) + throws IOException { + if (!this.replayWALEdits || (regions == null)) { + return; + } + List tmpServerNames = new ArrayList(); + tmpServerNames.add(serverName); + this.splitLogManager.startSubmitLogSplitJob(tmpServerNames); + // mark regions in recovering state + try { + this.splitLogManager.markRegionsRecoveringInZK(serverName, regions); + } catch (KeeperException e) { + throw new IOException(e); + } + } + public void splitLog(final List serverNames) throws IOException { splitLog(serverNames, NON_META_FILTER); } @@ -339,7 +376,7 @@ public class MasterFileSystem { if (distributedLogSplitting) { splitLogManager.handleDeadWorkers(serverNames); splitTime = EnvironmentEdgeManager.currentTimeMillis(); - splitLogSize = splitLogManager.splitLogDistributed(logDirs,filter); + splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter); splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; } else { for(Path logDir: logDirs){ @@ -602,4 +639,21 @@ public class MasterFileSystem { this.services.getTableDescriptors().add(htd); return htd; } + + private NavigableMap getServerUserRegions(ServerName serverName) + throws IOException { + while (!this.master.isStopped()) { + try { + this.master.getCatalogTracker().waitForMeta(); + return MetaReader.getServerUserRegions(this.master.getCatalogTracker(), serverName); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted", e); + } catch (IOException ioe) { + LOG.info("Received exception accessing META during server shutdown of " + serverName + + ", retrying META read", ioe); + } + } + return null; + } } diff 
--git hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 50ee2a4..d0a66d3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -27,9 +27,9 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -499,6 +499,10 @@ public class ServerManager { } public synchronized void processDeadServer(final ServerName serverName) { + this.processDeadServer(serverName, false); + } + + public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) { // When assignment manager is cleaning up the zookeeper nodes and rebuilding the // in-memory region states, region servers could be down. Root/meta table can and // should be re-assigned, log splitting can be done too. However, it is better to @@ -513,8 +517,9 @@ public class ServerManager { } this.deadservers.add(serverName); - this.services.getExecutorService().submit(new ServerShutdownHandler( - this.master, this.services, this.deadservers, serverName, false)); + this.services.getExecutorService().submit( + new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName, + shouldSplitHlog)); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 56a30f8..fa07b75 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +45,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.DeserializationException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; @@ -51,8 +54,12 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; @@ -68,6 +75,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import 
org.apache.zookeeper.data.Stat; +import com.google.protobuf.InvalidProtocolBufferException; + /** * Distributes the task of log splitting to the available region servers. * Coordination happens via zookeeper. For every log file that has to be split a @@ -119,6 +128,17 @@ public class SplitLogManager extends ZooKeeperListener { private long lastNodeCreateTime = Long.MAX_VALUE; public boolean ignoreZKDeleteForTesting = false; + /** + * In replayWALEdits mode, we need touch both splitlog and recovering-regions znodes one time. So + * the lock is used to guard such cases. + */ + protected final ReentrantReadWriteLock recoveringRegionLock = new ReentrantReadWriteLock(); + + private final Set inflightWorkItems = Collections + .synchronizedSet(new HashSet()); + + final boolean replayWALEdits; + private final ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; @@ -186,6 +206,7 @@ public class SplitLogManager extends ZooKeeperListener { new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper); this.failedDeletions = Collections.synchronizedSet(new HashSet()); + this.replayWALEdits = conf.getBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); } public void finishInitialization(boolean masterRecovery) { @@ -245,7 +266,22 @@ public class SplitLogManager extends ZooKeeperListener { * @return cumulative size of the logfiles split */ public long splitLogDistributed(final List logDirs) throws IOException { - return splitLogDistributed(logDirs, null); + if (logDirs.size() == 0) { + return 0; + } + List serverNames = new ArrayList(); + for (Path logDir : logDirs) { + try { + ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir); + if (serverName != null) { + serverNames.add(serverName); + } + } catch (IllegalArgumentException e) { + // ignore invalid format error. 
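// (Added note) WAL directories are named after the server that wrote them, typically
// <hbase.rootdir>/.logs/host,port,startcode, and gain a "-splitting" suffix once splitting
// starts, so a directory whose name cannot be parsed back into a ServerName is simply
// skipped here instead of failing the whole split.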
+ LOG.debug("Cannot parse server name from " + logDir); + } + } + return splitLogDistributed(serverNames, logDirs, null); } /** @@ -259,8 +295,8 @@ public class SplitLogManager extends ZooKeeperListener { * @throws IOException If there was an error while splitting any log file * @return cumulative size of the logfiles split */ - public long splitLogDistributed(final List logDirs, PathFilter filter) - throws IOException { + public long splitLogDistributed(final List serverNames, final List logDirs, + PathFilter filter) throws IOException { MonitoredTask status = TaskMonitor.get().createStatus( "Doing distributed log split in " + logDirs); FileStatus[] logfiles = getFileList(logDirs, filter); @@ -282,7 +318,11 @@ public class SplitLogManager extends ZooKeeperListener { throw new IOException("duplicate log split scheduled for " + lf.getPath()); } } + this.endSubmitLogSplitJob(serverNames); waitForSplittingCompletion(batch, status); + // mark recovering regions up + this.removeRecoveringRegionsFromZK(serverNames); + if (batch.done != batch.installed) { batch.isDead = true; SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); @@ -409,6 +449,90 @@ public class SplitLogManager extends ZooKeeperListener { return count; } + /** + * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the + * region server hosting the region can allow reads to the recovered region + * @param serverNames servers which are just recovered + */ + private void removeRecoveringRegionsFromZK(final List serverNames) { + int count = 0; + Set recoveredServerNameSet = new HashSet(); + if (serverNames != null) { + for (ServerName tmpServerName : serverNames) { + recoveredServerNameSet.add(tmpServerName.getServerName()); + } + } + + try { + this.recoveringRegionLock.writeLock().lock(); + + if (!this.replayWALEdits + || (this.inflightWorkItems.size() != 0 && recoveredServerNameSet.size() == 0)) { + // the function is used in WALEdit replay mode and when there is either no inflight + // workitems or servers just recovered + return; + } + + List tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); + if (tasks != null) { + for (String t : tasks) { + if (!ZKSplitLog.isRescanNode(watcher, t)) { + count++; + } + } + } + if (count == 0 && this.inflightWorkItems.size() == 0) { + // no splitting work items left + deleteRecoveringRegionZNodes(null); + } else if (recoveredServerNameSet.size() > 0) { + // remove recovering regions which doesn't have any RS associated with it + List regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); + if (regions != null) { + for (String region : regions) { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); + List failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); + if (failedServers == null || failedServers.isEmpty()) { + ZKUtil.deleteNode(watcher, nodePath); + LOG.debug("znode " + nodePath + " is deleted."); + return; + } + if (recoveredServerNameSet.containsAll(failedServers)) { + ZKUtil.deleteNodeRecursively(watcher, nodePath); + LOG.debug("znode " + nodePath + " is deleted."); + } else { + for (String failedServer : failedServers) { + if (recoveredServerNameSet.contains(failedServer)) { + String tmpPath = ZKUtil.joinZNode(nodePath, failedServer); + ZKUtil.deleteNode(watcher, tmpPath); + } + } + } + } + } + } + } catch (KeeperException ke) { + // ignore zookeeper errors + } finally { + this.recoveringRegionLock.writeLock().unlock(); + } + } + + private void 
deleteRecoveringRegionZNodes(List regions) { + try { + if (regions == null) { + // remove all children under /home/recovering-regions + ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); + } else { + for (String curRegion : regions) { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion); + ZKUtil.deleteNodeRecursively(watcher, nodePath); + } + } + } catch (KeeperException e) { + LOG.warn("Cannot remove reovering regions from ZooKeeper", e); + } + } + private void setDone(String path, TerminationStatus status) { Task task = tasks.get(path); if (task == null) { @@ -859,6 +983,131 @@ public class SplitLogManager extends ZooKeeperListener { } /** + * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for + * all regions of the passed in region servers + * @param serverName the name of a region server + * @param userRegions user regiones assigned on the region server + */ + public void markRegionsRecoveringInZK(final ServerName serverName, Set userRegions) + throws KeeperException { + if (userRegions == null || !this.replayWALEdits) { + return; + } + + for (HRegionInfo region : userRegions) { + String regionEncodeName = region.getEncodedName(); + long retries = this.zkretries; + + do { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName); + long lastRecordedFlushedSequenceId = -1; + try { + long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId( + regionEncodeName.getBytes()); + + /* + * znode layout: + * .../region_id[last known flushed sequence id]/failed server[last known + * flushed sequence id for the server] + */ + byte[] data = ZKUtil.getData(this.watcher, nodePath); + if(data == null) { + ZKUtil.createSetData(this.watcher, nodePath, + SplitLogManager.toByteArray(lastSequenceId)); + } else { + lastRecordedFlushedSequenceId = SplitLogManager.parseHLogPositionFrom(data); + if (lastSequenceId > 0 && lastRecordedFlushedSequenceId != lastSequenceId) { + // update last flushed sequence id in the region level + ZKUtil.setData(this.watcher, nodePath, SplitLogManager.toByteArray(lastSequenceId)); + } + } + // go one level deeper with server name + nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName()); + if (lastSequenceId == lastRecordedFlushedSequenceId) { + // the newly assigned RS failed even before any flush to the region + lastSequenceId = -1; + } + ZKUtil.createSetData(this.watcher, nodePath, SplitLogManager.toByteArray(lastSequenceId)); + + // break retry loop + break; + } catch (KeeperException e) { + // ignore ZooKeeper exceptions inside retry loop + if (retries <= 1) { + throw e; + } else { + // wait a little bit for retry + try { + Thread.sleep(20); + } catch (Exception ignoreE) { + // ignore + } + } + } + } while ((--retries) > 0); + } + } + + /** + * This function uses a reference count to guard that there is no job is being submitted when we + * starts to open recovering regions. Otherwise, we could prematurely open regions are just marked + * as recovering. + */ + public void startSubmitLogSplitJob(List serverNames) { + if (this.replayWALEdits && serverNames != null) { + // only use the reference count in WALEdits replay mode. 
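// (Added note) The layout written by markRegionsRecoveringInZK() above, with placeholder names:
//   /hbase/recovering-regions/<encoded region name>                       data: last flushed sequence id of the region
//   /hbase/recovering-regions/<encoded region name>/<failed server name>  data: last sequence id that server flushed (-1 if it never flushed)
// Both payloads are serialized with toByteArray(long) and read back with parseHLogPositionFrom(),
// defined later in this class.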
+ try { + this.recoveringRegionLock.writeLock().lock(); + this.inflightWorkItems.addAll(serverNames); + } finally { + this.recoveringRegionLock.writeLock().unlock(); + } + } + } + + public void endSubmitLogSplitJob(List serverNames) { + if (this.replayWALEdits && serverNames != null) { + this.inflightWorkItems.removeAll(serverNames); + } + } + + /** + * @param bytes - Content of a HLog position znode. + * @return long - The current HLog position. + * @throws DeserializationException + */ + public static long parseHLogPositionFrom(final byte[] bytes) { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition + .newBuilder(); + ZooKeeperProtos.ReplicationHLogPosition position; + try { + position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); + } catch (InvalidProtocolBufferException e) { + return -1; + } + return position.getPosition(); + } else { + if (bytes.length > 0) { + return Bytes.toLong(bytes); + } + return -1; + } + } + + /** + * @param position + * @return Serialized protobuf of position with pb magic prefix prepended suitable + * for use as content of an hlog position in a replication queue. + */ + static byte[] toByteArray(final long position) { + byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position) + .build().toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + /** * Keeps track of the batch of tasks submitted together by a caller in * splitLogDistributed(). Clients threads use this object to wait for all * their tasks to be done. @@ -1052,6 +1301,11 @@ public class SplitLogManager extends ZooKeeperListener { } failedDeletions.removeAll(tmpPaths); } + + // Garbage collect left-over /hbase/recovering-regions/... 
znode + if (tot == 0 && inflightWorkItems.size() == 0 && tasks.size() == 0) { + removeRecoveringRegionsFromZK(null); + } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java index 0f62005..bcf6420 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master.handler; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +56,15 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { try { LOG.info("Splitting META logs for " + serverName); if (this.shouldSplitHlog) { - this.services.getMasterFileSystem().splitMetaLog(serverName); + if(this.services.getMasterFileSystem().isInReplayWALEditsMode()) { + Set regions = new HashSet(); + if (isCarryingMeta()) { + regions.add(HRegionInfo.FIRST_META_REGIONINFO); + } + this.services.getMasterFileSystem().startSubmitMetaLogSplitJob(serverName, regions); + } else { + this.services.getMasterFileSystem().splitMetaLog(serverName); + } } } catch (IOException ioe) { this.services.getExecutorService().submit(this); @@ -91,8 +101,19 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { } else { LOG.info("META has been assigned to otherwhere, skip assigning."); } - } + + try { + if (this.shouldSplitHlog && this.services.getMasterFileSystem().isInReplayWALEditsMode()) { + this.services.getMasterFileSystem().splitMetaLog(serverName); + } + } catch (IOException ioe) { + this.services.getExecutorService().submit(this); + this.deadServers.add(serverName); + throw new IOException("failed log splitting for " + + serverName + ", will retry", ioe); + } + super.process(); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 1d7a66d..386b31d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -117,21 +117,7 @@ public class ServerShutdownHandler extends EventHandler { public void process() throws IOException { final ServerName serverName = this.serverName; try { - try { - if (this.shouldSplitHlog) { - LOG.info("Splitting logs for " + serverName); - this.services.getMasterFileSystem().splitLog(serverName); - } else { - LOG.info("Skipping log splitting for " + serverName); - } - } catch (IOException ioe) { - //typecast to SSH so that we make sure that it is the SSH instance that - //gets submitted as opposed to MSSH or some other derived instance of SSH - this.services.getExecutorService().submit((ServerShutdownHandler)this); - this.deadServers.add(serverName); - throw new IOException("failed log splitting for " + - serverName + ", will retry", ioe); - } + // We don't want worker thread in the MetaServerShutdownHandler // executor pool to block by waiting availability of -ROOT- // and .META. server. 
Otherwise, it could run into the following issue: @@ -153,8 +139,7 @@ public class ServerShutdownHandler extends EventHandler { // If AssignmentManager hasn't finished rebuilding user regions, // we are not ready to assign dead regions either. So we re-queue up // the dead server for further processing too. - if (isCarryingRoot() || isCarryingMeta() // -ROOT- or .META. - || !services.getAssignmentManager().isFailoverCleanupDone()) { + if (!services.getAssignmentManager().isFailoverCleanupDone()) { this.services.getServerManager().processDeadServer(serverName); return; } @@ -193,6 +178,27 @@ public class ServerShutdownHandler extends EventHandler { throw new IOException("Server is stopped"); } + try { + if (this.shouldSplitHlog) { + LOG.info("Splitting logs for " + serverName + " before assignment."); + if(this.services.getMasterFileSystem().isInReplayWALEditsMode()){ + List serverNames = new ArrayList(); + serverNames.add(serverName); + this.services.getMasterFileSystem().startSubmitLogSplitJob(serverNames); + } else { + this.services.getMasterFileSystem().splitLog(serverName); + } + } else { + LOG.info("Skipping log splitting for " + serverName); + } + } catch (IOException ioe) { + // typecast to SSH so that we make sure that it is the SSH instance that + // gets submitted as opposed to MSSH or some other derived instance of SSH + this.services.getExecutorService().submit((ServerShutdownHandler) this); + this.deadServers.add(serverName); + throw new IOException("failed log splitting for " + serverName + ", will retry", ioe); + } + // Clean out anything in regions in transition. Being conservative and // doing after log splitting. Could do some states before -- OPENING? // OFFLINE? -- and then others after like CLOSING that depend on log @@ -277,6 +283,19 @@ public class ServerShutdownHandler extends EventHandler { } finally { this.deadServers.finish(serverName); } + + try { + if (this.shouldSplitHlog && this.services.getMasterFileSystem().isInReplayWALEditsMode()) { + this.services.getMasterFileSystem().splitLog(serverName); + } + } catch (IOException ioe) { + // typecast to SSH so that we make sure that it is the SSH instance that + // gets submitted as opposed to MSSH or some other derived instance of SSH + this.services.getExecutorService().submit((ServerShutdownHandler) this); + this.deadServers.add(serverName); + throw new IOException("failed log splitting for " + serverName + ", will retry", ioe); + } + LOG.info("Finished processing of shutdown of " + serverName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d179cad..190aaac 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -134,7 +134,6 @@ import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d74db09..822c4c2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -29,14 +29,12 @@ import java.lang.reflect.Method; import java.net.BindException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -53,7 +51,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; -import com.google.protobuf.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -66,9 +63,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.FailedSanityCheckException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.OutOfOrderScannerNextException; @@ -111,6 +108,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientEngine; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -214,6 +212,7 @@ import org.cliffc.high_scale_lib.Counter; import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -273,6 +272,13 @@ public class HRegionServer implements ClientProtocol, protected final Map onlineRegions = new ConcurrentHashMap(); + /** + * Set of regions currently being in recovering state which means it can accept writes(edits from + * previous failed region server) but not reads. A recovering region is also an online region. 
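 * (Added note) Reads against a region in this set are rejected with a NotServingRegionException
 * ("... is recovering") until the region's znode under /hbase/recovering-regions is deleted and
 * the region is removed from the set.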
+ */ + protected final Set recoveringRegions = Collections + .synchronizedSet(new HashSet()); + // Leases protected Leases leases; @@ -427,6 +433,9 @@ public class HRegionServer implements ClientProtocol, private RegionServerCoprocessorHost rsHost; + // configuration setting on if replay WAL edits directly to another RS + private final boolean replayWALEdits; + /** * Starts a HRegionServer at the default location * @@ -517,6 +526,8 @@ public class HRegionServer implements ClientProtocol, } }; this.rsHost = new RegionServerCoprocessorHost(this, this.conf); + + this.replayWALEdits = conf.getBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); } /** @@ -1545,8 +1556,7 @@ public class HRegionServer implements ClientProtocol, this.rpcServer.start(); // Create the log splitting worker and start it - this.splitLogWorker = new SplitLogWorker(this.zooKeeper, - this.getConfiguration(), this.getServerName(), this); + this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this); splitLogWorker.start(); } @@ -1918,17 +1928,43 @@ public class HRegionServer implements ClientProtocol, } @Override - public long getLastSequenceId(byte[] region) { + public long getLastSequenceId(String regionServerName, byte[] region) { Long lastFlushedSequenceId = -1l; - try { - GetLastFlushedSequenceIdRequest req = - RequestConverter.buildGetLastFlushedSequenceIdRequest(region); - lastFlushedSequenceId = hbaseMaster.getLastFlushedSequenceId(null, req) - .getLastFlushedSequenceId(); - } catch (ServiceException e) { - lastFlushedSequenceId = -1l; - LOG.warn("Unable to connect to the master to check " + - "the last flushed sequence id", e); + + if (!this.replayWALEdits) { + try { + GetLastFlushedSequenceIdRequest req = RequestConverter + .buildGetLastFlushedSequenceIdRequest(region); + lastFlushedSequenceId = hbaseMaster.getLastFlushedSequenceId(null, req) + .getLastFlushedSequenceId(); + } catch (ServiceException e) { + lastFlushedSequenceId = -1l; + LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e); + } + } else { + if (regionServerName.isEmpty()) { + return lastFlushedSequenceId; + } + // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits, + // last flushed sequence Id changes when newly assigned RS flush writes to the region. + // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed + // sequence Id name space + // (sequence Id only valid for a particular RS instance), changes when different newly + // assigned RS flushes the region. + // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of + // last flushed sequence Id for each individual RS instance. 
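// (Added note) Concretely, the lookup below reads
//   /hbase/recovering-regions/<encoded region name>/<regionServerName>
// and decodes the payload with SplitLogManager.parseHLogPositionFrom(), leaving -1 when the
// znode is missing or cannot be read.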
+ String encodedRegionName = Bytes.toString(region); + String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, encodedRegionName); + nodePath = ZKUtil.joinZNode(nodePath, regionServerName); + try { + byte[] data = ZKUtil.getData(getZooKeeper(), nodePath); + if (data != null) { + lastFlushedSequenceId = SplitLogManager.parseHLogPositionFrom(data); + } + } catch (KeeperException e) { + LOG.warn("Cannot get lastFlushedSequenceId from ZooKeeper for server=" + regionServerName + + "; region=" + encodedRegionName, e); + } } return lastFlushedSequenceId; } @@ -2042,6 +2078,19 @@ public class HRegionServer implements ClientProtocol, this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); } + public void addToRecoveringRegions(String regionEncodeName) { + this.recoveringRegions.add(regionEncodeName); + } + + public void removeFromRecoveringRegions(String regionEncodeName) { + this.recoveringRegions.remove(regionEncodeName); + } + + @Override + public Set getRecoveringRegions() { + return this.recoveringRegions; + } + /** * @return A new Map of online regions sorted by region size with the first * entry being the biggest. @@ -2691,6 +2740,14 @@ public class HRegionServer implements ClientProtocol, ClientProtos.Get get = request.getGet(); Boolean existence = null; Result r = null; + + // check if current region is in recovering phase + if ((this.recoveringRegions.size() > 0) + && this.recoveringRegions.contains(region.getRegionInfo().getEncodedName())) { + throw new NotServingRegionException(region.getRegionInfo().getRegionNameAsString() + + " is recovering"); + } + if (request.getClosestRowBefore()) { if (get.getColumnCount() != 1) { throw new DoNotRetryIOException( @@ -3024,6 +3081,13 @@ public class HRegionServer implements ClientProtocol, } if (!done) { + // check if current region is in recovering phase + if ((this.recoveringRegions.size() > 0) + && this.recoveringRegions.contains(region.getRegionInfo().getEncodedName())) { + throw new NotServingRegionException(region.getRegionInfo().getRegionNameAsString() + + " is recovering"); + } + long maxResultSize = scanner.getMaxResultSize(); if (maxResultSize <= 0) { maxResultSize = maxScannerResultSize; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java index 9c5aac6..15e9907 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java @@ -26,8 +26,9 @@ import org.apache.hadoop.classification.InterfaceAudience; @InterfaceAudience.Private public interface LastSequenceId { /** + * @param regionServerName * @param regionname * @return Last flushed sequence Id for regionname */ - public long getLastSequenceId(byte[] regionname); + public long getLastSequenceId(String regionServerName, byte[] regionname); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index e40871f..07d4703 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import 
org.apache.hadoop.classification.InterfaceAudience; @@ -92,4 +93,10 @@ public interface RegionServerServices extends OnlineRegions { * @return The RegionServer's "Leases" service */ public Leases getLeases(); + + /** + * @return The RegionServer's "recovering" regions + */ + public Set getRecoveringRegions(); + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 9c7dc58..943e027 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -69,6 +71,7 @@ import org.apache.zookeeper.data.Stat; @InterfaceAudience.Private public class SplitLogWorker extends ZooKeeperListener implements Runnable { private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); + private static final int checkInterval = 10000; // 10 seconds Thread worker; private final ServerName serverName; @@ -81,18 +84,26 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { private volatile boolean exitWorker; private final Object grabTaskLock = new Object(); private boolean workerInGrabTask = false; - + private HRegionServer server = null; public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, - ServerName serverName, TaskExecutor splitTaskExecutor) { + HRegionServer server, TaskExecutor splitTaskExecutor) { + super(watcher); + this.server = server; + this.serverName = server.getServerName(); + this.splitTaskExecutor = splitTaskExecutor; + } + + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName, + TaskExecutor splitTaskExecutor) { super(watcher); this.serverName = serverName; this.splitTaskExecutor = splitTaskExecutor; } - public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, - final ServerName serverName, final LastSequenceId sequenceIdChecker) { - this(watcher, conf, serverName, new TaskExecutor () { + public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, HRegionServer server, + final LastSequenceId sequenceIdChecker) { + this(watcher, conf, server, new TaskExecutor() { @Override public Status exec(String filename, CancelableProgressable p) { Path rootdir; @@ -200,7 +211,36 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { synchronized (taskReadyLock) { while (seq_start == taskReadySeq) { try { - taskReadyLock.wait(); + taskReadyLock.wait(checkInterval); + if (paths.isEmpty() && this.server != null) { + // check to see if we have stale recovering regions in our internal memory state + Set recoveringRegions = this.server.getRecoveringRegions(); + if (recoveringRegions.size() > 0) { + // Make a local copy to prevent ConcurrentModificationException when other threads + // modify recoveringRegions + List tmpCopy = new ArrayList(recoveringRegions); + for (String region : tmpCopy) { + String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region); + try { + if (ZKUtil.checkExists(this.watcher, nodePath) == -1) { + recoveringRegions.remove(region); + LOG.debug("Mark recovering region " + region + + " up from 
SplitLogWorker thread."); + } else { + // current check is a defensive(or redundant) mechanism to prevent us from + // having stale recovering regions in our internal RS memory state while + // zookeeper(source of truth) says differently. We stop at the first good one + // because we should not have a single instance such as this in normal case so + // check the first one is good enough. + break; + } + } catch (KeeperException e) { + // ignore zookeeper error + break; + } + } + } + } } catch (InterruptedException e) { LOG.info("SplitLogWorker interrupted while waiting for task," + " exiting: " + e.toString() + (exitWorker ? "" : @@ -210,6 +250,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } } } + } } @@ -452,9 +493,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } } - - - @Override public void nodeDataChanged(String path) { // there will be a self generated dataChanged event every time attemptToOwnTask() @@ -499,7 +537,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { return childrenPaths; } - @Override public void nodeChildrenChanged(String path) { if(path.equals(watcher.splitLogZNode)) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 1f361a0..acb708d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -31,9 +31,11 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; /** @@ -57,6 +59,10 @@ public class OpenRegionHandler extends EventHandler { //version of the offline node that was set by the master private volatile int versionOfOfflineNode = -1; + // Watch if a region is out of recovering state from ZooKeeper + @SuppressWarnings("unused") + private RecoveringRegionWatcher recoveringRegionWatcher; + public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, HTableDescriptor htd) { @@ -78,12 +84,52 @@ public class OpenRegionHandler extends EventHandler { this.regionInfo = regionInfo; this.htd = htd; this.versionOfOfflineNode = versionOfOfflineNode; + this.recoveringRegionWatcher = new RecoveringRegionWatcher(server.getZooKeeper()); } public HRegionInfo getRegionInfo() { return regionInfo; } + /** + * Watcher used to be notified of the recovering region comes out of recovering state + */ + class RecoveringRegionWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. 
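 * (Added note) The constructor registers this listener on the passed watcher, so nodeDeleted()
 * below fires once the region's znode under /hbase/recovering-regions is removed and the region
 * can be dropped from the recovering set.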
+ */ + public RecoveringRegionWatcher(ZooKeeperWatcher watcher) { + super(watcher); + watcher.registerListener(this); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + if (server.isStopped() || rsServices.isStopping()) { + return; + } + + final String encodedName = regionInfo.getEncodedName(); + if (!rsServices.getRecoveringRegions().contains(encodedName)) { + return; + } + + String nodePath = ZKUtil.joinZNode(server.getZooKeeper().recoveringRegionsZNode, encodedName); + if (!path.equalsIgnoreCase(nodePath)) { + return; + } + + rsServices.getRecoveringRegions().remove(encodedName); + + LOG.info(path + " znode deleted. Region: " + regionInfo.getRegionNameAsString() + + " completes recovering."); + } + } + @Override public void process() throws IOException { boolean openSuccessful = false; @@ -134,6 +180,18 @@ public class OpenRegionHandler extends EventHandler { transitionToFailedOpen = true; return; } + + // check if current region is marked as recovering in ZooKeeper + try { + if (isRegionMarkedRecoveringInZK(encodedName)) { + this.rsServices.getRecoveringRegions().add(encodedName); + } + } catch (KeeperException e) { + LOG.error("Can't retrieve recovering state from zookeeper", e); + tryTransitionFromOpeningToFailedOpen(regionInfo); + return; + } + boolean failed = true; if (tickleOpening("post_region_open")) { if (updateMeta(region)) { @@ -533,6 +591,38 @@ public class OpenRegionHandler extends EventHandler { return b; } + /** + * check if /hbase/recovering-regions/ exists. Returns true if exists + * and set watcher as well. + * @param regionEncodedName region encode name + * @return true when /hbase/recovering-regions/ exists + * @throws KeeperException + */ + boolean isRegionMarkedRecoveringInZK(String regionEncodedName) throws KeeperException { + boolean result = false; + String encodedName = this.regionInfo.getEncodedName(); + String nodePath = ZKUtil.joinZNode(this.server.getZooKeeper().recoveringRegionsZNode, + encodedName); + long zkretries = this.server.getConfiguration().getLong("hbase.splitlog.zk.retries", 3); + + do { + try { + byte[] node = ZKUtil.getDataAndWatch(server.getZooKeeper(), nodePath); + if (node != null) { + result = true; + } + break; + } catch (KeeperException e) { + if (zkretries <= 1) { + // all retries failed + throw e; + } + } + } while ((--zkretries) > 0); + + return result; + } + private boolean isGoodVersion() { return this.version != -1; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 8d500f2..3a95624 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -34,9 +34,12 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,10 +52,16 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -118,6 +127,18 @@ public class HLogSplitter { // For checking the latest flushed sequence id protected final LastSequenceId sequenceIdChecker; + final boolean replayWALEdits; + + private volatile boolean isDoneReading = false; + + private final ExecutorService sharedThreadPool; + + // Number of writer threads + private final int numWriterThreads; + + // Min batch size when replay WAL edits + private final int minBatchSize; + /** * Create a new HLogSplitter using the given {@link Configuration} and the * hbase.hlog.splitter.impl property to derived the instance @@ -174,7 +195,22 @@ public class HLogSplitter { entryBuffers = new EntryBuffers( conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); - outputSink = new OutputSink(); + + this.minBatchSize = conf.getInt("hbase.regionserver.hlog.splitlog.writer.min.batch.size", 512); + this.replayWALEdits = conf.getBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); + this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + if (this.replayWALEdits) { + this.sharedThreadPool = new ThreadPoolExecutor(this.numWriterThreads, conf.getInt( + "hbase.htable.threads.max", Integer.MAX_VALUE), conf.getLong( + "hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS, + new SynchronousQueue(), Threads.newDaemonThreadFactory("hbase-recover")); + ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true); + + outputSink = new OutputRegionServerSink(numWriterThreads, this.sharedThreadPool); + } else { + outputSink = new OutputEditsFilesSink(numWriterThreads); + this.sharedThreadPool = null; + } } /** @@ -428,20 +464,23 @@ public class HLogSplitter { outputSink.startWriterThreads(); // Report progress every so many edits and/or files opened (opening a file // takes a bit of time). - Map lastFlushedSequenceIds = - new TreeMap(Bytes.BYTES_COMPARATOR); + Map lastFlushedSequenceIds = new TreeMap(); Entry entry; int editsCount = 0; int editsSkipped = 0; + Long lastFlushedSequenceId = -1L; + ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath); + String serverNameStr = (serverName == null) ? 
"" : serverName.getServerName(); try { - while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) { + isDoneReading = false; + while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); - Long lastFlushedSequenceId = -1l; + String key = serverName + ":" + Bytes.toString(region); if (sequenceIdChecker != null) { - lastFlushedSequenceId = lastFlushedSequenceIds.get(region); + lastFlushedSequenceId = lastFlushedSequenceIds.get(key); if (lastFlushedSequenceId == null) { - lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); - lastFlushedSequenceIds.put(region, lastFlushedSequenceId); + lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(serverNameStr, region); + lastFlushedSequenceIds.put(key, lastFlushedSequenceId); } } if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { @@ -452,10 +491,10 @@ public class HLogSplitter { editsCount++; // If sufficient edits have passed, check if we should report progress. if (editsCount % interval == 0 - || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) { - numOpenedFilesLastCheck = outputSink.logWriters.size(); - String countsStr = (editsCount - editsSkipped) + - " edits, skipped " + editsSkipped + " edits."; + || (outputSink.getNumOpenWriters() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) { + numOpenedFilesLastCheck = outputSink.getNumOpenWriters(); + String countsStr = (editsCount - editsSkipped) + " edits, skipped " + editsSkipped + + " edits."; status.setStatus("Split " + countsStr); if (!reportProgressIfIsDistributedLogSplitting()) { return false; @@ -474,12 +513,12 @@ public class HLogSplitter { e = RemoteExceptionHandler.checkIOException(e); throw e; } finally { + isDoneReading = true; LOG.info("Finishing writing output logs and closing down."); progress_failed = outputSink.finishWritingAndClose() == null; String msg = "Processed " + editsCount + " edits across " - + outputSink.getOutputCounts().size() + " regions; log file=" - + logPath + " is corrupted = " + isCorrupted + " progress failed = " - + progress_failed; + + outputSink.recoveredRegions().size() + " regions; log file=" + logPath + + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed; ; LOG.info(msg); status.markComplete(msg); @@ -850,20 +889,28 @@ public class HLogSplitter { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && thrown.get() == null) { LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); - dataAvailable.wait(3000); + dataAvailable.wait(2000); } dataAvailable.notifyAll(); } checkForErrors(); } + /* + * Backward compatible with existing code + */ synchronized RegionEntryBuffer getChunkToWrite() { - long biggestSize=0; - byte[] biggestBufferKey=null; + return getChunkToWrite(false); + } + + synchronized RegionEntryBuffer getChunkToWrite(boolean ignoreCurrentlyWritingCheck) { + long biggestSize = 0; + byte[] biggestBufferKey = null; for (Map.Entry entry : buffers.entrySet()) { long size = entry.getValue().heapSize(); - if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) { + if (size > biggestSize + && (ignoreCurrentlyWritingCheck || !currentlyWriting.contains(entry.getKey()))) { biggestSize = size; biggestBufferKey = entry.getKey(); } @@ -879,8 +926,7 @@ public class HLogSplitter { void doneWriting(RegionEntryBuffer buffer) { synchronized (this) { - boolean removed = currentlyWriting.remove(buffer.encodedRegionName); - assert 
removed; + currentlyWriting.remove(buffer.encodedRegionName); } long size = buffer.heapSize(); @@ -955,13 +1001,16 @@ public class HLogSplitter { private void doRun() throws IOException { LOG.debug("Writer thread " + this + ": starting"); while (true) { - RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); + RegionEntryBuffer buffer = null; + buffer = entryBuffers.getChunkToWrite(replayWALEdits); if (buffer == null) { // No data currently available, wait on some more to show up synchronized (dataAvailable) { - if (shouldStop) return; + if (shouldStop && !outputSink.flush()) { + return; + } try { - dataAvailable.wait(1000); + dataAvailable.wait(500); } catch (InterruptedException ie) { if (!shouldStop) { throw new RuntimeException(ie); @@ -982,39 +1031,7 @@ public class HLogSplitter { private void writeBuffer(RegionEntryBuffer buffer) throws IOException { - List entries = buffer.entryBuffer; - if (entries.isEmpty()) { - LOG.warn(this.getName() + " got an empty buffer, skipping"); - return; - } - - WriterAndPath wap = null; - - long startTime = System.nanoTime(); - try { - int editsCount = 0; - - for (Entry logEntry : entries) { - if (wap == null) { - wap = outputSink.getWriterAndPath(logEntry); - if (wap == null) { - // getWriterAndPath decided we don't need to write these edits - // Message was already logged - return; - } - } - wap.w.append(logEntry); - outputSink.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; - } - // Pass along summary statistics - wap.incrementEdits(editsCount); - wap.incrementNanoTime(System.nanoTime() - startTime); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal(this.getName() + " Got while writing log entry to log", e); - throw e; - } + outputSink.append(buffer); } void finish() { @@ -1025,28 +1042,6 @@ public class HLogSplitter { } } - private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, - FileSystem fs, Configuration conf) - throws IOException { - Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true); - if (regionedits == null) { - return null; - } - if (fs.exists(regionedits)) { - LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " - + regionedits + ", length=" - + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); - } - } - Writer w = createWriter(fs, regionedits, conf); - LOG.debug("Creating writer path=" + regionedits + " region=" - + Bytes.toStringBinary(region)); - return (new WriterAndPath(regionedits, w)); - } - Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) { List components = new ArrayList(10); do { @@ -1129,38 +1124,35 @@ public class HLogSplitter { } /** - * Class that manages the output streams from the log splitting process. 
+ * The following class is an abstraction class to provide a common interface to support both + * existing recovered edits file sink and directly region server WAL edits replay */ - class OutputSink { - private final Map logWriters = Collections.synchronizedMap( - new TreeMap(Bytes.BYTES_COMPARATOR)); - private final Map regionMaximumEditLogSeqNum = Collections + abstract class OutputSink { + + protected Map writers = Collections + .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));; + + protected final Map regionMaximumEditLogSeqNum = Collections .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); - private final List writerThreads = Lists.newArrayList(); + + protected final List writerThreads = Lists.newArrayList(); /* Set of regions which we've decided should not output edits */ - private final Set blacklistedRegions = Collections.synchronizedSet( - new TreeSet(Bytes.BYTES_COMPARATOR)); + protected final Set blacklistedRegions = Collections + .synchronizedSet(new TreeSet(Bytes.BYTES_COMPARATOR)); - private boolean closeAndCleanCompleted = false; - - private boolean logWritersClosed = false; + protected boolean closeAndCleanCompleted = false; - private final int numThreads; + protected boolean writersClosed = false; - public OutputSink() { - // More threads could potentially write faster at the expense - // of causing more disk seeks as the logs are split. - // 3. After a certain setting (probably around 3) the - // process will be bound on the reader in the current - // implementation anyway. - numThreads = conf.getInt( - "hbase.regionserver.hlog.splitlog.writer.threads", 3); + protected final int numThreads; + + public OutputSink(int numWriters) { + numThreads = numWriters; } /** - * Start the threads that will pump data from the entryBuffers - * to the output files. + * Start the threads that will pump data from the entryBuffers to the output files. */ synchronized void startWriterThreads() { for (int i = 0; i < numThreads; i++) { @@ -1171,67 +1163,144 @@ public class HLogSplitter { } /** - * - * @return null if failed to report progress + * Update region's maximum edit log SeqNum. 
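+ * Synchronizes on regionMaximumEditLogSeqNum so that concurrent writer threads record a consistent per-region maximum.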
+ */ + void updateRegionMaximumEditLogSeqNum(Entry entry) { + synchronized (regionMaximumEditLogSeqNum) { + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey() + .getEncodedRegionName()); + if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) { + regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey() + .getLogSeqNum()); + } + } + + } + + Long getRegionMaximumEditLogSeqNum(byte[] region) { + return regionMaximumEditLogSeqNum.get(region); + } + + /** + * @return the number of currently opened writers + */ + int getNumOpenWriters() { + return this.writers.size(); + } + + /** + * Wait for writer threads to dump all info to the sink + * @return true when there is no error * @throws IOException */ - List finishWritingAndClose() throws IOException { + protected boolean waitForFinishWriting() throws IOException { LOG.info("Waiting for split writer threads to finish"); boolean progress_failed = false; - try { - for (WriterThread t : writerThreads) { - t.finish(); - } - for (WriterThread t : writerThreads) { - if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { - progress_failed = true; - } - try { - t.join(); - } catch (InterruptedException ie) { - IOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } - checkForErrors(); + for (WriterThread t : writerThreads) { + t.finish(); + } + for (WriterThread t : writerThreads) { + if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { + progress_failed = true; } - LOG.info("Split writers finished"); - if (progress_failed) { - return null; + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; } - return closeStreams(); + checkForErrors(); + } + LOG.info("Split writers finished"); + return (progress_failed == false); + } + + abstract List finishWritingAndClose() throws IOException; + + /** + * @return a map from encoded region ID to the number of edits written out for that region. + */ + abstract Map getOutputCounts(); + + /** + * @return a list of regions we've recovered + */ + abstract Set recoveredRegions(); + + /** + * @param entry A WAL Edit Entry + * @throws IOException + */ + abstract void append(RegionEntryBuffer buffer) throws IOException; + + /** + * WriterThread call this function to help flush internal queues + * @return true when underlying sink has something to flush + */ + protected boolean flush() throws IOException { + return false; + } + } + + /** + * Class that manages the output streams from the log splitting process. + */ + class OutputEditsFilesSink extends OutputSink { + + public OutputEditsFilesSink(int numWriters) { + // More threads could potentially write faster at the expense + // of causing more disk seeks as the logs are split. + // 3. After a certain setting (probably around 3) the + // process will be bound on the reader in the current + // implementation anyway. + super(numWriters); + } + + /** + * @return null if failed to report progress + * @throws IOException + */ + @Override + List finishWritingAndClose() throws IOException { + boolean isSuccessful = false; + List result = null; + try { + isSuccessful = waitForFinishWriting(); } finally { + result = close(); List thrown = closeLogWriters(null); if (thrown != null && !thrown.isEmpty()) { throw MultipleIOException.createIOException(thrown); } } + return (isSuccessful) ? result : null; } /** * Close all of the output streams. 
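+ * Recovered edits files that received no edits are deleted instead of being renamed to their completed path.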
* @return the list of paths written. */ - private List closeStreams() throws IOException { + private List close() throws IOException { Preconditions.checkState(!closeAndCleanCompleted); final List paths = new ArrayList(); final List thrown = Lists.newArrayList(); - ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool( - numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { - private int count = 1; - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "split-log-closeStream-" + count++); - return t; - } - }); + ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, + TimeUnit.SECONDS, new ThreadFactory() { + private int count = 1; + + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "split-log-closeStream-" + count++); + return t; + } + }); CompletionService completionService = new ExecutorCompletionService( closeThreadPool); - for (final Map.Entry logWritersEntry : logWriters - .entrySet()) { + for (final Map.Entry writersEntry : writers.entrySet()) { completionService.submit(new Callable() { public Void call() throws Exception { - WriterAndPath wap = logWritersEntry.getValue(); + WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); try { wap.w.close(); } catch (IOException ioe) { @@ -1239,15 +1308,25 @@ public class HLogSplitter { thrown.add(ioe); return null; } - LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten - + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)"); + LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); + + if (wap.editsWritten == 0) { + // just remove the empty recovered.edits file + if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + LOG.warn("Failed deleting empty " + wap.p); + throw new IOException("Failed deleting empty " + wap.p); + } + return null; + } + Path dst = getCompletedRecoveredEditsFilePath(wap.p, - regionMaximumEditLogSeqNum.get(logWritersEntry.getKey())); + regionMaximumEditLogSeqNum.get(writersEntry.getKey())); try { if (!dst.equals(wap.p) && fs.exists(dst)) { LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " - + dst + ", length=" + fs.getFileStatus(dst).getLen()); + + "result of a previous failed split attempt. Deleting " + dst + ", length=" + + fs.getFileStatus(dst).getLen()); if (!fs.delete(dst, false)) { LOG.warn("Failed deleting of old " + dst); throw new IOException("Failed deleting of old " + dst); @@ -1258,8 +1337,7 @@ public class HLogSplitter { // TestHLogSplit#testThreading is an example. 
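+ // move the temp file to its completed recovered.edits path, derived from the region's maximum edit sequence id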
if (fs.exists(wap.p)) { if (!fs.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " - + dst); + throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.debug("Rename " + wap.p + " to " + dst); } @@ -1276,7 +1354,7 @@ public class HLogSplitter { boolean progress_failed = false; try { - for (int i = 0, n = logWriters.size(); i < n; i++) { + for (int i = 0, n = this.writers.size(); i < n; i++) { Future future = completionService.take(); future.get(); if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { @@ -1296,17 +1374,16 @@ public class HLogSplitter { if (!thrown.isEmpty()) { throw MultipleIOException.createIOException(thrown); } - logWritersClosed = true; + writersClosed = true; closeAndCleanCompleted = true; if (progress_failed) { return null; } return paths; } - - private List closeLogWriters(List thrown) - throws IOException { - if (!logWritersClosed) { + + private List closeLogWriters(List thrown) throws IOException { + if (!writersClosed) { if (thrown == null) { thrown = Lists.newArrayList(); } @@ -1325,36 +1402,35 @@ public class HLogSplitter { } } } finally { - synchronized (logWriters) { - for (WriterAndPath wap : logWriters.values()) { + synchronized (writers) { + WriterAndPath wap = null; + for (SinkWriter tmpWAP : writers.values()) { try { + wap = (WriterAndPath) tmpWAP; wap.w.close(); } catch (IOException ioe) { LOG.error("Couldn't close log at " + wap.p, ioe); thrown.add(ioe); continue; } - LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten - + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)"); + LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); } } - logWritersClosed = true; + writersClosed = true; } } return thrown; } /** - * Get a writer and path for a log starting at the given entry. - * - * This function is threadsafe so long as multiple threads are always - * acting on different regions. - * + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. * @return null if this region shouldn't output any logs */ - WriterAndPath getWriterAndPath(Entry entry) throws IOException { + private WriterAndPath getWriterAndPath(Entry entry) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); - WriterAndPath ret = logWriters.get(region); + WriterAndPath ret = (WriterAndPath) writers.get(region); if (ret != null) { return ret; } @@ -1368,77 +1444,455 @@ public class HLogSplitter { blacklistedRegions.add(region); return null; } - logWriters.put(region, ret); + writers.put(region, ret); return ret; } - /** - * Update region's maximum edit log SeqNum. - */ - void updateRegionMaximumEditLogSeqNum(Entry entry) { - synchronized (regionMaximumEditLogSeqNum) { - Long currentMaxSeqNum=regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName()); - if (currentMaxSeqNum == null - || entry.getKey().getLogSeqNum() > currentMaxSeqNum) { - regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), - entry.getKey().getLogSeqNum()); + private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs, + Configuration conf) throws IOException { + Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true); + if (regionedits == null) { + return null; + } + if (fs.exists(regionedits)) { + LOG.warn("Found existing old edits file. 
It could be the " + + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" + + fs.getFileStatus(regionedits).getLen()); + if (!fs.delete(regionedits, false)) { + LOG.warn("Failed delete of old " + regionedits); } } - + Writer w = createWriter(fs, regionedits, conf); + LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); + return (new WriterAndPath(regionedits, w)); } - Long getRegionMaximumEditLogSeqNum(byte[] region) { - return regionMaximumEditLogSeqNum.get(region); + void append(RegionEntryBuffer buffer) throws IOException { + List entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + + WriterAndPath wap = null; + + long startTime = System.nanoTime(); + try { + int editsCount = 0; + + for (Entry logEntry : entries) { + if (wap == null) { + wap = getWriterAndPath(logEntry); + if (wap == null) { + // getWriterAndPath decided we don't need to write these edits + // Message was already logged + return; + } + } + wap.w.append(logEntry); + outputSink.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; + } + // Pass along summary statistics + wap.incrementEdits(editsCount); + wap.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(" Got while writing log entry to log", e); + throw e; + } } /** - * @return a map from encoded region ID to the number of edits written out - * for that region. + * @return a map from encoded region ID to the number of edits written out for that region. */ - private Map getOutputCounts() { - TreeMap ret = new TreeMap( - Bytes.BYTES_COMPARATOR); - synchronized (logWriters) { - for (Map.Entry entry : logWriters.entrySet()) { + Map getOutputCounts() { + TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR); + synchronized (writers) { + for (Map.Entry entry : writers.entrySet()) { ret.put(entry.getKey(), entry.getValue().editsWritten); } } return ret; } + + Set recoveredRegions() { + return writers.keySet(); + } } + private abstract static class SinkWriter { + /* Count of edits written to this path */ + long editsWritten = 0; + /* Number of nanos spent writing to this log */ + long nanosSpent = 0; + + void incrementEdits(int edits) { + editsWritten += edits; + } + void incrementNanoTime(long nanos) { + nanosSpent += nanos; + } + } /** - * Private data structure that wraps a Writer and its Path, - * also collecting statistics about the data written to this - * output. + * Private data structure that wraps a Writer and its Path, also collecting statistics about the + * data written to this output. */ - private final static class WriterAndPath { + private final static class WriterAndPath extends SinkWriter { final Path p; final Writer w; - /* Count of edits written to this path */ - long editsWritten = 0; - /* Number of nanos spent writing to this log */ - long nanosSpent = 0; - WriterAndPath(final Path p, final Writer w) { this.p = p; this.w = w; } + } - void incrementEdits(int edits) { - editsWritten += edits; + /** + * Class that manages the output streams from the log splitting process. 
+ */ + class OutputRegionServerSink extends OutputSink { + private static final double BUFFER_THRESHOLD = 0.35; + private static final String KEY_DELIMITER = "#"; + + private final Set recoveredRegions = Collections.synchronizedSet(new TreeSet( + Bytes.BYTES_COMPARATOR)); + private final Map writers = new ConcurrentHashMap(); + + private final ExecutorService sharedThreadPool; + + private Map tableNameToHTableMap; + /** + * Map key -> value layout : + * + * -> Queue + */ + private Map> serverToBufferQueueMap; + private List thrown; + + public OutputRegionServerSink(int numWriters, ExecutorService pool) { + super(numWriters); + this.sharedThreadPool = pool; + this.tableNameToHTableMap = Collections.synchronizedMap(new TreeMap( + Bytes.BYTES_COMPARATOR)); + this.serverToBufferQueueMap = new ConcurrentHashMap>(); + this.thrown = new ArrayList(); } - void incrementNanoTime(long nanos) { - nanosSpent += nanos; + void append(RegionEntryBuffer buffer) throws IOException { + List entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + + // store the name of a region we're recovering + if (!recoveredRegions.contains(buffer.encodedRegionName)) { + recoveredRegions.add(buffer.encodedRegionName); + } + + // group entries by region servers + for (HLog.Entry entry : entries) { + WALEdit edit = entry.getEdit(); + byte[] table = entry.getKey().getTablename(); + HTable htable = this.getHTable(table); + + Put put = null; + Delete del = null; + KeyValue lastKV = null; + HRegionLocation loc = null; + Row preRow = null; + Row lastAddedRow = null; // it is not really needed here just be conservative + String preKey = null; + List kvs = edit.getKeyValues(); + + for (KeyValue kv : kvs) { + if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { + if (preRow != null) { + synchronized (serverToBufferQueueMap) { + List queue = serverToBufferQueueMap.get(preKey); + if (queue == null) { + queue = Collections.synchronizedList(new ArrayList()); + serverToBufferQueueMap.put(preKey, queue); + } + queue.add(preRow); + lastAddedRow = preRow; + } + } + loc = htable.getRegionLocation(kv.getRow(), false); + if (kv.isDelete()) { + del = new Delete(kv.getRow()); + del.setClusterId(entry.getKey().getClusterId()); + preRow = del; + } else { + put = new Put(kv.getRow()); + put.setClusterId(entry.getKey().getClusterId()); + preRow = put; + } + if (loc == null) { + throw new IOException("Can't locate location for row:" + Bytes.toString(kv.getRow()) + + " of table:" + Bytes.toString(table)); + } else { + preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table); + } + } + if (kv.isDelete()) { + del.addDeleteMarker(kv); + } else { + put.add(kv); + } + lastKV = kv; + } + + // add the last row + if (preRow != null && lastAddedRow != preRow) { + synchronized (serverToBufferQueueMap) { + List queue = serverToBufferQueueMap.get(preKey); + if (queue == null) { + queue = Collections.synchronizedList(new ArrayList()); + serverToBufferQueueMap.put(preKey, queue); + } + queue.add(preRow); + } + } + } + + // process workitems + String maxLocKey = null; + int maxSize = 0; + List maxQueue = null; + synchronized (this.serverToBufferQueueMap) { + for (String key : this.serverToBufferQueueMap.keySet()) { + List curQueue = this.serverToBufferQueueMap.get(key); + if (curQueue.size() > maxSize) { + maxSize = curQueue.size(); + maxQueue = curQueue; + maxLocKey = key; + } + } + if (!isDoneReading && maxSize < minBatchSize + && entryBuffers.totalBuffered 
< BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) { + // buffer more to process + return; + } else if (maxSize > 0) { + this.serverToBufferQueueMap.remove(maxLocKey); + } + } + + if (maxSize > 0) { + processWorkItems(maxLocKey, maxQueue); + } + } + + private void processWorkItems(String key, List actions) throws IOException { + RegionServerWriter rsw = null; + + long startTime = System.nanoTime(); + try { + rsw = getRegionServerWriter(key); + rsw.sink.replayEntries(actions); + + // Pass along summary statistics + rsw.incrementEdits(actions.size()); + rsw.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(" Got while writing log entry to log", e); + throw e; + } + } + + @Override + protected boolean flush() throws IOException { + String maxLoc = null; + int maxSize = 0; + List maxQueue = null; + synchronized (this.serverToBufferQueueMap) { + for (String locationKey : this.serverToBufferQueueMap.keySet()) { + List curQueue = this.serverToBufferQueueMap.get(locationKey); + if (curQueue.size() > maxSize) { + maxSize = curQueue.size(); + maxQueue = curQueue; + maxLoc = locationKey; + } + } + if (maxSize > 0) { + this.serverToBufferQueueMap.remove(maxLoc); + } + } + + if (maxSize > 0) { + this.processWorkItems(maxLoc, maxQueue); + dataAvailable.notifyAll(); + return true; + } + return false; + } + + public void addWriterError(Throwable t) { + thrown.add(t); + } + + @Override + List finishWritingAndClose() throws IOException { + List result = new ArrayList(); + try { + if (!waitForFinishWriting()) { + return null; + } + // returns an empty array in order to keep interface same as old way + return result; + } finally { + List thrown = closeRegionServerWriters(); + if (thrown != null && !thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + } + } + + private List closeRegionServerWriters() throws IOException { + List result = null; + if (!writersClosed) { + result = Lists.newArrayList(); + try { + for (WriterThread t : writerThreads) { + while (t.isAlive()) { + t.shouldStop = true; + t.interrupt(); + try { + t.join(10); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } + } + } + } finally { + synchronized (writers) { + for (String locationKey : writers.keySet()) { + RegionServerWriter tmpW = writers.get(locationKey); + try { + tmpW.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close writer for region server:" + locationKey, ioe); + result.add(ioe); + } + } + } + + // close tables + synchronized (this.tableNameToHTableMap) { + for (byte[] tableName : this.tableNameToHTableMap.keySet()) { + HTable htable = this.tableNameToHTableMap.get(tableName); + try { + htable.close(); + } catch (IOException ioe) { + result.add(ioe); + } + } + } + this.sharedThreadPool.shutdown(); + writersClosed = true; + } + } + return result; + } + + Map getOutputCounts() { + TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR); + synchronized (writers) { + for (Map.Entry entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); + } + } + return ret; + } + + Set recoveredRegions() { + return this.recoveredRegions; + } + + int getNumOpenWriters() { + return writers.size(); + } + + private String getTableFromLocationStr(String loc) { + /** + * location key is in fomrat # + *
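+ * i.e. the region server's host:port, then KEY_DELIMITER ('#'), then the table name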
+ */ + String[] splits = loc.split(KEY_DELIMITER); + if (splits.length != 2) { + return ""; + } + return splits[1]; + } + + /** + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. + * @return null if this region shouldn't output any logs + */ + private RegionServerWriter getRegionServerWriter(String loc) throws IOException { + RegionServerWriter ret = writers.get(loc); + if (ret != null) { + return ret; + } + + String tableName = getTableFromLocationStr(loc); + if (tableName.isEmpty()) { + throw new IOException("Found invalid location string:" + loc); + } + HTable table = this.tableNameToHTableMap.get(Bytes.toBytes(tableName)); + + synchronized (writers) { + ret = writers.get(loc); + if (ret == null) { + ret = new RegionServerWriter(conf, table); + writers.put(loc, ret); + } + } + return ret; + } + + private HTable getHTable(final byte[] table) throws IOException { + HTable htable = this.tableNameToHTableMap.get(table); + if (htable == null) { + synchronized (this.tableNameToHTableMap) { + htable = this.tableNameToHTableMap.get(table); + if (htable == null) { + htable = new HTable(conf, table, this.sharedThreadPool); + // htable.setRegionCachePrefetch(table, true); + htable.setAutoFlush(false, true); + this.tableNameToHTableMap.put(table, htable); + } + } + } + return htable; + } + } + + /** + * Private data structure that wraps a receiving RS and collecting statistics about the data + * written to this newly assigned RS. + */ + private final static class RegionServerWriter extends SinkWriter { + final ReplaySink sink; + + RegionServerWriter(final Configuration conf, final HTable table) throws IOException { + this.sink = new ReplaySink(conf, table); + } + + void close() throws IOException { } } static class CorruptedLogFileException extends Exception { private static final long serialVersionUID = 1L; + CorruptedLogFileException(String s) { super(s); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 4bb546f..5b72e0b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -31,17 +31,14 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; -import org.apache.hadoop.hbase.util.FSUtils; - import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; public class HLogUtil { static final Log LOG = LogFactory.getLog(HLogUtil.class); @@ -242,6 +239,63 @@ public class HLogUtil { } /** + * This function returns region server name from a log file name which is in either format: + * hdfs:///hbase/.logs/-splitting/... or hdfs:///hbase/.logs//... 
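+ * The server name is parsed from the log directory name after stripping any '-splitting' suffix; null is returned if the name cannot be parsed.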
+ * @param logFile + * @return null if the passed in logFile isn't a valid HLog file path + */ + public static ServerName getServerNameFromHLogDirectoryName(Path logFile) { + Path logDir = logFile.getParent(); + String logDirName = logDir.getName(); + if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) { + logDir = logFile; + logDirName = logDir.getName(); + } + ServerName serverName = null; + if (logDirName.endsWith(HLog.SPLITTING_EXT)) { + logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length()); + } + try { + serverName = ServerName.parseServerName(logDirName); + } catch (IllegalArgumentException ex) { + serverName = null; + LOG.warn("Invalid log file path=" + logFile, ex); + } + if (serverName != null && serverName.getStartcode() < 0) { + LOG.warn("Invalid log file path=" + logFile); + return null; + } + return serverName; + } + + /** + * Return regions (memstores) that have edits that are equal or less than the + * passed oldestWALseqid. + * + * @param oldestWALseqid + * @param regionsToSeqids + * Encoded region names to sequence ids + * @return All regions whose seqid is < than oldestWALseqid (Not + * necessarily in order). Null if no regions found. + */ + static byte[][] findMemstoresWithEditsEqualOrOlderThan( + final long oldestWALseqid, final Map regionsToSeqids) { + // This method is static so it can be unit tested the easier. + List regions = null; + for (Map.Entry e : regionsToSeqids.entrySet()) { + if (e.getValue().longValue() <= oldestWALseqid) { + if (regions == null) + regions = new ArrayList(); + // Key is encoded region name. + regions.add(e.getKey()); + } + } + return regions == null ? null : regions + .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); + } + + /** * Returns sorted set of edit files made by wal-log splitter, excluding files * with '.temp' suffix. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsReplay.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsReplay.java new file mode 100644 index 0000000..5015b90 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsReplay.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * Class used to push numbers about the WAL into the metrics subsystem. This will take a single + * function call and turn it into multiple manipulations of the hadoop metrics system. 
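+ * Replay appends and syncs are currently recorded against the shared MetricsWALSource counters.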
+ */ +@InterfaceAudience.Private +public class MetricsReplay { + static final Log LOG = LogFactory.getLog(MetricsReplay.class); + + private final MetricsWALSource source; + + public MetricsReplay() { + source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); + } + + public void finishSync(long time) { + source.incrementSyncTime(time); + } + + public void finishAppend(long time, long size) { + + source.incrementAppendCount(); + source.incrementAppendTime(time); + source.incrementAppendSize(size); + + if (time > 1000) { + source.incrementSlowAppendCount(); + LOG.warn(String.format("%s took %d ms appending an edit to hlog; len~=%s", Thread + .currentThread().getName(), time, StringUtils.humanReadableInt(size))); + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplaySink.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplaySink.java new file mode 100644 index 0000000..b935be8 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplaySink.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * This class is responsible for replaying the edits coming from a failed region server. + *

+ * This class uses the native HBase client in order to replay WAL entries.
+ */ +@InterfaceAudience.Private +public class ReplaySink { + + private static final Log LOG = LogFactory.getLog(ReplaySink.class); + + private final Configuration conf; + private final HTable table; + private final MetricsReplay metrics; + private final AtomicLong totalReplayedEdits = new AtomicLong(); + + /** + * Create a sink for WAL log entries replay + * @param conf conf object + * @param table a HTable instance managed by caller + * @throws IOException thrown when HDFS goes bad or bad file name + */ + public ReplaySink(Configuration conf, HTable table) + throws IOException { + this.conf = HBaseConfiguration.create(conf); + this.metrics = new MetricsReplay(); + this.table = table; + } + + /** + * Replay an array of entries of a same region directly into the newly assigned Region Server + * using the native client. + * @param entries + * @throws IOException + */ + public void replayEntries(List actions) throws IOException { + if (actions.size() == 0) { + return; + } + + try { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + table.batch(actions); + long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; + LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime + + "(ms)!"); + /** + * TODO: Add more metricis. + */ + this.totalReplayedEdits.addAndGet(actions.size()); + } catch (InterruptedException ix) { + throw new IOException(ix); + } catch (IOException ex) { + LOG.error("Unable to accept edit because:", ex); + throw ex; + } + } + + /** + * Get a string representation of this sink's metrics + * @return string with the total replicated edits count and the date of the last edit that was + * applied + */ + public String getStats() { + return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replicated edits: " + + this.totalReplayedEdits; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 3b56e74..7a64caf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -103,6 +103,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String splitLogZNode; // znode containing the state of the load balancer public String balancerZNode; + // znode containing the state of recovering regions + public String recoveringRegionsZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = @@ -166,6 +168,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); + ZKUtil.createAndFailSilent(this, recoveringRegionsZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -215,6 +218,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); balancerZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.balancer", "balancer")); + recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.recovering.regions", "recovering-regions")); } /** diff --git 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 4f55a3d..a3d0c9f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -499,4 +502,9 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer // TODO Auto-generated method stub return null; } + + @Override + public Set getRecoveringRegions() { + return Collections.synchronizedSet(new HashSet()); + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 15a2765..f01ef91 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -18,15 +18,24 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.SplitLogCounters.*; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; +import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,7 +51,16 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -59,6 +77,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import 
org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -84,9 +103,14 @@ public class TestDistributedLogSplitting { HBaseTestingUtility TEST_UTIL; private void startCluster(int num_rs) throws Exception{ + conf = HBaseConfiguration.create(); + startCluster(num_rs, conf); + } + + private void startCluster(int num_rs, Configuration inConf) throws Exception { SplitLogCounters.resetCounters(); LOG.info("Starting cluster"); - conf = HBaseConfiguration.create(); + this.conf = inConf; conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); @@ -112,7 +136,10 @@ public class TestDistributedLogSplitting { @Test (timeout=300000) public void testRecoveredEdits() throws Exception { LOG.info("testRecoveredEdits"); - startCluster(NUM_RS); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, false); + startCluster(NUM_RS, curConf); + final int NUM_LOG_LINES = 1000; final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; // turn off load balancing to prevent regions from moving around otherwise @@ -145,8 +172,7 @@ public class TestDistributedLogSplitting { it.remove(); } } - makeHLog(hrs.getWAL(), regions, "table", - NUM_LOG_LINES, 100); + makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); slm.splitLogDistributed(logDir); @@ -166,6 +192,298 @@ public class TestDistributedLogSplitting { assertEquals(NUM_LOG_LINES, count); } + @Test(timeout = 300000) + public void testRecoveredEditsReplayWithNonMetaRSDown() throws Exception { + LOG.info("testRecoveredEditsReplayWithNonMetaRSDown"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, true); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isRootRegion() || region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); + ht.close(); + } + + @Test(timeout = 300000) + public void testRecoveredEditsReplayWithRootRSDown() throws Exception { + LOG.info("testRecoveredEditsReplayWithRootRSDown"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, true); + startCluster(NUM_RS, curConf); 
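+ // find the region server carrying -ROOT-, write edits for its user regions, then abort it and verify the edits are replayed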
+ final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingRoot = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isRootRegion()) { + isCarryingRoot = true; + break; + } + } + if (!isCarryingRoot) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); + + // abort RS + this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); + ht.close(); + } + + @Test(timeout = 300000) + public void testRecoveredEditsReplayWithMetaRSDown() throws Exception { + LOG.info("testRecoveredEditsReplayWithMetaRSDown"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, true); + startCluster(NUM_RS, curConf); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (!isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); + + this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); + ht.close(); + } + + private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw, + final int numRegions, final int numofLines) throws Exception { + final MiniHBaseCluster tmpCluster = this.cluster; + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 2)); + } + }); + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, 
+      @Override
+      public boolean evaluate() throws Exception {
+        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+          zkw.recoveringRegionsZNode, false);
+        return (recoveringRegions != null && recoveringRegions.size() == 0);
+      }
+    });
+
+    assertEquals(numOfLines, TEST_UTIL.countRows(ht));
+  }
+
+  @Test(timeout = 300000)
+  public void testRecoveredEditsReplayTwoSequentialRSDown() throws Exception {
+    LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, true);
+    startCluster(NUM_RS, curConf);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around; otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs1 = rsts.get(0).getRegionServer();
+    regions = ProtobufUtil.getOnlineRegions(hrs1);
+
+    makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+
+    // abort RS1
+    LOG.info("Aborting region server: " + hrs1.getServerName());
+    hrs1.abort("testing");
+
+    // wait for the abort to complete
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
+      }
+    });
+
+    // wait for regions to come online
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 2));
+      }
+    });
+
+    // abort the second region server
+    rsts = cluster.getLiveRegionServerThreads();
+    HRegionServer hrs2 = rsts.get(0).getRegionServer();
+    LOG.info("Aborting one more region server: " + hrs2.getServerName());
+    hrs2.abort("testing");
+
+    // wait for the abort to complete
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
+      }
+    });
+
+    // wait for regions to come online
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 2));
+      }
+    });
+
+    // wait until all regions are fully recovered
+    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
+          zkw.recoveringRegionsZNode, false);
+        return (recoveringRegions != null && recoveringRegions.size() == 0);
+      }
+    });
+
+    assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
+    ht.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testMarkRegionsRecoveringInZK() throws Exception {
+    LOG.info("testMarkRegionsRecoveringInZK");
+    Configuration curConf = HBaseConfiguration.create();
+    curConf.setBoolean(HConstants.REPLAY_WAL_EDITS_KEY, true);
+    startCluster(NUM_RS, curConf);
+    master.balanceSwitch(false);
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
+    HTable ht = installTable(zkw, "table", "family", 40);
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+
+    HRegionServer hrs = rsts.get(0).getRegionServer();
+    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
+    HRegionInfo region = regions.get(0);
+    Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+    regionSet.add(region);
+    slm.markRegionsRecoveringInZK(rsts.get(0).getRegionServer().getServerName(), regionSet);
+    slm.markRegionsRecoveringInZK(rsts.get(1).getRegionServer().getServerName(), regionSet);
+
+    List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
+      ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
+
+    assertEquals(2, recoveringRegions.size());
+    ht.close();
+  }
+
   /**
    * The original intention of this test was to force an abort of a region
    * server and to make sure that the failure path in the region servers is
@@ -191,8 +509,9 @@ public class TestDistributedLogSplitting {
     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
         "table", "family", 40);
 
-    makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table",
-        NUM_LOG_LINES, 100);
+
+    makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
+        100);
 
     new Thread() {
       public void run() {
@@ -394,9 +713,8 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  public void makeHLog(HLog log,
-      List<HRegionInfo> hris, String tname,
-      int num_edits, int edit_size) throws IOException {
+  public void makeHLog(HLog log, List<HRegionInfo> hris, String tname, String fname, int num_edits,
+      int edit_size) throws IOException {
 
     // remove root and meta region
     hris.remove(HRegionInfo.ROOT_REGIONINFO);
@@ -405,29 +723,33 @@ public class TestDistributedLogSplitting {
     HTableDescriptor htd = new HTableDescriptor(tname);
     byte[] value = new byte[edit_size];
     for (int i = 0; i < edit_size; i++) {
-      value[i] = (byte)('a' + (i % 26));
+      value[i] = (byte) ('a' + (i % 26));
     }
     int n = hris.size();
     int[] counts = new int[n];
-    int j = 0;
     if (n > 0) {
       for (int i = 0; i < num_edits; i += 1) {
         WALEdit e = new WALEdit();
-        byte [] row = Bytes.toBytes("r" + Integer.toString(i));
-        byte [] family = Bytes.toBytes("f");
-        byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
-        e.add(new KeyValue(row, family, qualifier,
-            System.currentTimeMillis(), value));
-        j++;
-        log.append(hris.get(j % n), table, e, System.currentTimeMillis(), htd);
-        counts[j % n] += 1;
+        HRegionInfo curRegionInfo = hris.get(i % n);
+        byte[] startRow = curRegionInfo.getStartKey();
+        if (startRow == null || startRow.length == 0) {
+          startRow = new byte[] { 0, 0, 0, 0, 1 };
+        }
+        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
+        row = Arrays.copyOfRange(row, 3, 8); // use the last 5 bytes because
+                                             // HBaseTestingUtility.createMultiRegions
+                                             // uses 5-byte keys
+        byte[] family = Bytes.toBytes(fname);
+        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+        e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
+        log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd);
+        counts[i % n] += 1;
       }
     }
     log.sync();
     log.close();
     for (int i = 0; i < n; i++) {
-      LOG.info("region " + hris.get(i).getRegionNameAsString() +
-          " has " + counts[i] + " edits");
+      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
     }
     return;
   }
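[Reviewer note, not part of the patch: the three replay tests above repeat the same "nothing left under zkw.recoveringRegionsZNode" polling predicate. If more replay tests are added, the check could be factored into a shared helper along the lines of the sketch below. The name waitUntilNoRegionInRecovery is hypothetical; the calls are the same TEST_UTIL/zkw calls the added tests already use.]

  private void waitUntilNoRegionInRecovery(final ZooKeeperWatcher zkw, long timeoutMs)
      throws Exception {
    // Poll the recovering-regions znode until every region has finished WAL replay.
    TEST_UTIL.waitFor(timeoutMs, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        return (recoveringRegions != null && recoveringRegions.isEmpty());
      }
    });
  }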
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
index 3eb278d..152ea04 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -43,15 +43,12 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.master.TestMasterFailover;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.LargeTests;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -102,20 +99,9 @@ public class TestRSKilledWhenMasterInitializing {
       super(conf);
     }
 
-    @Override
-    protected void splitLogAfterStartup(MasterFileSystem mfs) {
-      super.splitLogAfterStartup(mfs);
-      logSplit = true;
-      // If "TestingMaster.sleep" is set, sleep after log split.
-      if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
-        int duration = getConfiguration().getInt(
-            "TestingMaster.sleep.duration", 0);
-        Threads.sleep(duration);
-      }
-    }
-
-
-    public boolean isLogSplitAfterStartup() {
+    public boolean isLogSplitComplete() {
+      List<ServerName> result = super.getFailedServersFromLogFolders(getMasterFileSystem());
+      logSplit = (result == null || result.isEmpty());
       return logSplit;
     }
   }
@@ -249,10 +235,14 @@ public class TestRSKilledWhenMasterInitializing {
   private TestingMaster startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
       throws IOException, InterruptedException {
     TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
-    while (!master.isLogSplitAfterStartup()) {
+    while (!master.isInitialized()) {
+      Thread.sleep(100);
+    }
+    ServerManager serverManager = cluster.getMaster().getServerManager();
+    while (serverManager.areDeadServersInProgress()) {
       Thread.sleep(100);
     }
-    LOG.debug("splitted:" + master.isLogSplitAfterStartup() + ",initialized:"
+    LOG.debug("split:" + master.isLogSplitComplete() + ",initialized:"
         + master.isInitialized());
     return master;
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index 0db1977..72c8076 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -172,4 +175,9 @@ public class MockRegionServerServices implements RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public Set getRecoveringRegions() {
+    return Collections.synchronizedSet(new HashSet());
+  }
 }
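[Reviewer note, not part of the patch: the row-key construction in the revised makeHLog() is the least obvious piece of the change, so here is a self-contained illustration of what it computes. The class name and the start key below are made up, and the snippet only mimics the big-endian add that org.apache.hadoop.hbase.util.Bytes.incrementBytes() appears to perform here, as implied by the Arrays.copyOfRange(row, 3, 8) trim in the patch. The point is that the resulting 5-byte row still sorts inside the region whose start key it was derived from, matching the 5-byte split keys that HBaseTestingUtility.createMultiRegions produces.]

import java.util.Arrays;

public class RowKeyDerivationSketch {
  public static void main(String[] args) {
    byte[] startRow = new byte[] { 'b', 'b', 'b', 0, 0 }; // hypothetical region start key
    int editNumber = 7;                                   // plays the role of counts[i % n]

    // Treat the start key as a big-endian number held in 8 bytes and add the offset,
    // mimicking the effect of Bytes.incrementBytes(startRow, editNumber) in makeHLog().
    long value = 0;
    for (byte b : startRow) {
      value = (value << 8) | (b & 0xffL);
    }
    value += editNumber;
    byte[] incremented = new byte[8];
    for (int i = 7; i >= 0; i--) {
      incremented[i] = (byte) value;
      value >>>= 8;
    }

    // Same trim as makeHLog(): keep only the last 5 bytes.
    byte[] row = Arrays.copyOfRange(incremented, 3, 8);
    System.out.println(Arrays.toString(row)); // [98, 98, 98, 0, 7] -- still within the region
  }
}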