Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 1502983)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(working copy)
@@ -82,6 +82,7 @@
   private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -93,8 +94,6 @@
     // These configs. are from hbase-*.xml
     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
-    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
-        HConstants.DEFAULT_BLOCKSIZE);
     // Invented config. Add to hbase-*.xml if other than default compression.
     final String defaultCompression = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
@@ -104,6 +103,7 @@
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
     final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
+    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
@@ -200,9 +200,12 @@
         if (bloomTypeStr != null) {
           bloomType = BloomType.valueOf(bloomTypeStr);
         }
+        String blockSizeString = blockSizeMap.get(family);
+        int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
+            : Integer.parseInt(blockSizeString);
         Configuration tempConf = new Configuration(conf);
         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
             .withOutputDir(familydir)
             .withCompression(AbstractHFileWriter.compressionByName(compression))
             .withBloomType(bloomType)
@@ -350,12 +353,36 @@
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
+    configureBlockSize(table, conf);
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
     LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
   }
 
+  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(
+          familyDescriptor.getNameAsString(), "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(
+          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+    }
+    // '&' is only prepended before entries after the first, so there is no trailing separator
+    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
   /**
    * Run inside the task to deserialize column family to compression algorithm
    * map from the
@@ -374,6 +401,10 @@
     return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
   }
 
+  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  }
+
   /**
    * Run inside the task to deserialize column family to given conf value map.
    *
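
Note on the conf value format: the patch carries the per-family block sizes through the job configuration as a single string of URL-encoded family=blocksize pairs joined by '&', stored under the new key hbase.mapreduce.hfileoutputformat.blocksize, mirroring how the existing compression and bloom-type settings are passed. The sketch below is not part of the patch; it is a standalone, JDK-only illustration of that round trip using hypothetical family names cf1 and cf2, approximating what configureBlockSize writes on the client and what createFamilyConfValueMap later parses inside the task.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the "family=value&family=value" encoding used for
// hbase.mapreduce.hfileoutputformat.blocksize (family names cf1/cf2 are hypothetical).
public class BlockSizeConfValueSketch {
  public static void main(String[] args) throws UnsupportedEncodingException {
    // Client side: roughly what configureBlockSize serializes from the table descriptor.
    Map<String, Integer> blockSizes = new LinkedHashMap<String, Integer>();
    blockSizes.put("cf1", 65536);   // 64 KB
    blockSizes.put("cf2", 131072);  // 128 KB

    StringBuilder confValue = new StringBuilder();
    for (Map.Entry<String, Integer> e : blockSizes.entrySet()) {
      if (confValue.length() > 0) {
        confValue.append('&');
      }
      confValue.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      confValue.append('=');
      confValue.append(URLEncoder.encode(String.valueOf(e.getValue()), "UTF-8"));
    }
    System.out.println(confValue);  // cf1=65536&cf2=131072

    // Task side: roughly what createFamilyConfValueMap does with the stored string.
    for (String pair : confValue.toString().split("&")) {
      String[] kv = pair.split("=", 2);
      String family = URLDecoder.decode(kv[0], "UTF-8");
      int blockSize = kv.length > 1 && !kv[1].isEmpty()
          ? Integer.parseInt(URLDecoder.decode(kv[1], "UTF-8"))
          : 65536;  // stand-in for HConstants.DEFAULT_BLOCKSIZE outside HBase
      System.out.println(family + " -> " + blockSize);
    }
  }
}

In a real job no manual encoding is needed: setting the block size on each HColumnDescriptor (HColumnDescriptor.setBlocksize) before calling configureIncrementalLoad is enough, since configureBlockSize reads familyDescriptor.getBlocksize() from the table descriptor; families absent from the map fall back to HConstants.DEFAULT_BLOCKSIZE, per the null check in getRecordWriter.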