From 6475c269206c18427f5449f25ed5041e30f11b78 Mon Sep 17 00:00:00 2001
From: huzheng
Date: Thu, 14 Sep 2017 17:08:16 +0800
Subject: [PATCH] HBASE-18806 VerifyRep by snapshot need not restore the
 snapshot for each mapper

---
 .../hadoop/hbase/mapreduce/TableMapReduceUtil.java |  32 +++---
 .../mapreduce/TableSnapshotInputFormatImpl.java    |  10 +-
 .../mapreduce/replication/VerifyReplication.java   |  24 ++++-
 .../replication/TestReplicationSmallTests.java     |  18 ++++
 .../hadoop/hbase/client/TableSnapshotScanner.java  | 120 +++++++++++++++------
 .../hbase/client/TestTableSnapshotScanner.java     |  47 ++++++++
 6 files changed, 197 insertions(+), 54 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 4dcd048..010d5d2 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -344,22 +344,20 @@ public class TableMapReduceUtil {
   }
 
   /**
-   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
-   * and read directly from snapshot files.
-   *
+   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
+   * directly from snapshot files.
    * @param snapshotName The name of the snapshot (of a table) to read from.
-   * @param scan The scan instance with the columns, time range etc.
-   * @param mapper The mapper class to use.
-   * @param outputKeyClass The class of the output key.
-   * @param outputValueClass The class of the output value.
-   * @param job The current job to adjust. Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   * job classes via the distributed cache (tmpjars).
-   *
+   * @param scan The scan instance with the columns, time range etc.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
+   *          configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
+   *          the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restore directory can be deleted.
+   *          have write permissions to this directory, and this should not be a subdirectory of
+   *          rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
@@ -368,10 +366,10 @@ public class TableMapReduceUtil {
       Class<?> outputKeyClass,
       Class<?> outputValueClass, Job job,
       boolean addDependencyJars, Path tmpRestoreDir)
-  throws IOException {
+      throws IOException {
     TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
-    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
-        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
+    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
+        addDependencyJars, false, TableSnapshotInputFormat.class);
     resetCacheConfig(job.getConfiguration());
   }
 
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 6a09c81..fbd9c23 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -316,6 +316,10 @@ public class TableSnapshotInputFormatImpl {
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
+      // Split parent regions should be excluded.
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
 
       if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
           hri.getEndKey())) {
@@ -388,9 +392,9 @@
    * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
    * @param conf the job to configuration
    * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restoreDir can be deleted.
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
+   *          write permissions to this directory, and this should not be a subdirectory of rootdir.
+   *          After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index acf6ff8..7a3f030 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -212,8 +214,8 @@ public class VerifyReplication extends Configured implements Tool {
               + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
               + " peerFSAddress:" + peerFSAddress);
 
-          replicatedScanner = new TableSnapshotScanner(peerConf,
-              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
+          replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
+              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
         } else {
           replicatedScanner = replicatedTable.getScanner(scan);
         }
@@ -361,6 +363,17 @@
     }
   }
 
+  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
+      throws IOException {
+    Configuration peerConf =
+        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
+    FileSystem.setDefaultUri(peerConf, peerFSAddress);
+    FSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
+    FileSystem fs = FileSystem.get(peerConf);
+    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, FSUtils.getRootDir(peerConf),
+        new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
+  }
+
   /**
    * Sets up the actual job.
   *
@@ -405,7 +418,13 @@
     //Set Snapshot specific parameters
     if (peerSnapshotName != null) {
       conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+
+      // For verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
+      // restore the snapshot.
+      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
+      peerSnapshotTmpDir = restoreDir.toString();
       conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+
       conf.set(NAME + ".peerFSAddress", peerFSAddress);
       conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
 
@@ -442,6 +461,7 @@
           "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
       TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
         null, job, true, snapshotTempPath);
+      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
     } else {
       TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
     }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 192b85d..efcc6f3 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -901,6 +902,17 @@
     assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
   }
 
+  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
+    assertNotNull(subDirectories);
+    assertEquals(expectedCount, subDirectories.length);
+    for (int i = 0; i < expectedCount; i++) {
+      assertTrue(subDirectories[i].isDirectory());
+    }
+  }
+
   @Test(timeout = 300000)
   public void testVerifyReplicationWithSnapshotSupport() throws Exception {
     // Populate the tables, at the same time it guarantees that the tables are
@@ -942,6 +954,9 @@
     assertEquals(0,
       job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
 
+    checkRestoreTmpDir(conf1, temPath1, 1);
+    checkRestoreTmpDir(conf2, temPath2, 1);
+
     Scan scan = new Scan();
     ResultScanner rs = htable2.getScanner(scan);
     Put put = null;
@@ -979,6 +994,9 @@
       job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
     assertEquals(NB_ROWS_IN_BATCH,
       job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    checkRestoreTmpDir(conf1, temPath1, 2);
+    checkRestoreTmpDir(conf2, temPath2, 2);
   }
 
   @Test
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index 9244ced..20380fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -31,6 +30,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -75,6 +78,7 @@ public class TableSnapshotScanner extends AbstractClientScanner {
   private Scan scan;
   private ArrayList<HRegionInfo> regions;
   private TableDescriptor htd;
+  private boolean snapshotAlreadyRestored = false;
 
   private ClientSideRegionScanner currentRegionScanner = null;
   private int currentRegion = -1;
@@ -83,14 +87,14 @@
    * Creates a TableSnapshotScanner.
    * @param conf the configuration
    * @param restoreDir a temporary directory to copy the snapshot files into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * The scanner deletes the contents of the directory once the scanner is closed.
+   *          have write permissions to this directory, and this should not be a subdirectory of
+   *          rootDir. The scanner deletes the contents of the directory once the scanner is closed.
    * @param snapshotName the name of the snapshot to read from
    * @param scan a Scan representing scan parameters
    * @throws IOException in case of error
   */
-  public TableSnapshotScanner(Configuration conf, Path restoreDir,
-      String snapshotName, Scan scan) throws IOException {
+  public TableSnapshotScanner(Configuration conf, Path restoreDir, String snapshotName, Scan scan)
+      throws IOException {
     this(conf, FSUtils.getRootDir(conf), restoreDir, snapshotName, scan);
   }
 
@@ -99,45 +103,84 @@
    * @param conf the configuration
    * @param rootDir root directory for HBase.
    * @param restoreDir a temporary directory to copy the snapshot files into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * The scanner deletes the contents of the directory once the scanner is closed.
+   *          have write permissions to this directory, and this should not be a subdirectory of
+   *          rootDir. The scanner deletes the contents of the directory once the scanner is closed.
   * @param snapshotName the name of the snapshot to read from
+   * @param snapshotAlreadyRestored true to indicate that the snapshot has already been restored.
   * @param scan a Scan representing scan parameters
   * @throws IOException in case of error
   */
-  public TableSnapshotScanner(Configuration conf, Path rootDir,
-      Path restoreDir, String snapshotName, Scan scan) throws IOException {
+  public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
+      String snapshotName, Scan scan, boolean snapshotAlreadyRestored) throws IOException {
     this.conf = conf;
     this.snapshotName = snapshotName;
     this.rootDir = rootDir;
-    // restoreDir will be deleted in close(), use a unique sub directory
-    this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
     this.scan = scan;
     this.fs = rootDir.getFileSystem(conf);
-    init();
+    this.snapshotAlreadyRestored = snapshotAlreadyRestored;
+
+    if (snapshotAlreadyRestored) {
+      this.restoreDir = restoreDir;
+      openWithoutRestoringSnapshot();
+    } else {
+      // restoreDir will be deleted in close(), use a unique sub directory
+      this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+      openWithRestoringSnapshot();
+    }
+
+    initScanMetrics(scan);
+  }
+
+  /**
+   * Creates a TableSnapshotScanner.
+   * @param conf the configuration
+   * @param rootDir root directory for HBase.
+   * @param restoreDir a temporary directory to copy the snapshot files into. Current user should
+   *          have write permissions to this directory, and this should not be a subdirectory of
+   *          rootDir. The scanner deletes the contents of the directory once the scanner is closed.
+   * @param snapshotName the name of the snapshot to read from
+   * @param scan a Scan representing scan parameters
+   * @throws IOException in case of error
+   */
+  public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
+      String snapshotName, Scan scan) throws IOException {
+    this(conf, rootDir, restoreDir, snapshotName, scan, false);
+  }
+
+  private boolean isValidRegion(HRegionInfo hri) {
+    // An offline split parent region should be excluded.
+    if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+      return false;
+    }
+    return CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+      hri.getEndKey());
   }
 
-  private void init() throws IOException {
+  private void openWithoutRestoringSnapshot() throws IOException {
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotProtos.SnapshotDescription snapshotDesc =
+        SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+    if (regionManifests == null) {
+      throw new IllegalArgumentException("Snapshot seems empty, snapshotName: " + snapshotName);
+    }
+
+    regions = new ArrayList<>(regionManifests.size());
+    regionManifests.stream().map(r -> HRegionInfo.convert(r.getRegionInfo()))
+        .filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
+    htd = manifest.getTableDescriptor();
+  }
+
+  private void openWithRestoringSnapshot() throws IOException {
     final RestoreSnapshotHelper.RestoreMetaChanges meta =
-        RestoreSnapshotHelper.copySnapshotForScanner(
-            conf, fs, rootDir, restoreDir, snapshotName);
+        RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
     final List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
     htd = meta.getTableDescriptor();
     regions = new ArrayList<>(restoredRegions.size());
-    for (HRegionInfo hri : restoredRegions) {
-      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
-        continue;
-      }
-      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
-        hri.getEndKey())) {
-        regions.add(hri);
-      }
-    }
-
-    // sort for regions according to startKey.
-    Collections.sort(regions);
-    initScanMetrics(scan);
+    restoredRegions.stream().filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
   }
 
   @Override
@@ -172,15 +215,28 @@
     }
   }
 
+  private void cleanup() {
+    try {
+      if (fs.exists(this.restoreDir)) {
+        if (!fs.delete(this.restoreDir, true)) {
+          LOG.warn("Delete restore directory for the snapshot failed. restoreDir: "
+              + this.restoreDir);
+        }
+      }
+    } catch (IOException ex) {
+      LOG.warn("Could not delete restore directory for the snapshot. restoreDir: "
+          + this.restoreDir, ex);
+    }
+  }
+
   @Override
   public void close() {
     if (currentRegionScanner != null) {
       currentRegionScanner.close();
     }
-    try {
-      fs.delete(this.restoreDir, true);
-    } catch (IOException ex) {
-      LOG.warn("Could not delete restore directory for the snapshot:" + ex);
+    // If snapshotAlreadyRestored is true, leave the restore directory for the caller to clean up.
+    if (!this.snapshotAlreadyRestored) {
+      cleanup();
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 535a34d..85d6b2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -187,6 +188,52 @@ public class TestTableSnapshotScanner {
     testScanner(UTIL, "testWithMultiRegion", 20, true);
   }
 
+  @Test
+  public void testScannerWithRestoreScanner() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testScanner");
+    String snapshotName = "testScannerWithRestoreScanner";
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
+      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      Configuration conf = UTIL.getConfiguration();
+      Path rootDir = FSUtils.getRootDir(conf);
+
+      TableSnapshotScanner scanner0 =
+          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+      verifyScanner(scanner0, bbb, yyy);
+      scanner0.close();
+
+      // restore the snapshot.
+      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+      // scan the snapshot without restoring it again.
+      TableSnapshotScanner scanner =
+          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+
+      // check that the restored snapshot was NOT deleted by the close of the scanner.
+      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+
+      // restore the snapshot again.
+      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+      // check that restoring over the existing directory still yields a readable snapshot.
+      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
       boolean shutdownCluster) throws Exception {
     setupCluster();
-- 
2.3.2 (Apple Git-55)
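
Usage note (not part of the patch): the sketch below shows how a caller might drive the restore-once path this change introduces, that is, restore the snapshot a single time with RestoreSnapshotHelper.copySnapshotForScanner(), as VerifyReplication now does in restoreSnapshotForPeerCluster(), then open any number of scanners with snapshotAlreadyRestored = true. The driver class name, snapshot name, and restore path are hypothetical; the HBase calls are the ones used or added by this patch.

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.FSUtils;

public class SnapshotRestoreOnceExample { // hypothetical driver class
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = FSUtils.getRootDir(conf);

    String snapshotName = "mySnapshot"; // hypothetical snapshot name
    Path restoreDir = // hypothetical restore location, outside the HBase root dir
        new Path("/tmp/snapshot-restore", UUID.randomUUID().toString());

    // Restore the snapshot exactly once, up front.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    // Each scanner reuses the already-restored files. Because snapshotAlreadyRestored is
    // true, close() leaves restoreDir in place instead of deleting it.
    for (int i = 0; i < 2; i++) {
      Scan scan = new Scan();
      TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      try {
        for (Result result : scanner) {
          // process each row ...
        }
      } finally {
        scanner.close(); // does NOT delete restoreDir on this code path
      }
    }

    // The caller owns the restored files and must delete them explicitly.
    fs.delete(restoreDir, true);
  }
}

This is the same contract the MapReduce side relies on: VerifyReplication restores the peer snapshot once in the job driver, and every mapper then opens its TableSnapshotScanner with snapshotAlreadyRestored = true against the shared restore directory, instead of each mapper paying for its own restore.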