diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index a5d7c59..3b8cb75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -301,6 +302,17 @@ public class TableMapReduceUtil {
     initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
         outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);

+    /*
+     * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+     * direct memory will likely cause the map tasks to OOM when opening the region. This
+     * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+     * wants to override this behavior in their job.
+     */
+    job.getConfiguration().setFloat(
+      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    job.getConfiguration().unset("hbase.offheapcache.percentage");
+    job.getConfiguration().unset("hbase.bucketcache.ioengine");
+
     // We would need even more libraries that hbase-server depends on
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
index c6a59ac..a2ec2ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -34,6 +34,11 @@ import org.junit.experimental.categories.Category;

 @Category(SmallTests.class)
 public class TestTableMapReduceUtil {
+  /*
+   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
+   * the method depends on an online cluster.
+   */
+
   @Test
   public void testInitTableMapperJob1() throws Exception {
     Configuration configuration = new Configuration();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 66afb85..0f5f747 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -42,6 +43,9 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -55,6 +59,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

@@ -202,6 +207,40 @@ public class TestTableSnapshotInputFormat {
   }

   @Test
+  public void testInitTableSnapshotMapperJobConfig() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
+    String snapshotName = "foo";
+
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, 1);
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, false, tmpTableDir);
+
+      // TODO: would be better to examine directly the cache instance that results from this
+      // config. Currently this is not possible because BlockCache initialization is static.
+      Assert.assertEquals(
+        "Snapshot job should be configured for default LruBlockCache.",
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use SlabCache.",
+        null, job.getConfiguration().get("hbase.offheapcache.percentage"));
+      Assert.assertEquals(
+        "Snapshot job should not use BucketCache.",
+        null, job.getConfiguration().get("hbase.bucketcache.ioengine"));
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  @Test
   public void testWithMockedMapReduceSingleRegion() throws Exception {
     testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
   }
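
Reviewer note, not part of the patch: below is a minimal driver sketch showing how a client job would pick up the cache settings applied above. The snapshot name, mapper class, and restore directory are hypothetical placeholders; the call is the same public initTableSnapshotMapperJob overload the new test exercises, so the resulting job configuration carries the on-heap HFILE_BLOCK_CACHE_SIZE_KEY value and has the "hbase.offheapcache.percentage" and "hbase.bucketcache.ioengine" keys cleared.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotScanDriver {

  // Hypothetical mapper; any TableMapper subclass works here.
  static class MySnapshotMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) {
      // process one row of the snapshot
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-scan");
    job.setJarByClass(SnapshotScanDriver.class);

    // "my_snapshot" and the restore dir are placeholders. After this call the
    // job config uses the default on-heap LruBlockCache size and no longer
    // carries any SlabCache/BucketCache settings, per the change above.
    TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(),
      MySnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
      job, true, new Path("/tmp/snapshot-restore"));

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the override lives in TableMapReduceUtil rather than in the record reader, a job that genuinely wants an off-heap cache can still set those keys again after this call, which is the escape hatch the in-code comment intends.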