diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 015cca0..92dd6f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -41,11 +42,16 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -109,6 +115,15 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  private static final String LOCALITY_SENSITIVE_CONF_KEY =
+      "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.output.table.name";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -192,7 +207,49 @@ public class HFileOutputFormat2
 
         // create a new WAL writer, if necessary
         if (wl == null || wl.writer == null) {
-          wl = getNewWriter(family, conf);
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            HRegionLocation loc = null;
+            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+
+            try {
+              try (Connection connection = ConnectionFactory.createConnection(conf);
+                  RegionLocator locator =
+                      connection.getRegionLocator(TableName.valueOf(tableName))) {
+                loc = locator.getRegionLocation(rowKey);
+              }
+            } catch (Throwable e) {
+              LOG.warn("something went wrong locating rowkey: " +
+                  Bytes.toString(rowKey), e);
+              loc = null;
+            }
+
+            if (null == loc) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("failed to get region location, so use default writer");
+              }
+              wl = getNewWriter(family, conf, null);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+              }
+              InetSocketAddress initialIsa =
+                  new InetSocketAddress(loc.getHostname(), loc.getPort());
+              if (initialIsa.isUnresolved()) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" +
+                      loc.getPort() + ", so use default writer");
+                }
+                wl = getNewWriter(family, conf, null);
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                }
+                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+              }
+            }
+          } else {
+            wl = getNewWriter(family, conf, null);
+          }
         }
 
         // we now have the proper WAL writer. full steam ahead
@@ -224,8 +281,8 @@ public class HFileOutputFormat2
        */
       @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
           justification="Not important")
-      private WriterLength getNewWriter(byte[] family, Configuration conf)
-          throws IOException {
+      private WriterLength getNewWriter(byte[] family, Configuration conf,
+          InetSocketAddress[] favoredNodes) throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
         Algorithm compression = compressionMap.get(family);
@@ -247,10 +304,18 @@ public class HFileOutputFormat2
         contextBuilder.withDataBlockEncoding(encoding);
         HFileContext hFileContext = contextBuilder.build();
 
-        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-            .withOutputDir(familydir).withBloomType(bloomType)
-            .withComparator(CellComparator.COMPARATOR)
-            .withFileContext(hFileContext).build();
+        if (null == favoredNodes) {
+          wl.writer =
+              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+        } else {
+          wl.writer =
+              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+        }
 
         this.writers.put(family, wl);
         return wl;
@@ -431,6 +496,12 @@ public class HFileOutputFormat2
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
 
+    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      // record the table name so the writer can query region locations for favored nodes
+      LOG.info("bulkload locality sensitive enabled");
+      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
+    }
+
     // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + regionLocator.getName());
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index 9acc5ac..cf9be3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -342,6 +342,7 @@ public class TestHFileOutputFormat {
     HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     setupMockStartKeys(regionLocator);
+    setupMockTableName(regionLocator);
     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -796,6 +797,11 @@ public class TestHFileOutputFormat {
     Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
   }
 
+  private void setupMockTableName(RegionLocator table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -825,6 +831,9 @@ public class TestHFileOutputFormat {
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     conf.set("hbase.fs.tmp.dir", dir.toString());
+    // turn locality sensitivity off to avoid getRegionLocation fail-and-retry delays when writing KVs
+    conf.setBoolean("hbase.bulkload.locality.sensitive.enabled", false);
+
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 56e475b..0f977ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -345,6 +345,7 @@ public class TestHFileOutputFormat2 {
     Table table = Mockito.mock(Table.class);
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     setupMockStartKeys(regionLocator);
+    setupMockTableName(regionLocator);
     HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -800,6 +801,11 @@ public class TestHFileOutputFormat2 {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
+  private void setupMockTableName(RegionLocator table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -829,6 +835,9 @@ public class TestHFileOutputFormat2 {
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     conf.set("hbase.fs.tmp.dir", dir.toString());
+    // turn locality sensitivity off to avoid getRegionLocation fail-and-retry delays when writing KVs
+    conf.setBoolean("hbase.bulkload.locality.sensitive.enabled", false);
+
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);