diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 91e5b22..ae7e53d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -40,11 +41,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -105,6 +108,15 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+      "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.table.name";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -188,7 +200,51 @@ public class HFileOutputFormat2
 
         // create a new HLog writer, if necessary
         if (wl == null || wl.writer == null) {
-          wl = getNewWriter(family, conf);
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+            HRegionLocation loc = null;
+            HTable htable = null;
+            try {
+              htable = new HTable(conf, tableName);
+              loc = htable.getRegionLocation(rowKey);
+            } catch (Throwable e) {
+              LOG.warn("there's something wrong when locating rowkey: " +
+                Bytes.toString(rowKey), e);
+              loc = null;
+            } finally {
+              if(null != htable) {
+                htable.close();
+              }
+            }
+
+            if (null == loc) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("failed to get region location, so use default writer: " +
+                  Bytes.toString(rowKey));
+              }
+              wl = getNewWriter(family, conf, null);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+              }
+              InetSocketAddress initialIsa =
+                  new InetSocketAddress(loc.getHostname(), loc.getPort());
+              if (initialIsa.isUnresolved()) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" +
+                      loc.getPort() + ", so use default writer");
+                }
+                wl = getNewWriter(family, conf, null);
+              } else {
+                if(LOG.isDebugEnabled()) {
+                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                }
+                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+              }
+            }
+          } else {
+            wl = getNewWriter(family, conf, null);
+          }
         }
 
         // we now have the proper HLog writer. full steam ahead
@@ -218,8 +274,8 @@ public class HFileOutputFormat2
    * @return A WriterLength, containing a new StoreFile.Writer.
    * @throws IOException
    */
-  private WriterLength getNewWriter(byte[] family, Configuration conf)
-      throws IOException {
+  private WriterLength getNewWriter(byte[] family, Configuration conf,
+      InetSocketAddress[] favoredNodes) throws IOException {
     WriterLength wl = new WriterLength();
     Path familydir = new Path(outputdir, Bytes.toString(family));
     Algorithm compression = compressionMap.get(family);
@@ -241,10 +297,18 @@ public class HFileOutputFormat2
       contextBuilder.withDataBlockEncoding(encoding);
     HFileContext hFileContext = contextBuilder.build();
 
-    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-        .withOutputDir(familydir).withBloomType(bloomType)
-        .withComparator(KeyValue.COMPARATOR)
-        .withFileContext(hFileContext).build();
+    if (null == favoredNodes) {
+      wl.writer =
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+              .withOutputDir(familydir).withBloomType(bloomType)
+              .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build();
+    } else {
+      wl.writer =
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+              .withOutputDir(familydir).withBloomType(bloomType)
+              .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
+              .withFavoredNodes(favoredNodes).build();
+    }
 
     this.writers.put(family, wl);
     return wl;
@@ -384,6 +448,12 @@ public class HFileOutputFormat2
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
 
+    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      // record this table name for creating writer by favored nodes
+      LOG.info("bulkload locality sensitive enabled");
+      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, table.getName().getNameAsString());
+    }
+
     // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index f3157f2..b8c0fc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -335,6 +335,7 @@ public class TestHFileOutputFormat {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
