Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (revision 1452349) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (working copy) @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,9 +29,13 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -55,6 +60,9 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler { private static final Log LOG = LogFactory.getLog(DisabledTableSnapshotHandler.class); private final TimeoutExceptionInjector timeoutInjector; + private ExecutorService service; + private MasterServices masterServices; + private AtomicInteger count = new AtomicInteger(); /** * @param snapshot descriptor of the snapshot to take @@ -64,21 +72,54 @@ public DisabledTableSnapshotHandler(SnapshotDescription snapshot, final MasterServices masterServices) throws IOException { super(snapshot, masterServices); + this.masterServices = masterServices; + service = masterServices.getExecutorService(); // setup the timer timeoutInjector = TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf, monitor); } + private class RegionInfoWriterHandler extends EventHandler { + private HRegionInfo regionInfo; + + RegionInfoWriterHandler(Server server, EventType eventType, HRegionInfo regionInfo) { + super(server, eventType); + this.regionInfo = regionInfo; + } + + @Override + public void process() throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); + + // 2.1 copy the regionInfo files to the snapshot + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, + snapshotDir, regionInfo); + + // check for error for each region + monitor.rethrowException(); + + // 2.2 for each region, copy over its recovered.edits directory + Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); + Path snapshotRegionDir = regionFs.getRegionDir(); + new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call(); + monitor.rethrowException(); + + // 2.3 reference all the files in the region + new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call(); + monitor.rethrowException(); + + count.decrementAndGet(); + } + } + // TODO consider parallelizing these operations since they are independent. Right now its just // easier to keep them serial though @Override - public void snapshotRegions(List> regionsAndLocations) throws IOException, - KeeperException { + public void snapshotRegions(List> regionsAndLocations) + throws IOException, KeeperException { try { timeoutInjector.start(); - Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); - // 1. get all the regions hosting this table. // extract each pair to separate lists @@ -88,27 +129,14 @@ regions.add(p.getFirst()); serverNames.add(p.getSecond().toString()); } - + count.set(regions.size()); + // 2. for each region, write all the info to disk LOG.info("Starting to write region info and WALs for regions for offline snapshot:" + ClientSnapshotDescriptionUtils.toString(snapshot)); for (HRegionInfo regionInfo : regions) { - // 2.1 copy the regionInfo files to the snapshot - HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, - snapshotDir, regionInfo); - - // check for error for each region - monitor.rethrowException(); - - // 2.2 for each region, copy over its recovered.edits directory - Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); - Path snapshotRegionDir = regionFs.getRegionDir(); - new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call(); - monitor.rethrowException(); - - // 2.3 reference all the files in the region - new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call(); - monitor.rethrowException(); + service.submit(new RegionInfoWriterHandler( + masterServices, EventType.C_M_SNAPSHOT_TABLE, regionInfo)); } // 3. write the table info to disk @@ -118,6 +146,10 @@ FSUtils.getRootDir(conf)); tableInfoCopyTask.call(); monitor.rethrowException(); + + while (count.get() > 0) { + Thread.sleep(30); + } } catch (Exception e) { // make sure we capture the exception to propagate back to the client later String reason = "Failed snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot)