Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 1504774)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(working copy)
@@ -82,6 +82,7 @@
   private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -93,8 +94,6 @@
     // These configs. are from hbase-*.xml
     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
-    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
-        HFile.DEFAULT_BLOCKSIZE);
     // Invented config. Add to hbase-*.xml if other than default compression.
     final String defaultCompression = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
@@ -104,7 +103,8 @@
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
     final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
-
+    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
+
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
     if (dataBlockEncodingStr == null) {
@@ -201,9 +201,12 @@
       if (bloomTypeStr != null) {
         bloomType = BloomType.valueOf(bloomTypeStr);
       }
+      String blockSizeString = blockSizeMap.get(family);
+      int blockSize = blockSizeString == null ? HFile.DEFAULT_BLOCKSIZE
+          : Integer.parseInt(blockSizeString);
       Configuration tempConf = new Configuration(conf);
       tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-      wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+      wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
           .withOutputDir(familydir)
           .withCompression(AbstractHFileWriter.compressionByName(compression))
           .withBloomType(bloomType)
@@ -374,6 +377,7 @@
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
+    configureBlockSize(table, conf);
 
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
@@ -417,7 +421,11 @@
   private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
     return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
   }
-
+
+  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  }
+
   /**
    * Run inside the task to deserialize column family to given conf value map.
    *
@@ -474,6 +482,29 @@
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
 
+  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(
+          familyDescriptor.getNameAsString(), "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(
+          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+    }
+    // '&' is only appended between entries, so there is no trailing separator to strip
+    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
   /**
    * Serialize column family to bloom type map to configuration.
    * Invoked while configuring the MR job for incremental load.
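Note (reviewer aside, not part of the patch): the new BLOCK_SIZE_CONF_KEY reuses the same serialization already used for the compression and bloom type maps, i.e. URL-encoded family=value pairs joined by '&', which createFamilyConfValueMap is expected to decode back into a per-family map inside the task. The standalone sketch below illustrates that assumed round trip using only JDK classes; the class and method names (FamilyConfValueRoundTrip, encode, decode) are illustrative and are not HBase APIs.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class FamilyConfValueRoundTrip {

  // Encode a family -> value map the way configureBlockSize does:
  // URL-encoded "family=value" pairs joined by '&'.
  static String encode(Map<String, String> familyValues) throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    int i = 0;
    for (Map.Entry<String, String> e : familyValues.entrySet()) {
      if (i++ > 0) {
        sb.append('&');
      }
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      sb.append('=');
      sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  // Decode the conf value back into a map, mirroring what the task-side
  // createFamilyConfValueMap is assumed to do.
  static Map<String, String> decode(String confValue) throws UnsupportedEncodingException {
    Map<String, String> result = new LinkedHashMap<String, String>();
    if (confValue == null || confValue.isEmpty()) {
      return result;
    }
    for (String pair : confValue.split("&")) {
      String[] kv = pair.split("=", 2);
      result.put(URLDecoder.decode(kv[0], "UTF-8"),
          kv.length > 1 ? URLDecoder.decode(kv[1], "UTF-8") : "");
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> blockSizes = new LinkedHashMap<String, String>();
    blockSizes.put("cf1", "65536");   // 64 KB block size for family cf1
    blockSizes.put("cf2", "131072");  // 128 KB block size for family cf2
    String serialized = encode(blockSizes);
    System.out.println(serialized);           // cf1=65536&cf2=131072
    System.out.println(decode(serialized));   // {cf1=65536, cf2=131072}
  }
}

Running main prints cf1=65536&cf2=131072 followed by {cf1=65536, cf2=131072}, which mirrors what configureBlockSize stores in the job configuration and what the record writer then parses per family with Integer.parseInt, falling back to HFile.DEFAULT_BLOCKSIZE when a family is absent.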