+    setupMockTableName(table);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -776,6 +777,11 @@ public class TestHFileOutputFormat {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
+  private void setupMockTableName(HTable table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
   * bloom filter settings from the column family descriptor
@@ -804,6 +810,9 @@ public class TestHFileOutputFormat {
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     conf.set("hbase.fs.tmp.dir", dir.toString());
+    // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
+
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 28ca56f..bff8494 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HadoopShims;
 import org.apache.hadoop.hbase.KeyValue;
@@ -67,6 +68,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -336,6 +338,7 @@ public class TestHFileOutputFormat2 {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
+    setupMockTableName(table);
     HFileOutputFormat2.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -354,39 +357,66 @@ public class TestHFileOutputFormat2 {
   @Test
   public void testMRIncrementalLoad() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoad\n");
-    doIncrementalLoadTest(false);
+    doIncrementalLoadTest(false, false);
   }
 
   @Test
   public void testMRIncrementalLoadWithSplit() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true);
+    doIncrementalLoadTest(true, false);
   }
 
-  private void doIncrementalLoadTest(
-      boolean shouldChangeRegions) throws Exception {
+  /**
+   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
+   * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
+   * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
+   * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
+   * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
+   * locality features more easily.
+   */
+  @Test
+  public void testMRIncrementalLoadWithLocality() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
+    doIncrementalLoadTest(false, true);
+    doIncrementalLoadTest(true, true);
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
+      throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
-    byte[][] startKeys = generateRandomStartKeys(5);
+    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+    int hostCount = 1;
+    int regionNum = 5;
+    if (shouldKeepLocality) {
+      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
+      // explicit hostnames parameter just like MiniDFSCluster does.
+      hostCount = 3;
+      regionNum = 20;
+    }
+
+    byte[][] startKeys = generateRandomStartKeys(regionNum);
+    String[] hostnames = new String[hostCount];
+    for (int i = 0; i < hostCount; ++i) {
+      hostnames[i] = "datanode_" + i;
+    }
+
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
     HBaseAdmin admin = null;
     try {
-      util.startMiniCluster();
-      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+      util.startMiniCluster(1, hostCount, hostnames);
       admin = new HBaseAdmin(conf);
       HTable table = util.createTable(TABLE_NAME, FAMILIES);
-      assertEquals("Should start with empty table",
-          0, util.countRows(table));
-      int numRegions = util.createMultiRegions(
-          util.getConfiguration(), table, FAMILIES[0], startKeys);
-      assertEquals("Should make 5 regions", numRegions, 5);
+      assertEquals("Should start with empty table", 0, util.countRows(table));
+      int numRegions =
+          util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], startKeys);
+      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
       runIncrementalPELoad(conf, table, testDir);
       // This doesn't write into the table, just makes files
-      assertEquals("HFOF should not touch actual table",
-          0, util.countRows(table));
-
+      assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
 
       // Make sure that a directory was created for every CF
       int dir = 0;
@@ -403,17 +433,16 @@ public class TestHFileOutputFormat2 {
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getTableName());
-        while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
-            getRegionStates().isRegionsInTransition()) {
+        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+            .isRegionsInTransition()) {
           Threads.sleep(200);
           LOG.info("Waiting on table to finish disabling");
         }
         byte[][] newStartKeys = generateRandomStartKeys(15);
-        util.createMultiRegions(
-            util.getConfiguration(), table, FAMILIES[0], newStartKeys);
+        util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], newStartKeys);
         admin.enableTable(table.getTableName());
-        while (table.getRegionLocations().size() != 15 ||
-            !admin.isTableAvailable(table.getTableName())) {
+        while (table.getRegionLocations().size() != 15
+            || !admin.isTableAvailable(table.getTableName())) {
           Thread.sleep(200);
           LOG.info("Waiting for new region assignment to happen");
         }
@@ -424,8 +453,8 @@ public class TestHFileOutputFormat2 {
 
       // Ensure data shows up
       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-      assertEquals("LoadIncrementalHFiles should put expected data in table",
-          expectedRows, util.countRows(table));
+      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+        util.countRows(table));
       Scan scan = new Scan();
       ResultScanner results = table.getScanner(scan);
       for (Result res : results) {
@@ -439,6 +468,17 @@ public class TestHFileOutputFormat2 {
       results.close();
       String tableDigestBefore = util.checksumRows(table);
 
+      // Check region locality
+      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
+      for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
+        hbd.add(region.getHDFSBlocksDistribution());
+      }
+      for (String hostname : hostnames) {
+        float locality = hbd.getBlockLocalityIndex(hostname);
+        LOG.info("locality of [" + hostname + "]: " + locality);
+        assertEquals(100, (int) (locality * 100));
+      }
+
       // Cause regions to reopen
       admin.disableTable(TABLE_NAME);
       while (!admin.isTableDisabled(TABLE_NAME)) {
@@ -447,9 +487,11 @@ public class TestHFileOutputFormat2 {
       }
       admin.enableTable(TABLE_NAME);
       util.waitTableAvailable(TABLE_NAME.getName());
-      assertEquals("Data should remain after reopening of regions",
-          tableDigestBefore, util.checksumRows(table));
+      assertEquals("Data should remain after reopening of regions", tableDigestBefore,
+        util.checksumRows(table));
     } finally {
+      util.deleteTable(TABLE_NAME);
+      testDir.getFileSystem(conf).delete(testDir, true);
       if (admin != null) admin.close();
       util.shutdownMiniMapReduceCluster();
       util.shutdownMiniCluster();
@@ -778,6 +820,11 @@ public class TestHFileOutputFormat2 {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
+  private void setupMockTableName(HTable table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -806,6 +853,9 @@ public class TestHFileOutputFormat2 {
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     conf.set("hbase.fs.tmp.dir", dir.toString());
+    // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
+
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
@@ -884,7 +934,8 @@ public class TestHFileOutputFormat2 {
    * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
    * will be thrown.
    */
-  @Ignore ("Flakey: See HBASE-9051") @Test
+  @Ignore("Flakey: See HBASE-9051")
+  @Test
   public void testExcludeAllFromMinorCompaction() throws Exception {
     Configuration conf = util.getConfiguration();
     conf.setInt("hbase.hstore.compaction.min", 2);