diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
index edb12443de..21716f5ae4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -110,7 +110,8 @@ public class MultiTableSnapshotInputFormatImpl {
 
       for (Scan scan : entry.getValue()) {
         List<TableSnapshotInputFormatImpl.InputSplit> splits =
-          TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
+          TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf,
+            TableSnapshotInputFormatImpl.getRegionSplitSize(conf));
         rtn.addAll(splits);
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 73c5478a72..582fc6ec07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -29,12 +29,14 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
@@ -43,17 +45,21 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -87,6 +93,20 @@ public class TableSnapshotInputFormatImpl {
    */
   public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
 
+  /**
+   * For MapReduce jobs running multiple mappers per region,
+   * determines the target size of a split, for a balanced input across all mappers.
+   */
+  public static final String REGION_SPLIT_SIZE = "region.split.size";
+
+  public static final long DEFAULT_REGION_SPLIT_SIZE = -1;
+
+  /**
+   * For MapReduce jobs running multiple mappers per region,
+   * determines the minimum size of a split, to avoid creating too many splits.
+   */
+  public static final long MIN_REGION_SPLIT_SIZE = 10L * 1024 * 1024; // 10MB
+
   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
@@ -97,6 +117,7 @@ public class TableSnapshotInputFormatImpl {
     private String[] locations;
     private String scan;
     private String restoreDir;
+    private long length;
 
     // constructor for mapreduce framework / Writable
     public InputSplit() {}
@@ -119,6 +140,12 @@ public class TableSnapshotInputFormatImpl {
       this.restoreDir = restoreDir.toString();
     }
 
+    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+        Scan scan, Path restoreDir, long length) {
+      this(htd, regionInfo, locations, scan, restoreDir);
+      this.length = length;
+    }
+
     public HTableDescriptor getHtd() {
       return htd;
     }
@@ -132,8 +159,7 @@ public class TableSnapshotInputFormatImpl {
     }
 
     public long getLength() {
-      //TODO: We can obtain the file sizes of the snapshot here.
-      return 0;
+      return length;
     }
 
     public String[] getLocations() {
@@ -172,6 +198,7 @@ public class TableSnapshotInputFormatImpl {
 
       Bytes.writeByteArray(out, Bytes.toBytes(scan));
       Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+      WritableUtils.writeVLong(out, length);
     }
 
     @Override
@@ -187,6 +214,8 @@ public class TableSnapshotInputFormatImpl {
 
       this.scan = Bytes.toString(Bytes.readByteArray(in));
       this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
+
+      this.length = WritableUtils.readVLong(in);
     }
   }
 
@@ -276,10 +305,25 @@ public class TableSnapshotInputFormatImpl {
     Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
 
     RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);
-
+    long splitSize = getRegionSplitSize(conf);
     int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);
 
-    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
+    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits,
+      splitSize);
+  }
+
+  public static long getRegionSplitSize(Configuration conf) {
+    long splitSize = conf.getLong(REGION_SPLIT_SIZE, DEFAULT_REGION_SPLIT_SIZE);
+    if (splitSize == DEFAULT_REGION_SPLIT_SIZE) {
+      LOG.info(REGION_SPLIT_SIZE + " option is not set");
+      return DEFAULT_REGION_SPLIT_SIZE;
+    }
+    if (splitSize < MIN_REGION_SPLIT_SIZE) {
+      LOG.info("Given region split size [" + splitSize + "] is too small");
+      LOG.info("Using the minimum allowed split size " + MIN_REGION_SPLIT_SIZE + " instead");
+      return MIN_REGION_SPLIT_SIZE;
+    }
+    LOG.info("Split size = " + splitSize);
+    return splitSize;
   }
 
   public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{
@@ -344,22 +388,47 @@
   }
 
   public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
-      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
-    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
+      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf,
+      long splitSize) throws IOException {
+    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1, splitSize);
   }
 
   public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
       List<HRegionInfo> regionManifests, Path restoreDir,
-      Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
+      Configuration conf, RegionSplitter.SplitAlgorithm sa,
+      int numSplits, long splitSize) throws IOException {
     // load table descriptor
     HTableDescriptor htd = manifest.getTableDescriptor();
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
+    // compute split lengths
+    Path snapshotDir = manifest.getSnapshotDir();
+    FileSystem fs = snapshotDir.getFileSystem(conf);
+    Map<String, Long> regionSizes = new HashMap<>();
+    try {
+      regionSizes = getRegionSizes(conf, fs, snapshotDir);
+    } catch (IOException exception) {
+      LOG.warn("Region sizes could not be computed", exception);
+    }
+
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
+      long length;
+      if (regionSizes.get(hri.getEncodedName()) == null) {
+        length = 0;
+      } else {
+        length = regionSizes.get(hri.getEncodedName());
+      }
+      if (splitSize != DEFAULT_REGION_SPLIT_SIZE) {
+        // the split size has priority over the number of splits per region:
+        // a target split size yields a more balanced input than a fixed number of splits
+        numSplits = (int) (length / splitSize) + 1;
+        length = length / numSplits;
+      }
+
       if (numSplits > 1) {
         byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
         for (int i = 0; i < sp.length - 1; i++) {
@@ -386,7 +455,7 @@
             boundedScan.withStopRow(
               Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
           }
-          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
+          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir, length));
         }
       }
     } else {
@@ -399,7 +468,7 @@
 
         int len = Math.min(3, hosts.size());
         hosts = hosts.subList(0, len);
-        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir, length));
       }
     }
 
@@ -408,6 +477,43 @@
     return splits;
   }
 
+  private static Map<String, Long> getRegionSizes(final Configuration conf,
+      final FileSystem fs, final Path snapshotDir) throws IOException {
+
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    final TableName table = TableName.valueOf(snapshotDesc.getTable());
+    final Map<String, Long> sizes = new HashMap<>();
+
+    LOG.info("Start computing region lengths for snapshot " + snapshotDesc.getName());
+
+    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
+      new SnapshotReferenceUtil.SnapshotVisitor() {
+        @Override
+        public void storeFile(HRegionInfo regionInfo, String familyName,
+            SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+          // store files with a reference are copied as part of the manifest, so skip them here
+          if (!storeFile.hasReference()) {
+            String region = regionInfo.getEncodedName();
+            String hfile = storeFile.getName();
+            Path path = HFileLink.createPath(table, region, familyName, hfile);
+            long size;
+            if (storeFile.hasFileSize()) {
+              size = storeFile.getFileSize();
+            } else {
+              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
+            }
+            size = sizes.get(region) == null ? size : size + sizes.get(region);
+            sizes.put(region, size);
+          }
+        }
+      });
+
+    LOG.info("Region lengths computed");
+    return sizes;
+  }
+
   /**
    * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
    * weights into account, thus will treat every location passed from the input split as equal. We
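Taken together, the two hunks above split a region's total store-file size into per-mapper chunks: getRegionSizes() sums store file sizes per encoded region name, and getSplits() derives a split count from that length and the configured split size. Below is a minimal standalone sketch of just that arithmetic; the class name and the example region lengths are invented for illustration and are not part of the patch.

    // SplitMathSketch.java: mirrors the numSplits/length arithmetic added to getSplits().
    public class SplitMathSketch {
      public static void main(String[] args) {
        long splitSize = 64L * 1024 * 1024; // as if region.split.size were set to 64 MiB
        long[] regionLengths = { 0L, 50L << 20, 300L << 20 }; // empty, 50 MiB, 300 MiB regions

        for (long length : regionLengths) {
          // same arithmetic as the patched getSplits(): the split size takes priority,
          // and the region's length is spread evenly over the resulting splits
          int numSplits = (int) (length / splitSize) + 1;
          long perSplit = length / numSplits;
          System.out.println(length + " bytes -> " + numSplits + " split(s) of " + perSplit + " bytes");
        }
      }
    }

Note that an empty region still yields one split of length 0, which is exactly what the new empty-snapshot test in the hunks below asserts.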
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 767d102ab6..1b48d3b8a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -234,4 +234,36 @@
     table.close();
   }
 
+  protected static void createEmptyTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
+      throws Exception {
+
+    try {
+      LOG.debug("Ensuring table doesn't exist.");
+      util.deleteTable(tableName);
+    } catch (Exception ex) {
+      // ignore
+    }
+
+    LOG.info("creating table '" + tableName + "'");
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+
+    Admin admin = util.getHBaseAdmin();
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    LOG.info("create snapshot for an empty table");
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+      null, null, snapshotName, rootDir, fs, true);
+
+    LOG.info("cause flush to create new files in the region");
+    admin.flush(tableName);
+    table.close();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 775d29dbe1..caf084e711 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -54,6 +56,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -368,6 +371,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
       // validate input split
       InputSplit split = splits.get(i);
       Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+      Assert.assertTrue(split.getLength() > 0);
       TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
       Scan scan =
         TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
@@ -460,4 +464,51 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     }
   }
 
+  @Test
+  public void testLengthIsSerialized() throws Exception {
+    Path path = UTIL.getRandomDir();
+    TableSnapshotInputFormatImpl.InputSplit split1 =
+      new TableSnapshotInputFormatImpl.InputSplit(
+        new HTableDescriptor(TableName.valueOf("table")),
+        new HRegionInfo(TableName.valueOf("table")),
+        Collections.<String>emptyList(), new Scan(), path, 1000);
+    TableSnapshotInputFormatImpl.InputSplit deserialized =
+      new TableSnapshotInputFormatImpl.InputSplit(
+        new HTableDescriptor(TableName.valueOf("table")),
+        new HRegionInfo(TableName.valueOf("table")),
+        Collections.<String>emptyList(), new Scan(), path);
+
+    ReflectionUtils.copy(new Configuration(), split1, deserialized);
+    Assert.assertEquals(1000, deserialized.getLength());
+  }
+
+  @Test
+  public void testWithMockedMapReduceWithEmptySnapshot() throws Exception {
+    setupCluster();
+    String snapshotName = "testWithMockedMapReduceWithEmptySnapshot";
+    final TableName tableName = TableName.valueOf(snapshotName);
+    try {
+      createEmptyTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
+
+      Job job = new Job(UTIL.getConfiguration());
+      Scan scan = new Scan();
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
+        false, UTIL.getDataTestDirOnTestFS(snapshotName), new RegionSplitter.UniformSplit(), 5);
+
+      // 10 regions with 5 splits per region
+      List<InputSplit> splits = new TableSnapshotInputFormat().getSplits(job);
+      Assert.assertEquals(50, splits.size());
+
+      for (InputSplit split : splits) {
+        Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+        Assert.assertTrue(split.getLength() == 0);
+      }
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
 }
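For reference, a hypothetical driver snippet showing how a job could opt in to size-based splitting once this patch lands. Only the "region.split.size" key and the 10MB floor come from the patch; the class name and the 256 MiB target are made up for the example.

    import org.apache.hadoop.conf.Configuration;

    public class SnapshotJobConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Aim for roughly 256 MiB of snapshot data per mapper; values below the
        // 10MB minimum are rounded up by getRegionSplitSize().
        conf.setLong("region.split.size", 256L * 1024 * 1024);
        // When "region.split.size" is left unset, getSplits() falls back to the
        // existing hbase.mapreduce.splits.per.region behavior.
        System.out.println(conf.getLong("region.split.size", -1L));
      }
    }

Configuring a target size rather than a fixed hbase.mapreduce.splits.per.region keeps mapper input balanced when region sizes are skewed, which is the motivation stated in the new constants' javadoc.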