diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index d90062f62c..cdd2f4e6a5 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -105,6 +106,9 @@ public class TableSnapshotInputFormat extends InputFormat locations, + Scan scan, Path restoreDir, long length) { + this(htd, regionInfo, locations, scan, restoreDir); + this.length = length; + } + public TableDescriptor getHtd() { return htd; } @@ -146,8 +160,7 @@ public class TableSnapshotInputFormatImpl { } public long getLength() { - //TODO: We can obtain the file sizes of the snapshot here. 
- return 0; + return length; } public String[] getLocations() { @@ -186,6 +199,7 @@ public class TableSnapshotInputFormatImpl { Bytes.writeByteArray(out, Bytes.toBytes(scan)); Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); + WritableUtils.writeVLong(out, length); } @Override @@ -201,6 +215,7 @@ public class TableSnapshotInputFormatImpl { this.scan = Bytes.toString(Bytes.readByteArray(in)); this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); + this.length = WritableUtils.readVLong(in); } } @@ -368,12 +383,30 @@ public class TableSnapshotInputFormatImpl { boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT); + // compute split lengths + Path snapshotDir = manifest.getSnapshotDir(); + FileSystem fs = snapshotDir.getFileSystem(conf); + Map<String, Long> regionSizes = new HashMap<>(); + try { + regionSizes = getRegionSizes(conf, fs, snapshotDir); + } catch (IOException exception) { + LOG.warn("Region sizes could not be computed", exception); + } + List splits = new ArrayList<>(); for (HRegionInfo hri : regionManifests) { // load region descriptor + long length; + if (regionSizes.get(hri.getEncodedName()) == null) { + length = 0; + } else { + length = regionSizes.get(hri.getEncodedName()); + } + if (numSplits > 1) { byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); + length = length / numSplits; for (int i = 0; i < sp.length - 1; i++) { if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])) { @@ -395,7 +428,7 @@ public class TableSnapshotInputFormatImpl { Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? 
scan.getStopRow() : sp[i + 1]); } - splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); + splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir, length)); } } } else { @@ -403,7 +436,7 @@ public class TableSnapshotInputFormatImpl { hri.getStartKey(), hri.getEndKey())) { List hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled); - splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir, length)); } } } @@ -411,6 +444,43 @@ public class TableSnapshotInputFormatImpl { return splits; } + private static Map<String, Long> getRegionSizes(final Configuration conf, + final FileSystem fs, final Path snapshotDir) + throws IOException { + + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + final TableName table = TableName.valueOf(snapshotDesc.getTable()); + final Map<String, Long> sizes = new HashMap<>(); + + LOG.info("Start computing region lengths for snapshot " + snapshotDesc.getName()); + + SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, + new SnapshotReferenceUtil.SnapshotVisitor() { + @Override + public void storeFile(final RegionInfo regionInfo, String familyName, + final SnapshotRegionManifest.StoreFile storeFile) + throws IOException { + // for storeFile.hasReference() case, copied as part of the manifest + if (!storeFile.hasReference()) { + String region = regionInfo.getEncodedName(); + String hfile = storeFile.getName(); + Path path = HFileLink.createPath(table, region, familyName, hfile); + long size; + if (storeFile.hasFileSize()) { + size = storeFile.getFileSize(); + } else { + size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); + } + size = sizes.get(region) == null ? 
size : size + sizes.get(region); + sizes.put(region, size); + } + } + }); + + LOG.info("Region lengths computed"); + return sizes; + } + /** * Compute block locations for snapshot files (which will get the locations for referred hfiles) * only when localityEnabled is true. diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java index 744c356410..0903a3b30f 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -232,4 +232,36 @@ public abstract class TableSnapshotInputFormatTestBase { table.close(); } + protected static void createEmptyTableAndSnapshot(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, int numRegions) + throws Exception { + + try { + LOG.debug("Ensuring table doesn't exist."); + util.deleteTable(tableName); + } catch (Exception ex) { + // ignore + } + + LOG.info("creating table '" + tableName + "'"); + if (numRegions > 1) { + util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions); + } else { + util.createTable(tableName, FAMILIES); + } + + Admin admin = util.getAdmin(); + Table table = util.getConnection().getTable(tableName); + Path rootDir = FSUtils.getRootDir(util.getConfiguration()); + FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); + + 
LOG.info("create snapshot for an empty table"); + SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, + null, null, snapshotName, rootDir, fs, true); + + LOG.info("cause flush to create new files in the region"); + admin.flush(tableName); + table.close(); + } + } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index f61c222fb9..d5d6b457ba 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Result; @@ -53,6 +56,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.junit.After; import org.junit.Assert; import org.junit.ClassRule; @@ -203,6 +207,53 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa } } + @Test + public void testLengthIsSerialized() throws Exception { + Path path = UTIL.getRandomDir(); + 
TableSnapshotInputFormatImpl.InputSplit split1 = new TableSnapshotInputFormatImpl + .InputSplit(new HTableDescriptor(TableName.valueOf("table")), + new HRegionInfo(TableName.valueOf("table")), + Collections.emptyList(), new Scan(), path, 1000); TableSnapshotInputFormatImpl.InputSplit deserialized = new TableSnapshotInputFormatImpl + .InputSplit(new HTableDescriptor(TableName.valueOf("table")), + new HRegionInfo(TableName.valueOf("table")), + Collections.emptyList(), new Scan(), path); + + ReflectionUtils.copy(new Configuration(), split1, deserialized); + Assert.assertEquals(1000, deserialized.getLength()); + } + + @Test + public void testWithMockedMapReduceWithEmptySnapshot() throws Exception { + setupCluster(); + String snapshotName = "testWithMockedMapReduceWithEmptySnapshot"; + final TableName tableName = TableName.valueOf(snapshotName); + try { + createEmptyTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10); + + Job job = new Job(UTIL.getConfiguration()); + Scan scan = new Scan(); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, + false, UTIL.getDataTestDirOnTestFS(snapshotName), new RegionSplitter.UniformSplit(), 5); + + List<InputSplit> splits = (new TableSnapshotInputFormat()).getSplits(job); + Assert.assertEquals(50, splits.size()); + + for (InputSplit split : splits) { + Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit); + Assert.assertEquals(0, split.getLength()); + } + + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + + @Override public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, String snapshotName, Path tmpTableDir) throws Exception { @@ -375,6 +426,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa // validate input split InputSplit split = 
splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + Assert.assertTrue(split.getLength() > 0); TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split; if (localityEnabled) { Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);