Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (revision 1343685) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (working copy) @@ -48,9 +48,14 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; +import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; @@ -79,7 +84,9 @@ static Log LOG = LogFactory.getLog(HFileOutputFormat.class); static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression"; TimeRangeTracker trt = new TimeRangeTracker(); - + private static final String DATABLOCK_ENCODING_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + public RecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file @@ -101,6 +108,21 @@ // create a map from column family to the compression algorithm final Map compressionMap = createFamilyCompressionMap(conf); + String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY); + final HFileDataBlockEncoder encoder; + if (dataBlockEncodingStr == null) { + encoder = NoOpDataBlockEncoder.INSTANCE; + } else { + try { + encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding + .valueOf(dataBlockEncodingStr)); + } catch (IllegalArgumentException ex) { + throw new RuntimeException( + "Invalid data block encoding type configured for the param " + + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr); + } + } + return new RecordWriter() { // Map of families to writers and how much has been output on the writer. private final Map writers = @@ -182,6 +204,9 @@ .withBlockSize(blocksize) .withCompression(compression) .withComparator(KeyValue.KEY_COMPARATOR) + .withDataBlockEncoder(encoder) + .withChecksumType(Store.getChecksumType(conf)) + .withBytesPerChecksum(Store.getBytesPerChecksum(conf)) .create(); this.writers.put(family, wl); return wl;