Index: D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
===================================================================
--- D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java	(revision 70991)
+++ D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java	(revision 71337)
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -185,60 +187,74 @@
       LOG.debug("No log files to split, proceeding...");
       return;
     }
+    List<String> serverNames = new ArrayList<String>();
     for (FileStatus status : logFolders) {
       String serverName = status.getPath().getName();
       if (onlineServers.get(serverName) == null) {
         LOG.info("Log folder " + status.getPath() + " doesn't belong " +
           "to a known region server, splitting");
-        splitLog(serverName);
+        serverNames.add(serverName);
       } else {
         LOG.info("Log folder " + status.getPath() +
           " belongs to an existing region server");
       }
     }
+    splitLog(serverNames);
   }
 
   public void splitLog(final String serverName) {
+    List<String> serverNames = new ArrayList<String>();
+    serverNames.add(serverName);
+    splitLog(serverNames);
+  }
+
+  public void splitLog(final List<String> serverNames) {
     long splitTime = 0, splitLogSize = 0;
-    Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
+    List<Path> logDirs = new ArrayList<Path>();
+    for (String serverName : serverNames) {
+      Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
+      logDirs.add(logDir);
+    }
+
     if (distributedLogSplitting) {
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
       try {
         try {
-          splitLogSize = splitLogManager.splitLogDistributed(logDir);
+          splitLogSize = splitLogManager.splitLogDistributed(logDirs);
         } catch (OrphanHLogAfterSplitException e) {
           LOG.warn("Retrying distributed splitting for " +
-            serverName + "because of:", e);
-          splitLogManager.splitLogDistributed(logDir);
+            serverNames + " because of:", e);
+          splitLogSize = splitLogManager.splitLogDistributed(logDirs);
         }
       } catch (IOException e) {
-        LOG.error("Failed distributed splitting " + serverName, e);
+        LOG.error("Failed distributed splitting " + serverNames, e);
       }
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
-      // splitLogLock ensures that dead region servers' logs are processed
-      // one at a time
-      this.splitLogLock.lock();
-
-      try {
-        HLogSplitter splitter = HLogSplitter.createLogSplitter(
-          conf, rootdir, logDir, oldLogDir, this.fs);
-        try {
-          splitter.splitLog();
-        } catch (OrphanHLogAfterSplitException e) {
-          LOG.warn("Retrying splitting because of:", e);
-          // An HLogSplitter instance can only be used once. Get new instance.
-          splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
-            oldLogDir, this.fs);
-          splitter.splitLog();
-        }
-        splitTime = splitter.getTime();
-        splitLogSize = splitter.getSize();
-      } catch (IOException e) {
-        LOG.error("Failed splitting " + logDir.toString(), e);
-      } finally {
-        this.splitLogLock.unlock();
-      }
+      for (Path logDir : logDirs) {
+        // splitLogLock ensures that dead region servers' logs are processed
+        // one at a time
+        this.splitLogLock.lock();
+        try {
+          HLogSplitter splitter = HLogSplitter.createLogSplitter(
+            conf, rootdir, logDir, oldLogDir, this.fs);
+          try {
+            splitter.splitLog();
+          } catch (OrphanHLogAfterSplitException e) {
+            LOG.warn("Retrying splitting because of:", e);
+            // An HLogSplitter instance can only be used once. Get new instance.
+            splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
+              oldLogDir, this.fs);
+            splitter.splitLog();
+          }
+          splitTime += splitter.getTime();
+          splitLogSize += splitter.getSize();
+        } catch (IOException e) {
+          LOG.error("Failed splitting " + logDir.toString(), e);
+        } finally {
+          this.splitLogLock.unlock();
+        }
+      }
     }
 
     if (this.metrics != null) {
Index: D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
===================================================================
--- D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(revision 70991)
+++ D:/workspace/hbase-0.90.2-trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(revision 71337)
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase.master;
 
 import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -165,35 +167,62 @@
     }
   }
 
