Index: HFileOutputFormat.java
===================================================================
--- HFileOutputFormat.java	(revision 1064109)
+++ HFileOutputFormat.java	(working copy)
@@ -20,9 +20,13 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,6 +36,7 @@
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
@@ -77,9 +82,13 @@
       HConstants.DEFAULT_MAX_FILE_SIZE);
     final int blocksize = conf.getInt("hfile.min.blocksize.size",
       HFile.DEFAULT_BLOCKSIZE);
-    // Invented config. Add to hbase-*.xml if other than default compression.
-    final String compression = conf.get("hfile.compression",
-      Compression.Algorithm.NONE.getName());
+    // Invented config. Add to hbase-*.xml if other than default
+    // compression.
+    final String defaultCompression = conf.get("hfile.compression",
+      Compression.Algorithm.NONE.getName());
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
@@ -153,6 +162,8 @@
       private WriterLength getNewWriter(byte[] family) throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
+        String compression = compressionMap.get(family);
+        compression = compression == null ? defaultCompression : compression;
         wl.writer = new HFile.Writer(fs,
           StoreFile.getUniqueFile(fs, familydir), blocksize,
           compression, KeyValue.KEY_COMPARATOR);
@@ -300,7 +311,53 @@
     DistributedCache.addCacheFile(cacheUri, conf);
     DistributedCache.createSymlink(conf);
 
+    // Set compression algorithms based on column families
+    configureCompression(table, conf);
+
     LOG.info("Incremental table output configured.");
   }
 
+  /**
+   * Run inside the task to deserialize the column family to compression
+   * algorithm map from the configuration.
+   *
+   * @return a map from column family to the name of the configured compression
+   *         algorithm
+   */
+  private static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String compressionConf = conf.get("families.compression", "");
+    for (String familyConf : compressionConf.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      try {
+        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            URLDecoder.decode(familySplit[1], "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return compressionMap;
+  }
+
+  /**
+   * Serialize column family compression algorithms to the configuration
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  private static void configureCompression(HTable table, Configuration conf) throws IOException {
+    StringBuilder compressionConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+    for (HColumnDescriptor familyDescriptor : families) {
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      compressionConfigValue.append('=');
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
+      compressionConfigValue.append('&');
+    }
+    // Get rid of the last ampersand
+    compressionConfigValue.delete(compressionConfigValue.length() - 1, compressionConfigValue.length());
+    conf.set("families.compression", compressionConfigValue.toString());
+  }
+
 }
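For reference, the standalone sketch below (not part of the patch) illustrates the round trip of the "families.compression" key the patch introduces: configureCompression() URL-encodes each family name and its compression algorithm as name=algorithm pairs joined by '&', and createFamilyCompressionMap() splits and decodes them on the task side. The family names "cf1"/"cf2" and the chosen algorithms are made-up example values, and the sketch keys the map by String rather than byte[] to stay short.

// Illustrative only; mirrors the encoding scheme, not the patched class itself.
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Map;
import java.util.TreeMap;

public class FamilyCompressionRoundTrip {
  public static void main(String[] args) throws Exception {
    // What configureCompression() would write into the job configuration
    // for a hypothetical table with two families.
    String serialized =
        URLEncoder.encode("cf1", "UTF-8") + "=" + URLEncoder.encode("gz", "UTF-8") + "&"
      + URLEncoder.encode("cf2", "UTF-8") + "=" + URLEncoder.encode("none", "UTF-8");
    System.out.println(serialized); // cf1=gz&cf2=none

    // What createFamilyCompressionMap() would rebuild inside the task.
    Map<String, String> compressionMap = new TreeMap<String, String>();
    for (String familyConf : serialized.split("&")) {
      String[] familySplit = familyConf.split("=");
      compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8"),
          URLDecoder.decode(familySplit[1], "UTF-8"));
    }
    System.out.println(compressionMap); // {cf1=gz, cf2=none}
  }
}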