From 833f08fbde3e840f442aa77d6385dc729c9625e0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 7 Dec 2017 01:06:33 +0800 Subject: [PATCH] HBASE-15482 Provide an option to skip calculating block locations for SnapshotInputFormat --- .../mapreduce/TableSnapshotInputFormatImpl.java | 53 ++++++++++++++++------ .../hbase/mapred/TestTableSnapshotInputFormat.java | 18 +++++++- .../TableSnapshotInputFormatTestBase.java | 6 +-- .../mapreduce/TestTableSnapshotInputFormat.java | 17 ++++++- 4 files changed, 73 insertions(+), 21 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index bee4926ca6..da6833e51f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -86,6 +86,18 @@ public class TableSnapshotInputFormatImpl { */ public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region"; + /** + * Whether to calculate the block location for splits. Defaults to true. + * If the computing layer runs outside of HBase cluster, the block locality does not matter. + * Setting this value to false could skip the calculation and save some time. + * + * Set access modifier to "public" so that these could be accessed by test classes of + * both org.apache.hadoop.hbase.mapred + * and org.apache.hadoop.hbase.mapreduce. + */ + public static final String SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY = "hbase.TableSnapshotInputFormat.locality"; + public static final boolean DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY = true; + /** + * Implementation class for InputSplit logic common between mapred and mapreduce. 
*/ @@ -356,6 +368,9 @@ public class TableSnapshotInputFormatImpl { Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); + boolean careBlockLocality = conf.getBoolean(SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY, + DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY); + List<InputSplit> splits = new ArrayList<>(); for (HRegionInfo hri : regionManifests) { // load region descriptor @@ -365,29 +380,19 @@ public class TableSnapshotInputFormatImpl { for (int i = 0; i < sp.length - 1; i++) { if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])) { - // compute HDFS locations from snapshot files (which will get the locations for - // referred hfiles) - List<String> hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + List<String> hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir, careBlockLocality); - int len = Math.min(3, hosts.size()); - hosts = hosts.subList(0, len); Scan boundedScan = new Scan(scan); boundedScan.setStartRow(sp[i]); boundedScan.setStopRow(sp[i + 1]); + splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); } } } else { if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { - // compute HDFS locations from snapshot files (which will get the locations for - // referred hfiles) - List<String> hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); - - int len = Math.min(3, hosts.size()); - hosts = hosts.subList(0, len); + hri.getStartKey(), hri.getEndKey())) { + List<String> hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir, careBlockLocality); splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } @@ -397,6 +402,26 @@ public class TableSnapshotInputFormatImpl { } + /** + * Compute block locations for snapshot files (which will get the locations for referred hfiles) + * only when careBlockLocality is true. 
+ */ + private static List<String> calculateLocationsForInputSplit(Configuration conf, TableDescriptor htd, HRegionInfo hri, + Path tableDir, boolean careBlockLocality) throws IOException { + if (careBlockLocality) { // care block locality + List<String> hosts = getBestLocations(conf, + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + + // return at most top 3 hosts + int len = Math.min(3, hosts.size()); + hosts = hosts.subList(0, len); + + return hosts; + } else { // do not care block locality + return null; + } + } + /** * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take * weights into account, thus will treat every location passed from the input split as equal. We diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java index be36b6a651..6711b7b5ea 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.mapred; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY; import static org.mockito.Mockito.mock; import org.apache.hadoop.fs.Path; @@ -138,7 +140,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Test @Override public void testWithMockedMapReduceMultiRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true /* value does not matter */); } @Test @@ -165,7 +167,7 @@ public class 
TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setCareBlockLocalityTo) throws Exception { setupCluster(); final TableName tableName = TableName.valueOf(name.getMethodName()); try { @@ -173,6 +175,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); JobConf job = new JobConf(util.getConfiguration()); + // setCareBlockLocalityTo is ignored no matter what is specified, so as to test the case that + // SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY is not explicitly specified and the default value is taken. Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); if (numSplitsPerRegion > 1) { @@ -206,10 +210,20 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + boolean careBlockLocality = job.getBoolean(SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY, + DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY); + for (int i = 0; i < splits.length; i++) { // validate input split InputSplit split = splits[i]; Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit); + if (careBlockLocality) { + // split.getLocations() could be an empty array (length is 0), even though the block locality is calculated, + // so drop the following verification. 
+ //Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); + } else { + Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); + } // validate record reader OutputCollector collector = mock(OutputCollector.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java index 362dca1963..f68d0d633a 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -78,7 +78,7 @@ public abstract class TableSnapshotInputFormatTestBase { } protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception; + int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setCareBlockLocalityTo) throws Exception; protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits, @@ -90,12 +90,12 @@ public abstract class TableSnapshotInputFormatTestBase { @Test public void testWithMockedMapReduceSingleRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1, true); } @Test public void testWithMockedMapReduceMultiRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8, false); } @Test diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 890eb2fe11..0a8d1870e5 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.mapreduce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY; + import java.io.IOException; import java.util.List; @@ -210,14 +213,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setCareBlockLocalityTo) throws Exception { setupCluster(); final TableName tableName = TableName.valueOf(name.getMethodName()); try { createTableAndSnapshot( util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); - Job job = new Job(util.getConfiguration()); + Configuration conf = util.getConfiguration(); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY, setCareBlockLocalityTo); + Job job = new Job(conf); Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan @@ -304,10 +309,18 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + boolean careBlockLocality = 
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY_KEY, + DEFAULT_SNAPSHOT_INPUTFORMAT_CARE_BLOCK_LOCALITY); + for (int i = 0; i < splits.size(); i++) { // validate input split InputSplit split = splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + if (careBlockLocality) { + Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); + } else { + Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); + } // validate record reader TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); -- 2.14.1