+  private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
+    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
+    for (Path hLogDir : logDirs) {
+      this.fs = hLogDir.getFileSystem(conf);
+      if (!fs.exists(hLogDir)) {
+        LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
+        continue;
+      }
+      FileStatus[] logfiles = fs.listStatus(hLogDir); // TODO filter filenames?
+      if (logfiles == null || logfiles.length == 0) {
+        LOG.info(hLogDir + " is empty dir, no logs to split");
+      } else {
+        for (FileStatus status : logfiles)
+          fileStatus.add(status);
+      }
+    }
+    if (fileStatus.isEmpty())
+      return null;
+    FileStatus[] a = new FileStatus[fileStatus.size()];
+    return fileStatus.toArray(a);
+  }
+
   /**
+   * @param logDir
+   *          one region server hlog dir path under the .logs directory
+   * @throws IOException
+   *           if there was an error while splitting any log file
+   * @return cumulative size of the logfiles split
+   */
+  public long splitLogDistributed(final Path logDir) throws IOException {
+    List<Path> logDirs = new ArrayList<Path>();
+    logDirs.add(logDir);
+    return splitLogDistributed(logDirs);
+  }
+
+  /**
    * The caller will block until all the log files of the given region server
    * have been processed - successfully split or an error is encountered - by an
    * available worker region server. This method must only be called after the
    * region servers have been brought online.
    *
-   * @param serverName
-   *          region server name
+   * @param logDirs
+   *          list of region server hlog dirs found under the .logs directory
    * @throws IOException
    *           if there was an error while splitting any log file
   * @return cumulative size of the logfiles split
    */
-  public long splitLogDistributed(final Path logDir) throws IOException {
-    this.fs = logDir.getFileSystem(conf);
-    if (!fs.exists(logDir)) {
-      LOG.warn(logDir + " doesn't exist. Nothing to do!");
-      return 0;
-    }
-
-    LOG.debug("Doing distributed log split in " + logDir);
+  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
+    LOG.debug("Doing distributed log split in " + logDirs);
     LOG.debug("Checking directory contents...");
-    FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames?
-    if (logfiles == null || logfiles.length == 0) {
-      LOG.info(logDir + " is empty dir, no logs to split");
-      return 0;
-    }
+    FileStatus[] logfiles = getFileList(logDirs);
+    if (logfiles == null)
+      return 0;
     LOG.debug("Scheduling batch of logs to split");
     tot_mgr_log_split_batch_start.incrementAndGet();
-    LOG.info("started splitting logs in " + logDir);
+    LOG.info("started splitting logs in " + logDirs);
     long t = EnvironmentEdgeManager.currentTimeMillis();
     long totalSize = 0;
     TaskBatch batch = new TaskBatch();
@@ -214,25 +243,27 @@
     if (batch.done != batch.installed) {
       stopTrackingTasks(batch);
       tot_mgr_log_split_batch_err.incrementAndGet();
-      LOG.warn("error while splitting logs in " + logDir +
+      LOG.warn("error while splitting logs in " + logDirs +
        " installed = " + batch.installed + " but only " + batch.done + " done");
       throw new IOException("error or interrupt while splitting logs in "
-        + logDir + " Task = " + batch);
+        + logDirs + " Task = " + batch);
     }
-    if (anyNewLogFiles(logDir, logfiles)) {
-      tot_mgr_new_unexpected_hlogs.incrementAndGet();
-      LOG.warn("new hlogs were produced while logs in " + logDir +
-        " were being split");
-      throw new OrphanHLogAfterSplitException();
-    }
-    tot_mgr_log_split_batch_success.incrementAndGet();
+    for (Path logDir : logDirs) {
+      if (anyNewLogFiles(logDir, logfiles)) {
+        tot_mgr_new_unexpected_hlogs.incrementAndGet();
+        LOG.warn("new hlogs were produced while logs in " + logDir +
+          " were being split");
+        throw new OrphanHLogAfterSplitException();
+      }
 
-    LOG.debug("Cleaning up log directory...");
-    if (!fs.delete(logDir, true)) {
-      throw new IOException("Unable to delete src dir: " + logDir);
+      LOG.debug("Cleaning up log directory...");
+      if (!fs.delete(logDir, true)) {
+        throw new IOException("Unable to delete src dir: " + logDir);
+      }
     }
+    tot_mgr_log_split_batch_success.incrementAndGet();
     LOG.info("finished splitting (more than or equal to) " + totalSize +
-      " bytes in " + batch.installed + " log files in " + logDir + " in " +
+      " bytes in " + batch.installed + " log files in .logs in " +
       (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms");
     return totalSize;
   }