Index: src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.Lock; @@ -175,7 +177,7 @@ /** * Inspect the log directory to recover any log file without * an active region server. - * @param onlineServers Map of online servers keyed by + * @param onlineServers Set of online servers keyed by * {@link ServerName} */ void splitLogAfterStartup(final Set onlineServers) { @@ -197,64 +199,78 @@ LOG.debug("No log files to split, proceeding..."); return; } + List serverNames = new ArrayList(); for (FileStatus status : logFolders) { ServerName serverName = new ServerName(status.getPath().getName()); - if (!onlineServers.contains(serverName)) { + if (!onlineServers.contains(serverName)) { LOG.info("Log folder " + status.getPath() + " doesn't belong " + "to a known region server, splitting"); - splitLog(serverName); + serverNames.add(serverName); } else { LOG.info("Log folder " + status.getPath() + " belongs to an existing region server"); } - } + } + splitLog(serverNames); } - - public void splitLog(final ServerName serverName) { + + public void splitLog(final ServerName serverName){ + List serverNames = new ArrayList(); + serverNames.add(serverName); + splitLog(serverNames); + } + + public void splitLog(final List serverNames) { long splitTime = 0, splitLogSize = 0; - Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString())); + List logDirs = new ArrayList(); + for(ServerName serverName: serverNames){ + Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString())); + logDirs.add(logDir); + } + 
if (distributedLogSplitting) { splitTime = EnvironmentEdgeManager.currentTimeMillis(); try { try { - splitLogSize = splitLogManager.splitLogDistributed(logDir); + splitLogSize = splitLogManager.splitLogDistributed(logDirs); } catch (OrphanHLogAfterSplitException e) { LOG.warn("Retrying distributed splitting for " + - serverName + "because of:", e); - splitLogManager.splitLogDistributed(logDir); + serverNames + "because of:", e); + splitLogManager.splitLogDistributed(logDirs); } } catch (IOException e) { - LOG.error("Failed distributed splitting " + serverName, e); + LOG.error("Failed distributed splitting " + serverNames, e); } splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; } else { - // splitLogLock ensures that dead region servers' logs are processed - // one at a time - this.splitLogLock.lock(); - - try { - HLogSplitter splitter = HLogSplitter.createLogSplitter( + for(Path logDir: logDirs){ + // splitLogLock ensures that dead region servers' logs are processed + // one at a time + this.splitLogLock.lock(); + try { + HLogSplitter splitter = HLogSplitter.createLogSplitter( conf, rootdir, logDir, oldLogDir, this.fs); - try { - // If FS is in safe mode, just wait till out of it. - FSUtils.waitOnSafeMode(conf, - conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000)); - splitter.splitLog(); - } catch (OrphanHLogAfterSplitException e) { - LOG.warn("Retrying splitting because of:", e); - // An HLogSplitter instance can only be used once. Get new instance. - splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, + try { + // If FS is in safe mode, just wait till out of it. + FSUtils.waitOnSafeMode(conf, conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000)); + splitter.splitLog(); + } catch (OrphanHLogAfterSplitException e) { + LOG.warn("Retrying splitting because of:", e); + //An HLogSplitter instance can only be used once. Get new instance. 
+ splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, oldLogDir, this.fs); - splitter.splitLog(); + splitter.splitLog(); + } + splitTime = splitter.getTime(); + splitLogSize = splitter.getSize(); + } catch (IOException e) { + LOG.error("Failed splitting " + logDir.toString(), e); + } finally { + this.splitLogLock.unlock(); } - splitTime = splitter.getTime(); - splitLogSize = splitter.getSize(); - } catch (IOException e) { - LOG.error("Failed splitting " + logDir.toString(), e); - } finally { - this.splitLogLock.unlock(); } } + if (this.metrics != null) { this.metrics.addSplit(splitTime, splitLogSize); } Index: src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -167,6 +169,40 @@ } } + private FileStatus[] getFileList(List logDirs) throws IOException { + List fileStatus = new ArrayList(); + for (Path hLogDir : logDirs) { + this.fs = hLogDir.getFileSystem(conf); + if (!fs.exists(hLogDir)) { + LOG.warn(hLogDir + " doesn't exist. Nothing to do!"); + continue; + } + FileStatus[] logfiles = fs.listStatus(hLogDir); // TODO filter filenames? 
+ if (logfiles == null || logfiles.length == 0) { + LOG.info(hLogDir + " is empty dir, no logs to split"); + } else { + for (FileStatus status : logfiles) + fileStatus.add(status); + } + } + if (fileStatus.isEmpty()) + return null; + FileStatus[] a = new FileStatus[fileStatus.size()]; + return fileStatus.toArray(a); + } + + /** + * @param logDir + * one region sever hlog dir path in .logs + * @throws IOException + * if there was an error while splitting any log file + * @return cumulative size of the logfiles split + */ + public long splitLogDistributed(final Path logDir) throws IOException { + List logDirs = new ArrayList(); + logDirs.add(logDir); + return splitLogDistributed(logDirs); + } /** * The caller will block until all the log files of the given region server * have been processed - successfully split or an error is encountered - by an @@ -179,28 +215,18 @@ * if there was an error while splitting any log file * @return cumulative size of the logfiles split */ - public long splitLogDistributed(final Path logDir) throws IOException { - this.fs = logDir.getFileSystem(conf); - if (!fs.exists(logDir)) { - LOG.warn(logDir + " doesn't exist. Nothing to do!"); - return 0; - } - + public long splitLogDistributed(final List logDirs) throws IOException { MonitoredTask status = TaskMonitor.get().createStatus( - "Doing distributed log split in " + logDir); - - status.setStatus("Checking directory contents..."); - FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames? 
- if (logfiles == null || logfiles.length == 0) { - LOG.info(logDir + " is empty dir, no logs to split"); + "Doing distributed log split in " + logDirs); + FileStatus[] logfiles = getFileList(logDirs); + if(logfiles == null) return 0; - } - - status.setStatus("Scheduling batch of logs to split"); + status.setStatus("Checking directory contents..."); + LOG.debug("Scheduling batch of logs to split"); tot_mgr_log_split_batch_start.incrementAndGet(); - LOG.info("started splitting logs in " + logDir); + LOG.info("started splitting logs in " + logDirs); long t = EnvironmentEdgeManager.currentTimeMillis(); - long totalSize = 0; + long totalSize = 0; TaskBatch batch = new TaskBatch(); for (FileStatus lf : logfiles) { // TODO If the log file is still being written to - which is most likely @@ -218,32 +244,29 @@ if (batch.done != batch.installed) { stopTrackingTasks(batch); tot_mgr_log_split_batch_err.incrementAndGet(); - LOG.warn("error while splitting logs in " + logDir + + LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); throw new IOException("error or interrupt while splitting logs in " - + logDir + " Task = " + batch); + + logDirs + " Task = " + batch); } - - status.setStatus("Checking for orphaned logs in log directory..."); - if (anyNewLogFiles(logDir, logfiles)) { - tot_mgr_new_unexpected_hlogs.incrementAndGet(); - LOG.warn("new hlogs were produced while logs in " + logDir + + for(Path logDir: logDirs){ + if (anyNewLogFiles(logDir, logfiles)) { + tot_mgr_new_unexpected_hlogs.incrementAndGet(); + LOG.warn("new hlogs were produced while logs in " + logDir + " were being split"); - throw new OrphanHLogAfterSplitException(); - } - tot_mgr_log_split_batch_success.incrementAndGet(); - - status.setStatus("Cleaning up log directory..."); - if (!fs.delete(logDir, true)) { - throw new IOException("Unable to delete src dir: " + logDir); + throw new OrphanHLogAfterSplitException(); + } + 
tot_mgr_log_split_batch_success.incrementAndGet(); + status.setStatus("Cleaning up log directory..."); + if (!fs.delete(logDir, true)) { + throw new IOException("Unable to delete src dir: " + logDir); + } } - String msg = "finished splitting (more than or equal to) " + totalSize + - " bytes in " + batch.installed + " log files in " + logDir + " in " + + " bytes in " + batch.installed + " log files in " + logDirs + " in " + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; status.markComplete(msg); LOG.info(msg); - return totalSize; } @@ -965,4 +988,4 @@ */ public Status finish(String workerName, String taskname); } -} \ No newline at end of file +}