diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 73c5478a72..75e4b8df22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -29,12 +29,14 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
@@ -43,10 +45,12 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -55,6 +59,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 
 /**
  * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
@@ -97,6 +103,7 @@ public class TableSnapshotInputFormatImpl {
     private String[] locations;
     private String scan;
     private String restoreDir;
+    private long length;
 
     // constructor for mapreduce framework / Writable
     public InputSplit() {}
@@ -119,6 +126,12 @@
       this.restoreDir = restoreDir.toString();
     }
 
+    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+        Scan scan, Path restoreDir, long length) {
+      this(htd, regionInfo, locations, scan, restoreDir);
+      this.length = length;
+    }
+
     public HTableDescriptor getHtd() {
       return htd;
     }
@@ -132,8 +145,7 @@
     }
 
     public long getLength() {
-      //TODO: We can obtain the file sizes of the snapshot here.
-      return 0;
+      return length;
     }
 
     public String[] getLocations() {
@@ -172,6 +184,7 @@
 
       Bytes.writeByteArray(out, Bytes.toBytes(scan));
       Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+      WritableUtils.writeVLong(out, length);
     }
 
     @Override
@@ -187,6 +200,8 @@
 
       this.scan = Bytes.toString(Bytes.readByteArray(in));
       this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
+
+      this.length = WritableUtils.readVLong(in);
     }
   }
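For context on the Writable changes above: the new length field rides along in write()/readFields() as a variable-length long (1 to 9 bytes depending on magnitude). A minimal, self-contained sketch of that round trip, using Hadoop's DataOutputBuffer/DataInputBuffer helpers; the class name VLongRoundTrip is illustrative and not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongRoundTrip {
      public static void main(String[] args) throws IOException {
        long length = 123456789L;

        // serialize the way InputSplit.write(out) now does
        DataOutputBuffer out = new DataOutputBuffer();
        WritableUtils.writeVLong(out, length);

        // deserialize the way InputSplit.readFields(in) now does
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        System.out.println(WritableUtils.readVLong(in)); // prints 123456789
      }
    }

Note that the field is appended after the existing fields, so the read side must consume it in the same order it was written.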
@@ -356,12 +371,20 @@
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
+    // compute split lengths
+    Path snapshotDir = manifest.getSnapshotDir();
+    FileSystem fs = snapshotDir.getFileSystem(conf);
+    Map<String, Long> regionSizes = getRegionSizes(conf, fs, snapshotDir);
+
     List<InputSplit> splits = new ArrayList<InputSplit>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
+      long length = regionSizes.get(hri.getEncodedName()) != null
+          ? regionSizes.get(hri.getEncodedName()) : 0;
 
       if (numSplits > 1) {
         byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
+        length = length / numSplits;
         for (int i = 0; i < sp.length - 1; i++) {
           if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
             sp[i + 1])) {
@@ -386,7 +409,7 @@
             boundedScan.withStopRow(
               Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
           }
-          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
+          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir, length));
         }
       }
     } else {
@@ -399,7 +422,7 @@
 
         int len = Math.min(3, hosts.size());
         hosts = hosts.subList(0, len);
-        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir, length));
       }
     }
 
@@ -408,6 +431,41 @@
 
   }
 
+  private static Map<String, Long> getRegionSizes(final Configuration conf,
+      final FileSystem fs, final Path snapshotDir) throws IOException {
+
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    final TableName table = TableName.valueOf(snapshotDesc.getTable());
+    final Map<String, Long> sizes = new HashMap<>();
+
+    LOG.info("Start computing region lengths for snapshot " + snapshotDesc.getName());
+
+    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
+      new SnapshotReferenceUtil.SnapshotVisitor() {
+        @Override
+        public void storeFile(HRegionInfo regionInfo, String familyName,
+            SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+          // skip the storeFile.hasReference() case: those files were copied as part of the manifest
+          if (!storeFile.hasReference()) {
+            String region = regionInfo.getEncodedName();
+            String hfile = storeFile.getName();
+            Path path = HFileLink.createPath(table, region, familyName, hfile);
+            long size;
+            if (storeFile.hasFileSize()) {
+              size = storeFile.getFileSize();
+            } else {
+              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
+            }
+            size = sizes.get(region) == null ? size : size + sizes.get(region);
+            sizes.put(region, size);
+          }
+        }
+      });
+
+    LOG.info("Region lengths computed");
+    return sizes;
+  }
+
   /**
    * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
    * weights into account, thus will treat every location passed from the input split as equal. We
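The per-region length in getRegionSizes() is simply the sum of the region's store file sizes, preferring the size recorded in the snapshot manifest and falling back to a filesystem stat through the HFileLink. The null-check accumulation idiom is easy to misread, so here is a standalone sketch of the same pattern (region names and sizes are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class RegionSizeAccumulator {
      static void add(Map<String, Long> sizes, String region, long size) {
        // same accumulation as getRegionSizes(): the first file seeds the
        // entry, later files add to it
        sizes.put(region, sizes.get(region) == null ? size : sizes.get(region) + size);
      }

      public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<String, Long>();
        add(sizes, "region-a", 100L); // first store file in region-a
        add(sizes, "region-a", 50L);  // second store file in region-a
        add(sizes, "region-b", 10L);
        // e.g. {region-a=150, region-b=10} (HashMap iteration order not guaranteed)
        System.out.println(sizes);
      }
    }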
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 767d102ab6..1b48d3b8a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -234,4 +234,36 @@ public abstract class TableSnapshotInputFormatTestBase {
     table.close();
   }
 
+  protected static void createEmptyTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
+      throws Exception {
+
+    try {
+      LOG.debug("Ensuring table doesn't exist.");
+      util.deleteTable(tableName);
+    } catch (Exception ex) {
+      // ignore
+    }
+
+    LOG.info("creating table '" + tableName + "'");
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+
+    Admin admin = util.getHBaseAdmin();
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    LOG.info("creating snapshot for the empty table");
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+      null, null, snapshotName, rootDir, fs, true);
+
+    LOG.info("causing flush to create new files in the region");
+    admin.flush(tableName);
+    table.close();
+  }
+
 }
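Before the test changes, a hedged sketch of what the fix buys a caller. Given a job already configured via TableMapReduceUtil.initTableSnapshotMapperJob, each split now reports the approximate bytes behind it (SplitLengthReport and its report method are illustrative names, not HBase API):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitLengthReport {
      static void report(Job job) throws IOException, InterruptedException {
        List<InputSplit> splits = new TableSnapshotInputFormat().getSplits(job);
        long total = 0;
        for (InputSplit split : splits) {
          total += split.getLength(); // previously hard-coded to 0
        }
        System.out.println(splits.size() + " splits, ~" + total + " bytes");
      }
    }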
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 775d29dbe1..49829547f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -54,6 +56,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -368,6 +371,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
       // validate input split
       InputSplit split = splits.get(i);
       Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+      Assert.assertTrue(split.getLength() > 0);
       TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
       Scan scan =
           TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
@@ -460,4 +464,49 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
       }
     }
   }
+
+  @Test
+  public void testLengthIsSerialized() throws Exception {
+    Path path = UTIL.getRandomDir();
+    TableSnapshotInputFormatImpl.InputSplit split1 = new TableSnapshotInputFormatImpl
+        .InputSplit(new HTableDescriptor(TableName.valueOf("table")), new HRegionInfo(TableName.valueOf("table")),
+        Collections.<String>emptyList(), new Scan(), path, 1000);
+    TableSnapshotInputFormatImpl.InputSplit deserialized = new TableSnapshotInputFormatImpl
+        .InputSplit(new HTableDescriptor(TableName.valueOf("table")), new HRegionInfo(TableName.valueOf("table")),
+        Collections.<String>emptyList(), new Scan(), path);
+
+    ReflectionUtils.copy(new Configuration(), split1, deserialized);
+    Assert.assertEquals(1000, deserialized.getLength());
+  }
+
+  @Test
+  public void testWithMockedMapReduceWithEmptySnapshot() throws Exception {
+    setupCluster();
+    String snapshotName = "testWithMockedMapReduceWithEmptySnapshot";
+    final TableName tableName = TableName.valueOf(snapshotName);
+    try {
+      createEmptyTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
+
+      Job job = new Job(UTIL.getConfiguration());
+      Scan scan = new Scan();
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        UTIL.getDataTestDirOnTestFS(snapshotName), new RegionSplitter.UniformSplit(), 5);
+
+      List<InputSplit> splits = (new TableSnapshotInputFormat()).getSplits(job);
+      Assert.assertEquals(50, splits.size());
+
+      for (InputSplit split : splits) {
+        Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+        Assert.assertTrue(split.getLength() == 0);
+      }
+
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
 }
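One note on why a real getLength() matters beyond the assertions above: the MapReduce job submitter orders splits by their reported length, largest first, so that regions with more data tend to get task slots earlier. A toy illustration of that ordering (a stand-in, since Hadoop's actual split comparator is package-private):

    import java.util.Arrays;
    import java.util.Collections;

    public class LargestSplitFirst {
      public static void main(String[] args) {
        // pretend these values came from InputSplit.getLength()
        Long[] splitLengths = { 0L, 1048576L, 4096L };
        Arrays.sort(splitLengths, Collections.reverseOrder());
        System.out.println(Arrays.toString(splitLengths)); // [1048576, 4096, 0]
      }
    }

With the old hard-coded 0, every snapshot split looked equally sized to the scheduler regardless of the data behind it.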