Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java	(revision 147814)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java	(working copy)
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -40,11 +41,13 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -105,6 +108,9 @@
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.output.table.name";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -188,7 +194,38 @@
 
       // create a new HLog writer, if necessary
       if (wl == null || wl.writer == null) {
-        wl = getNewWriter(family, conf);
+        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+        if (null == tableName || tableName.isEmpty()) {
+          LOG.warn("table name is not set; using the default writer");
+          wl = getNewWriter(family, conf, null);
+        } else {
+          HRegionLocation loc = null;
+          try {
+            HTable htable = new HTable(conf, tableName);
+            loc = htable.getRegionLocation(rowKey);
+            htable.close();
+          } catch (Throwable e) {
+            LOG.warn(e.toString());
+            loc = null;
+          }
+
+          if (null == loc) {
+            LOG.warn("failed to get region location; using the default writer");
+            wl = getNewWriter(family, conf, null);
+          } else {
+            LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+            InetSocketAddress initialIsa =
+                new InetSocketAddress(loc.getHostname(), loc.getPort());
+            if (null != initialIsa.getAddress()) {
+              LOG.info("using favored-nodes writer: " + initialIsa.getHostString());
+              wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+            } else {
+              LOG.warn("failed to resolve the bind address " + loc.getHostname() + ":"
+                  + loc.getPort() + "; using the default writer");
+              wl = getNewWriter(family, conf, null);
+            }
+          }
+        }
       }
 
       // we now have the proper HLog writer. full steam ahead
@@ -218,7 +255,8 @@
    * @return A WriterLength, containing a new StoreFile.Writer.
    * @throws IOException
    */
-  private WriterLength getNewWriter(byte[] family, Configuration conf)
+  private WriterLength getNewWriter(byte[] family, Configuration conf,
+      InetSocketAddress[] favoredNodes)
       throws IOException {
     WriterLength wl = new WriterLength();
     Path familydir = new Path(outputdir, Bytes.toString(family));
@@ -241,10 +279,18 @@
     contextBuilder.withDataBlockEncoding(encoding);
     HFileContext hFileContext = contextBuilder.build();
 
-    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-        .withOutputDir(familydir).withBloomType(bloomType)
-        .withComparator(KeyValue.COMPARATOR)
-        .withFileContext(hFileContext).build();
+    if (null == favoredNodes) {
+      wl.writer =
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+              .withOutputDir(familydir).withBloomType(bloomType)
+              .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build();
+    } else {
+      wl.writer =
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+              .withOutputDir(familydir).withBloomType(bloomType)
+              .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
+              .withFavoredNodes(favoredNodes).build();
+    }
 
     this.writers.put(family, wl);
     return wl;
@@ -384,6 +430,9 @@
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
 
+    // record the table name so the writer can use favored nodes
+    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, Bytes.toString(table.getTableName()));
+
     // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
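
Usage note: no driver changes are needed to pick up the favored-nodes path, since
configureIncrementalLoad() now records the output table name in the job conf and the
record writer does the rest. Below is a minimal bulk-load driver sketch showing where
the patched code plugs in; BulkLoadDriver, MyMapper, the table name "myTable", and the
column family "f" are hypothetical names for illustration, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  /** Minimal mapper sketch: parses "row,value" text lines into Puts (illustrative). */
  public static class MyMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text line, Context context)
        throws java.io.IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      byte[] row = Bytes.toBytes(parts[0]);
      Put put = new Put(row);
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hfile-bulk-load");
    job.setJarByClass(BulkLoadDriver.class);

    // configureIncrementalLoad() picks the matching sort reducer
    // (PutSortReducer here) from the map output value class.
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // With this patch, configureIncrementalLoad() also stores the table name
    // under hbase.mapreduce.hfileoutputformat.output.table.name, so each
    // RecordWriter can look up region locations and request favored nodes.
    HTable table = new HTable(conf, "myTable");
    HFileOutputFormat2.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the favored node passed to the HFile writer is the host currently serving the
region, HDFS places one block replica on that region server, so the bulk-loaded HFiles
are already local once LoadIncrementalHFiles moves them into place.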