Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 1426405)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(working copy)
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -55,11 +56,10 @@
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -79,7 +79,7 @@
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  TimeRangeTracker trt = new TimeRangeTracker();
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
@@ -103,6 +103,7 @@
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
@@ -164,7 +165,6 @@
 
         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
-        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -185,9 +185,9 @@
         this.rollRequested = false;
       }
 
-      /* Create a new HFile.Writer.
+      /* Create a new StoreFile.Writer.
        * @param family
-       * @return A WriterLength, containing a new HFile.Writer.
+       * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
       private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -196,20 +196,28 @@
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactoryNoCache(conf)
-            .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
-            .withBlockSize(blocksize)
-            .withCompression(compression)
-            .withComparator(KeyValue.KEY_COMPARATOR)
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
             .withDataBlockEncoder(encoder)
             .withChecksumType(Store.getChecksumType(conf))
            .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
-            .create();
+            .build();
+
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
@@ -219,8 +227,7 @@
               Bytes.toBytes(true));
           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
               Bytes.toBytes(compactionExclude));
-          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
+          w.appendTrackedTimestampsToMetadata();
           w.close();
         }
       }
@@ -239,7 +246,7 @@
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -366,7 +373,8 @@
 
     // Set compression algorithms based on column families
     configureCompression(table, conf);
-
+    configureBloomType(table, conf);
+
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
   }
@@ -403,25 +411,39 @@
    * algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
-    for (String familyConf : compressionConf.split("&")) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   *
+   * @param conf
+   * @param confName
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
       String[] familySplit = familyConf.split("=");
       if (familySplit.length != 2) {
         continue;
       }
-
       try {
-        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
        throw new AssertionError(e);
       }
     }
-    return compressionMap;
+    return confValMap;
  }
-
+
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
@@ -451,4 +473,35 @@
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomType = familyDescriptor.getBloomFilterType().toString();
+      if (bloomType == null) {
+        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
 }
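
Usage note (illustrative, not part of the patch): with this change, configureIncrementalLoad() also serializes each family's bloom filter type into the job configuration alongside the existing compression settings, and getNewWriter() builds each StoreFile.Writer with that bloom type. Below is a minimal driver sketch showing how a bulk-load job would pick this up; the table name "mytable", the input/output paths, and the empty MyMapper class are placeholders, and the wiring assumes the 0.94-era MapReduce API used by this class.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadDriver {

      // Placeholder mapper: a real job would parse each input line and emit
      // one KeyValue per cell, keyed by row.
      public static class MyMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "hfile-bulk-load");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // configureIncrementalLoad() sets the sort reducer, total-order
        // partitioner, and output format, and (with this patch) calls
        // configureBloomType(table, conf), so the job conf carries e.g.
        //   hbase.hfileoutputformat.families.bloomtype=cf1=ROW&cf2=ROWCOL
        // for createFamilyBloomMap() to read back inside each task.
        HTable table = new HTable(conf, "mytable");   // placeholder table name
        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The zeroed block cache in getNewWriter() (tempConf with HFILE_BLOCK_CACHE_SIZE_KEY set to 0.0f) is presumably there so that reduce tasks, which only write HFiles and never read them back, do not allocate a region-server-sized block cache through the new CacheConfig.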