diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
index 3b2de3e..d7c7956 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 
 import org.apache.zookeeper.KeeperException;
@@ -88,8 +92,17 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
           + ClientSnapshotDescriptionUtils.toString(snapshot);
       LOG.info(msg);
       status.setStatus(msg);
-      for (HRegionInfo regionInfo: regions) {
-        snapshotDisabledRegion(regionInfo);
+
+      ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot");
+      try {
+        ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+          @Override
+          public void editRegion(final HRegionInfo regionInfo) throws IOException {
+            snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
+          }
+        });
+      } finally {
+        exec.shutdown();
       }
     } catch (Exception e) {
       // make sure we capture the exception to propagate back to the client later
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index c3f6f75..d5d9993 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -129,7 +129,7 @@ public final class MasterSnapshotVerifier {
 
   /**
    * Check that the table descriptor for the snapshot is a valid table descriptor
-   * @param snapshotDir snapshot directory to check
+   * @param manifest snapshot manifest to inspect
    */
   private void verifyTableInfo(final SnapshotManifest manifest) throws IOException {
     HTableDescriptor htd = manifest.getTableDescriptor();
@@ -145,7 +145,7 @@ public final class MasterSnapshotVerifier {
 
   /**
    * Check that all the regions in the snapshot are valid, and accounted for.
-   * @param snapshotDir snapshot directory to check
+   * @param manifest snapshot manifest to inspect
    * @throws IOException if we can't reach hbase:meta or read the files from the FS
    */
   private void verifyRegions(final SnapshotManifest manifest) throws IOException {
@@ -167,6 +167,7 @@ public final class MasterSnapshotVerifier {
       LOG.error(errorMsg);
     }
 
+    // Verify HRegionInfo
     for (HRegionInfo region : regions) {
       SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName());
       if (regionManifest == null) {
@@ -177,20 +178,23 @@ public final class MasterSnapshotVerifier {
         continue;
       }
 
-      verifyRegion(fs, manifest.getSnapshotDir(), region, regionManifest);
+      verifyRegionInfo(region, regionManifest);
     }
+
     if (!errorMsg.isEmpty()) {
       throw new CorruptedSnapshotException(errorMsg);
     }
+
+    // Verify Snapshot HFiles
+    SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), fs, manifest);
   }
 
   /**
-   * Verify that the region (regioninfo, hfiles) are valid
-   * @param fs the FileSystem instance
-   * @param snapshotDir snapshot directory to check
+   * Verify that the regionInfo is valid
    * @param region the region to check
+   * @param manifest snapshot manifest to inspect
    */
-  private void verifyRegion(final FileSystem fs, final Path snapshotDir, final HRegionInfo region,
+  private void verifyRegionInfo(final HRegionInfo region,
       final SnapshotRegionManifest manifest) throws IOException {
     HRegionInfo manifestRegionInfo = HRegionInfo.convert(manifest.getRegionInfo());
     if (!region.equals(manifestRegionInfo)) {
@@ -198,16 +202,5 @@ public final class MasterSnapshotVerifier {
         "doesn't match expected region:" + region;
       throw new CorruptedSnapshotException(msg, snapshot);
     }
-
-    // make sure we have all the expected store files
-    SnapshotReferenceUtil.visitRegionStoreFiles(manifest,
-      new SnapshotReferenceUtil.StoreFileVisitor() {
-        @Override
-        public void storeFile(final HRegionInfo regionInfo, final String family,
-            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-          SnapshotReferenceUtil.verifyStoreFile(services.getConfiguration(), fs, snapshotDir,
-            snapshot, region, family, storeFile);
-        }
-      });
   }
 }
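Reviewer note (not part of the patch): both files above now route per-region snapshot work through ModifyRegionUtils.editRegions(), which fans the regions out over a thread pool and blocks until every task completes. A minimal, dependency-free sketch of that submit-then-drain pattern follows; the names FanOutSketch, Task, and editAll are invented for illustration, and only java.util.concurrent is used.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutSketch {
  interface Task {
    void run(String item) throws IOException;
  }

  // Submit one Callable per item, then drain the completion service so the
  // caller blocks until every task has finished; the first failure is rethrown.
  static void editAll(ExecutorService pool, final List<String> items, final Task task)
      throws IOException {
    ExecutorCompletionService<Void> cs = new ExecutorCompletionService<Void>(pool);
    for (final String item : items) {
      cs.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          task.run(item);
          return null;
        }
      });
    }
    try {
      for (int i = 0; i < items.size(); i++) {
        cs.take().get();  // propagates task failures as ExecutionException
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }

  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      editAll(pool, Arrays.asList("region-a", "region-b"), new Task() {
        @Override
        public void run(String item) {
          System.out.println("snapshotting " + item);
        }
      });
    } finally {
      pool.shutdown();
    }
  }
}

Draining once per submitted task is what gives the snapshot handler its all-or-nothing behavior: the loop cannot return until every region has been processed.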
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 8952f49..28e7b1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -223,6 +223,20 @@ public class StoreFileInfo {
    */
   public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
       throws IOException {
+    FileStatus status = getReferencedFileStatus(fs);
+    if (this.reference != null) {
+      return computeRefFileHDFSBlockDistribution(fs, reference, status);
+    } else {
+      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+    }
+  }
+
+  /**
+   * Get the {@link FileStatus} of the file referenced by this StoreFileInfo
+   * @param fs The current file system to use.
+   * @return The {@link FileStatus} of the file referenced by this StoreFileInfo
+   */
+  public FileStatus getReferencedFileStatus(final FileSystem fs) throws IOException {
     FileStatus status;
     if (this.reference != null) {
       if (this.link != null) {
@@ -233,7 +247,6 @@ public class StoreFileInfo {
         Path referencePath = getReferredToFile(this.getPath());
         status = fs.getFileStatus(referencePath);
       }
-      return computeRefFileHDFSBlockDistribution(fs, reference, status);
     } else {
       if (this.link != null) {
         // HFileLink
@@ -241,8 +254,8 @@ public class StoreFileInfo {
       } else {
         status = this.fileStatus;
       }
-      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
     }
+    return status;
   }
 
   /** @return The {@link Path} of the file */
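Reviewer note (not part of the patch): the StoreFileInfo change splits "resolve the FileStatus of the file this store file really points at" (following a Reference or HFileLink) out of the block-distribution computation, so SnapshotManifestV2 further down can record the referenced file's real length. A tiny sketch of reading a length from a FileStatus, using the local filesystem as a stand-in for HDFS; it requires only hadoop-common, and the path is illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileStatusDemo {
  public static void main(String[] args) throws IOException {
    // The local filesystem stands in for HDFS here.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path p = new Path("/tmp/filestatus-demo.txt");
    fs.createNewFile(p);
    FileStatus status = fs.getFileStatus(p);
    // getReferencedFileStatus() resolves references/links first and then
    // returns a status like this one, so callers see the real file length.
    System.out.println(p + " len=" + status.getLen());
  }
}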
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index 8a63862..d8d8642 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -153,6 +155,15 @@
    * @return the set of regions touched by the restore operation
    */
  public RestoreMetaChanges restoreHdfsRegions() throws IOException {
+    ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot");
+    try {
+      return restoreHdfsRegions(exec);
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+  private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
     LOG.debug("starting restore");
 
     Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
@@ -187,13 +198,13 @@
       // Restore regions using the snapshot data
       monitor.rethrowException();
       status.setStatus("Restoring table regions...");
-      restoreHdfsRegions(regionManifests, metaChanges.getRegionsToRestore());
+      restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore());
       status.setStatus("Finished restoring all table regions.");
 
       // Remove regions from the current table
       monitor.rethrowException();
       status.setStatus("Starting to delete excess regions from table");
-      removeHdfsRegions(metaChanges.getRegionsToRemove());
+      removeHdfsRegions(exec, metaChanges.getRegionsToRemove());
       status.setStatus("Finished deleting excess regions from table.");
     }
@@ -210,7 +221,7 @@
       // Create new regions cloning from the snapshot
       monitor.rethrowException();
       status.setStatus("Cloning regions...");
-      HRegionInfo[] clonedRegions = cloneHdfsRegions(regionManifests, regionsToAdd);
+      HRegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd);
       metaChanges.setNewRegions(clonedRegions);
       status.setStatus("Finished cloning regions.");
     }
@@ -345,23 +356,30 @@
   /**
    * Remove specified regions from the file-system, using the archiver.
    */
-  private void removeHdfsRegions(final List<HRegionInfo> regions) throws IOException {
-    if (regions != null && regions.size() > 0) {
-      for (HRegionInfo hri: regions) {
+  private void removeHdfsRegions(final ThreadPoolExecutor exec, final List<HRegionInfo> regions)
+      throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
         HFileArchiver.archiveRegion(conf, fs, hri);
       }
-    }
+    });
   }
 
   /**
    * Restore specified regions by restoring content to the snapshot state.
    */
-  private void restoreHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+  private void restoreHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
       final List<HRegionInfo> regions) throws IOException {
     if (regions == null || regions.size() == 0) return;
-    for (HRegionInfo hri: regions) {
-      restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
-    }
+    ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+      @Override
+      public void editRegion(final HRegionInfo hri) throws IOException {
+        restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
+      }
+    });
   }
 
   private Map<String, List<SnapshotRegionManifest.StoreFile>> getRegionHFileReferences(
@@ -465,7 +483,8 @@
    * Clone specified regions. For each region create a new region
    * and create a HFileLink for each hfile.
    */
-  private HRegionInfo[] cloneHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+  private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec,
+      final Map<String, SnapshotRegionManifest> regionManifests,
       final List<HRegionInfo> regions) throws IOException {
     if (regions == null || regions.size() == 0) return null;
 
@@ -490,7 +509,7 @@
     }
 
     // create the regions on disk
-    ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
+    ModifyRegionUtils.createRegions(exec, conf, rootDir, tableDir,
       tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
         @Override
         public void fillRegion(final HRegion region) throws IOException {
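Reviewer note (not part of the patch): restoreHdfsRegions() now follows a create/delegate/shutdown shape — the public entry point owns the pool, and the private overload does the work against it. A generic sketch of that lifecycle pattern; PoolLifecycleSketch, Body, and withPool are invented names.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolLifecycleSketch {
  interface Body<T> {
    T run(ExecutorService exec) throws IOException;
  }

  // The public entry point owns the pool: create it, delegate, always shut down.
  static <T> T withPool(int threads, Body<T> body) throws IOException {
    ExecutorService exec = Executors.newFixedThreadPool(threads);
    try {
      return body.run(exec);
    } finally {
      exec.shutdown();
    }
  }

  public static void main(String[] args) throws IOException {
    String result = withPool(4, new Body<String>() {
      @Override
      public String run(ExecutorService exec) {
        return "restored";  // the private overload would do the region work here
      }
    });
    System.out.println(result);
  }
}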
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
index df3ddac..749112a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
@@ -25,6 +25,8 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -99,14 +101,14 @@
       }
     }
 
-    private int hfileArchiveCount = 0;
-    private int hfilesMissing = 0;
-    private int hfilesCount = 0;
-    private int logsMissing = 0;
-    private int logsCount = 0;
-    private long hfileArchiveSize = 0;
-    private long hfileSize = 0;
-    private long logSize = 0;
+    private AtomicInteger hfileArchiveCount = new AtomicInteger();
+    private AtomicInteger hfilesMissing = new AtomicInteger();
+    private AtomicInteger hfilesCount = new AtomicInteger();
+    private AtomicInteger logsMissing = new AtomicInteger();
+    private AtomicInteger logsCount = new AtomicInteger();
+    private AtomicLong hfileArchiveSize = new AtomicLong();
+    private AtomicLong hfileSize = new AtomicLong();
+    private AtomicLong logSize = new AtomicLong();
 
     private final SnapshotDescription snapshot;
     private final TableName snapshotTable;
@@ -128,57 +130,57 @@
 
     /** @return true if the snapshot is corrupted */
     public boolean isSnapshotCorrupted() {
-      return hfilesMissing > 0 || logsMissing > 0;
+      return hfilesMissing.get() > 0 || logsMissing.get() > 0;
     }
 
     /** @return the number of available store files */
     public int getStoreFilesCount() {
-      return hfilesCount + hfileArchiveCount;
+      return hfilesCount.get() + hfileArchiveCount.get();
     }
 
     /** @return the number of available store files in the archive */
     public int getArchivedStoreFilesCount() {
-      return hfileArchiveCount;
+      return hfileArchiveCount.get();
     }
 
     /** @return the number of available log files */
     public int getLogsCount() {
-      return logsCount;
+      return logsCount.get();
     }
 
     /** @return the number of missing store files */
     public int getMissingStoreFilesCount() {
-      return hfilesMissing;
+      return hfilesMissing.get();
     }
 
     /** @return the number of missing log files */
     public int getMissingLogsCount() {
-      return logsMissing;
+      return logsMissing.get();
     }
 
     /** @return the total size of the store files referenced by the snapshot */
     public long getStoreFilesSize() {
-      return hfileSize + hfileArchiveSize;
+      return hfileSize.get() + hfileArchiveSize.get();
     }
 
     /** @return the total size of the store files shared */
     public long getSharedStoreFilesSize() {
-      return hfileSize;
+      return hfileSize.get();
     }
 
     /** @return the total size of the store files in the archive */
     public long getArchivedStoreFileSize() {
-      return hfileArchiveSize;
+      return hfileArchiveSize.get();
     }
 
     /** @return the percentage of the shared store files */
     public float getSharedStoreFilePercentage() {
-      return ((float)hfileSize / (hfileSize + hfileArchiveSize)) * 100;
+      return ((float)hfileSize.get() / (hfileSize.get() + hfileArchiveSize.get())) * 100;
     }
 
     /** @return the total log size */
     public long getLogsSize() {
-      return logSize;
+      return logSize.get();
     }
 
   /**
@@ -197,15 +199,15 @@
       try {
         if ((inArchive = fs.exists(link.getArchivePath()))) {
           size = fs.getFileStatus(link.getArchivePath()).getLen();
-          hfileArchiveSize += size;
-          hfileArchiveCount++;
+          hfileArchiveSize.addAndGet(size);
+          hfileArchiveCount.incrementAndGet();
         } else {
           size = link.getFileStatus(fs).getLen();
-          hfileSize += size;
-          hfilesCount++;
+          hfileSize.addAndGet(size);
+          hfilesCount.incrementAndGet();
         }
       } catch (FileNotFoundException e) {
-        hfilesMissing++;
+        hfilesMissing.incrementAndGet();
       }
       return new FileInfo(inArchive, size);
     }
@@ -221,10 +223,10 @@
       long size = -1;
       try {
         size = logLink.getFileStatus(fs).getLen();
-        logSize += size;
-        logsCount++;
+        logSize.addAndGet(size);
+        logsCount.incrementAndGet();
       } catch (FileNotFoundException e) {
-        logsMissing++;
+        logsMissing.incrementAndGet();
       }
       return new FileInfo(false, size);
     }
@@ -368,8 +370,8 @@
     final SnapshotDescription snapshotDesc = snapshotManifest.getSnapshotDescription();
     final String table = snapshotDesc.getTable();
     final SnapshotStats stats = new SnapshotStats(this.getConf(), this.fs, snapshotDesc);
-    SnapshotReferenceUtil.visitReferencedFiles(getConf(), fs,
-      snapshotManifest.getSnapshotDir(), snapshotDesc, new SnapshotReferenceUtil.SnapshotVisitor() {
+    SnapshotReferenceUtil.concurrentVisitReferencedFiles(getConf(), fs, snapshotManifest,
+      new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
         public void storeFile(final HRegionInfo regionInfo, final String family,
             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
@@ -448,8 +450,9 @@
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
     final SnapshotStats stats = new SnapshotStats(conf, fs, snapshot);
-    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshot,
+    SnapshotReferenceUtil.concurrentVisitReferencedFiles(conf, fs, manifest,
       new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
         public void storeFile(final HRegionInfo regionInfo, final String family,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index 621b835..47c6ebf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -442,8 +442,8 @@
     return createExecutor(conf, name);
   }
 
-  static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
-    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 4);
+  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
+    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
       Threads.getNamedThreadFactory(name));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
index 2f446a5..585c454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
@@ -103,14 +103,15 @@
     }
 
     public void storeFile(final SnapshotRegionManifest.Builder region,
-        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile) {
+        final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
+        throws IOException {
       SnapshotRegionManifest.StoreFile.Builder sfManifest =
             SnapshotRegionManifest.StoreFile.newBuilder();
       sfManifest.setName(storeFile.getPath().getName());
       if (storeFile.isReference()) {
         sfManifest.setReference(storeFile.getReference().convert());
       }
-      sfManifest.setFileSize(storeFile.getFileStatus().getLen());
+      sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
       family.addStoreFiles(sfManifest.build());
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index f9b0638..c8d0e5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.HashSet;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -107,7 +109,7 @@ public final class SnapshotReferenceUtil {
     visitLogFiles(fs, snapshotDir, visitor);
   }
 
   /**
    * Iterate over the snapshot store files
    *
    * @param conf The current {@link Configuration} instance.
@@ -117,7 +119,7 @@
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
+  static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
       final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
       throws IOException {
     SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
@@ -139,7 +141,7 @@
    * @param visitor callback object to get the store files
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
+  static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
       final StoreFileVisitor visitor) throws IOException {
     HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
     for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
@@ -192,6 +194,19 @@
       final SnapshotManifest manifest) throws IOException {
     final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
     final Path snapshotDir = manifest.getSnapshotDir();
+    concurrentVisitReferencedFiles(conf, fs, manifest, new StoreFileVisitor() {
+      @Override
+      public void storeFile(final HRegionInfo regionInfo, final String family,
+          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+        verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
+      }
+    });
+  }
+
+  public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+      final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException {
+    final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+    final Path snapshotDir = manifest.getSnapshotDir();
 
     List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
     if (regionManifests == null || regionManifests.size() == 0) {
@@ -207,13 +222,7 @@
       completionService.submit(new Callable<Void>() {
         @Override
         public Void call() throws IOException {
-          visitRegionStoreFiles(regionManifest, new StoreFileVisitor() {
-            @Override
-            public void storeFile(final HRegionInfo regionInfo, final String family,
-                final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
-              verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
-            }
-          });
+          visitRegionStoreFiles(regionManifest, visitor);
           return null;
         }
       });
@@ -251,19 +260,28 @@
    * @throws CorruptedSnapshotException if the snapshot is corrupted
    * @throws IOException if an error occurred while scanning the directory
    */
-  public static void verifyStoreFile(final Configuration conf, final FileSystem fs,
+  private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
       final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
       final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+    TableName table = TableName.valueOf(snapshot.getTable());
     String fileName = storeFile.getName();
 
     Path refPath = null;
     if (StoreFileInfo.isReference(fileName)) {
       // If is a reference file check if the parent file is present in the snapshot
-      Path snapshotHFilePath = new Path(new Path(
-        new Path(snapshotDir, regionInfo.getEncodedName()), family), fileName);
-      refPath = StoreFileInfo.getReferredToFile(snapshotHFilePath);
-      if (!fs.exists(refPath)) {
-        throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName, snapshot);
+      refPath = new Path(new Path(regionInfo.getEncodedName(), family), fileName);
+      refPath = StoreFileInfo.getReferredToFile(refPath);
+      String refRegion = refPath.getParent().getParent().getName();
+      refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
+      if (!new HFileLink(conf, refPath).exists(fs)) {
+        throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName +
+          " path=" + refPath, snapshot);
+      }
+
+      if (storeFile.hasReference()) {
+        // We don't really need to look for the file on-disk
+        // we already have the Reference information embedded here.
+        return;
       }
     }
 
@@ -274,15 +292,25 @@
       linkPath = new Path(family, fileName);
     } else {
       linkPath = new Path(family, HFileLink.createHFileLinkName(
-        TableName.valueOf(snapshot.getTable()), regionInfo.getEncodedName(), fileName));
+        table, regionInfo.getEncodedName(), fileName));
     }
 
     // check if the linked file exists (in the archive, or in the table dir)
     HFileLink link = new HFileLink(conf, linkPath);
-    if (!link.exists(fs)) {
-      throw new CorruptedSnapshotException("Can't find hfile: " + fileName
-        + " in the real (" + link.getOriginPath() + ") or archive (" + link.getArchivePath()
-        + ") directory for the primary table.", snapshot);
+    try {
+      FileStatus fstat = link.getFileStatus(fs);
+      if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
+        String msg = "hfile: " + fileName + " size does not match with the expected one. " +
+          " found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
+        LOG.error(msg);
+        throw new CorruptedSnapshotException(msg, snapshot);
+      }
+    } catch (FileNotFoundException e) {
+      String msg = "Can't find hfile: " + fileName + " in the real (" +
+        link.getOriginPath() + ") or archive (" + link.getArchivePath() +
+        ") directory for the primary table.";
+      LOG.error(msg);
+      throw new CorruptedSnapshotException(msg, snapshot);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index f0c5526..0ff7515 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -56,6 +57,10 @@
     void fillRegion(final HRegion region) throws IOException;
   }
 
+  public interface RegionEditTask {
+    void editRegion(final HRegionInfo region) throws IOException;
+  }
+
   /**
    * Create new set of regions on the specified file-system.
    * NOTE: that you should add the regions to hbase:meta after this operation.
@@ -107,10 +112,36 @@
       final RegionFillTask task) throws IOException {
     if (newRegions == null) return null;
     int regionNumber = newRegions.length;
-    ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
+    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
       "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
-    CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
-      regionOpenAndInitThreadPool);
+    try {
+      return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+    } finally {
+      exec.shutdownNow();
+    }
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: that you should add the regions to hbase:meta after this operation.
+   *
+   * @param exec Thread Pool Executor
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
+      final Configuration conf, final Path rootDir, final Path tableDir,
+      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final RegionFillTask task) throws IOException {
+    if (newRegions == null) return null;
+    int regionNumber = newRegions.length;
+    CompletionService<HRegionInfo> completionService =
+      new ExecutorCompletionService<HRegionInfo>(exec);
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     for (final HRegionInfo newRegion : newRegions) {
       completionService.submit(new Callable<HRegionInfo>() {
@@ -132,8 +163,6 @@
       throw new InterruptedIOException(e.getMessage());
     } catch (ExecutionException e) {
       throw new IOException(e);
-    } finally {
-      regionOpenAndInitThreadPool.shutdownNow();
     }
     return regionInfos;
   }
@@ -167,6 +196,41 @@
     return region.getRegionInfo();
   }
 
+  /**
+   * Execute the task on the specified set of regions.
+   *
+   * @param exec Thread Pool Executor
+   * @param regions {@link HRegionInfo} that describes the regions to edit
+   * @param task {@link RegionEditTask} custom code to edit the region
+   * @throws IOException
+   */
+  public static void editRegions(final ThreadPoolExecutor exec,
+      final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
+    final ExecutorCompletionService<Void> completionService =
+      new ExecutorCompletionService<Void>(exec);
+    for (final HRegionInfo hri: regions) {
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          task.editRegion(hri);
+          return null;
+        }
+      });
+    }
+
+    try {
+      for (HRegionInfo hri: regions) {
+        completionService.take().get();
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      IOException ex = new IOException();
+      ex.initCause(e.getCause());
+      throw ex;
+    }
+  }
+
   /*
    * used by createRegions() to get the thread pool executor based on the
    * "hbase.hregion.open.and.init.threads.max" property.
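Reviewer note (not part of the patch): putting the pieces together, a caller drives the new API by building a pool with SnapshotManifest.createExecutor() (now public; the default pool size moves from 4 to 8 via hbase.snapshot.thread.pool.max) and handing it to ModifyRegionUtils.editRegions(). A hedged usage sketch against the post-patch classes — the table name, split point, and task body are illustrative only, and running it requires the HBase server jars on the classpath.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;

public class DemoEdit {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Pool size comes from "hbase.snapshot.thread.pool.max" (default now 8).
    ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DemoEdit");
    try {
      TableName table = TableName.valueOf("demo");
      List<HRegionInfo> regions = Arrays.asList(
          new HRegionInfo(table, HConstants.EMPTY_START_ROW, Bytes.toBytes("m")),
          new HRegionInfo(table, Bytes.toBytes("m"), HConstants.EMPTY_END_ROW));
      // editRegions() blocks until all tasks finish and rethrows the first
      // failure as an IOException.
      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
        @Override
        public void editRegion(final HRegionInfo hri) throws IOException {
          System.out.println("editing region " + hri.getRegionNameAsString());
        }
      });
    } finally {
      exec.shutdown();  // callers in this patch always shut the pool down in finally
    }
  }
}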