Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (revision 1452558) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (working copy) @@ -21,6 +21,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +43,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.CopyRecoveredEditsTask; import org.apache.hadoop.hbase.snapshot.ReferenceRegionHFilesTask; @@ -43,6 +52,7 @@ import org.apache.hadoop.hbase.snapshot.TakeSnapshotUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.KeeperException; /** @@ -72,13 +82,11 @@ // TODO consider parallelizing these operations since they are independent. Right now its just // easier to keep them serial though @Override - public void snapshotRegions(List> regionsAndLocations) throws IOException, - KeeperException { + public void snapshotRegions(List> regionsAndLocations) + throws IOException, KeeperException { try { timeoutInjector.start(); - Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); - // 1. get all the regions hosting this table. // extract each pair to separate lists @@ -88,27 +96,45 @@ regions.add(p.getFirst()); serverNames.add(p.getSecond().toString()); } - + // 2. for each region, write all the info to disk LOG.info("Starting to write region info and WALs for regions for offline snapshot:" + ClientSnapshotDescriptionUtils.toString(snapshot)); - for (HRegionInfo regionInfo : regions) { - // 2.1 copy the regionInfo files to the snapshot - HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, - snapshotDir, regionInfo); + ThreadPoolExecutor pool = Threads.getBoundedCachedThreadPool(6, 30L, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; - // check for error for each region - monitor.rethrowException(); + public Thread newThread(Runnable r) { + return new Thread(r, "DisabledTableSnapshotThread-" + count++); + } + }); + CompletionService completionService = + new ExecutorCompletionService(pool); + for (final HRegionInfo regionInfo : regions) { + completionService.submit(new Callable() { + public Void call() throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); - // 2.2 for each region, copy over its recovered.edits directory - Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); - Path snapshotRegionDir = regionFs.getRegionDir(); - new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call(); - monitor.rethrowException(); + // 2.1 copy the regionInfo files to the snapshot + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, + snapshotDir, regionInfo); - // 2.3 reference all the files in the region - new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call(); - monitor.rethrowException(); + // check for error for each region + monitor.rethrowException(); + + // 2.2 for each region, copy over its recovered.edits directory + Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); + Path snapshotRegionDir = regionFs.getRegionDir(); + new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call(); + monitor.rethrowException(); + + // 2.3 reference all the files in the region + new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir) + .call(); + monitor.rethrowException(); + return null; + } + }); } // 3. write the table info to disk @@ -118,6 +144,15 @@ FSUtils.getRootDir(conf)); tableInfoCopyTask.call(); monitor.rethrowException(); + + for (int i = 0; i < regions.size(); i++) { + try { + Future future = completionService.take(); + future.get(); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } } catch (Exception e) { // make sure we capture the exception to propagate back to the client later String reason = "Failed snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) Index: hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/CopyRecoveredEditsTask.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/CopyRecoveredEditsTask.java (revision 1452558) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/CopyRecoveredEditsTask.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.snapshot; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.NavigableSet; import org.apache.commons.logging.Log; @@ -72,6 +74,7 @@ // copy over each file. // this is really inefficient (could be trivially parallelized), but is // really simple to reason about. + List sources = new ArrayList(); for (Path source : files) { // check to see if the file is zero length, in which case we can skip it FileStatus stat = fs.getFileStatus(source); @@ -79,12 +82,13 @@ // its not zero length, so copy over the file Path out = new Path(outputDir, source.getName()); - LOG.debug("Copying " + source + " to " + out); - FileUtil.copy(fs, source, fs, out, true, fs.getConf()); + LOG.debug("Going to copy " + source + " to " + out); + sources.add(source); + } + FileUtil.copy(fs, (Path[]) sources.toArray(), fs, outputDir, true, true, fs.getConf()); - // check for errors to the running operation after each file - this.rethrowException(); - } + // check for errors to the running operation after each file + this.rethrowException(); return null; } } \ No newline at end of file