Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1082821)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -1216,6 +1216,10 @@
       return this.comparator;
     }
 
+    public Compression.Algorithm getCompressionAlgorithm() {
+      return this.compressAlgo;
+    }
+
     /**
      * @return index size
      */
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (revision 1082821)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (working copy)
@@ -20,9 +20,13 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,7 +36,9 @@
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -64,6 +70,7 @@
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -78,8 +85,11 @@
     final int blocksize = conf.getInt("hfile.min.blocksize.size",
         HFile.DEFAULT_BLOCKSIZE);
     // Invented config. Add to hbase-*.xml if other than default compression.
-    final String compression = conf.get("hfile.compression",
-      Compression.Algorithm.NONE.getName());
+    final String defaultCompression = conf.get("hfile.compression",
+      Compression.Algorithm.NONE.getName());
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
@@ -153,6 +163,8 @@
       private WriterLength getNewWriter(byte[] family) throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
+        String compression = compressionMap.get(family);
+        compression = compression == null ? defaultCompression : compression;
         wl.writer = new HFile.Writer(fs,
           StoreFile.getUniqueFile(fs, familydir), blocksize,
           compression, KeyValue.KEY_COMPARATOR);
@@ -300,7 +312,69 @@
     DistributedCache.addCacheFile(cacheUri, conf);
     DistributedCache.createSymlink(conf);
 
+    // Set compression algorithms based on column families
+    configureCompression(table, conf);
+
     LOG.info("Incremental table output configured.");
   }
 
+  /**
+   * Run inside the task to deserialize the column family to compression
+   * algorithm map from the configuration.
+   *
+   * Package-private for unit tests only.
+   *
+   * @return a map from column family to the name of the configured compression
+   *         algorithm
+   */
+  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
+    for (String familyConf : compressionConf.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        continue;
+      }
+
+      try {
+        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            URLDecoder.decode(familySplit[1], "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return compressionMap;
+  }
+
+  /**
+   * Serialize the column family to compression algorithm map to the
+   * configuration. Invoked while configuring the MR job for incremental load.
+   *
+   * Package-private for unit tests only.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureCompression(HTable table, Configuration conf) throws IOException {
+    StringBuilder compressionConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        compressionConfigValue.append('&');
+      }
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      compressionConfigValue.append('=');
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
+    }
+    // store the URL-encoded family=compression pairs in the job configuration
+    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+  }
 }
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (revision 1082821)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (working copy)
@@ -23,9 +23,14 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +41,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -45,6 +52,10 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.NullWritable;
@@ -58,6 +69,8 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -233,18 +246,11 @@
   public void testJobConfiguration() throws Exception {
     Job job = new Job();
     HTable table = Mockito.mock(HTable.class);
-    byte[][] mockKeys = new byte[][] {
-      HConstants.EMPTY_BYTE_ARRAY,
-      Bytes.toBytes("aaa"),
-      Bytes.toBytes("ggg"),
-      Bytes.toBytes("zzz")
-    };
-    Mockito.doReturn(mockKeys).when(table).getStartKeys();
-
+    setupMockStartKeys(table);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
-
+
   private byte [][] generateRandomStartKeys(int numKeys) {
     Random random = new Random();
     byte[][] ret = new byte[numKeys][];
@@ -373,6 +379,205 @@
     assertTrue(job.waitForCompletion(true));
   }
 
+  /**
+   * Test for {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}.
+   * Tests that the compression map is correctly deserialized from the
+   * configuration.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateFamilyCompressionMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamilies(table, familyToCompression);
+      HFileOutputFormat.configureCompression(table, conf);
+
+      // read back the family-specific compression settings from the configuration
+      Map<byte[], String> retrievedFamilyToCompressionMap =
+          HFileOutputFormat.createFamilyCompressionMap(conf);
+
+      // test that we have a value for all column families that matches the
+      // mock values used above
+      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
+        assertEquals("Compression configuration incorrect for column family: " + entry.getKey(),
+            entry.getValue().getName(),
+            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamilies(HTable table,
+      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey().getBytes(), 1,
+          entry.getValue().getName(), false, false, 0, "none"));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  private void setupMockStartKeys(HTable table) throws IOException {
+    byte[][] mockKeys = new byte[][] {
+      HConstants.EMPTY_BYTE_ARRAY,
+      Bytes.toBytes("aaa"),
+      Bytes.toBytes("ggg"),
+      Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+  }
+
+  /**
+   * @return a map from column family names to compression algorithms for
+   *         testing column family compression. Column family names
Column family names have special characters + */ + private Map getMockColumnFamilies(int numCfs) { + Map familyToCompression = new HashMap(); + // use column family names having special characters + if (numCfs-- > 0) { + familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); + } + if (numCfs-- > 0) { + familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); + } + if (numCfs-- > 0) { + familyToCompression.put("Family3", Compression.Algorithm.NONE); + } + return familyToCompression; + } + + /** + * Test that {@link HFileOutputFormat} RecordWriter uses compression settings + * from the column family descriptor + */ + @Test + public void testColumnFamilyCompression() + throws IOException, InterruptedException { + Configuration conf = new Configuration(this.util.getConfiguration()); + RecordWriter writer = null; + TaskAttemptContext context = null; + Path dir = + HBaseTestingUtility.getTestDir("testColumnFamilyCompression"); + + HTable table = Mockito.mock(HTable.class); + + Map configuredCompression = + new HashMap(); + Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms(); + + int familyIndex = 0; + for (byte[] family : FAMILIES) { + configuredCompression.put(Bytes.toString(family), + supportedAlgos[familyIndex++ % supportedAlgos.length]); + } + setupMockColumnFamilies(table, configuredCompression); + + // set up the table to return some mock keys + setupMockStartKeys(table); + + try { + // partial map red setup to get an operational writer for testing + Job job = new Job(conf, "testLocalMRIncrementalLoad"); + setupRandomGeneratorMapper(job); + HFileOutputFormat.configureIncrementalLoad(job, table); + FileOutputFormat.setOutputPath(job, dir); + context = new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID()); + HFileOutputFormat hof = new HFileOutputFormat(); + writer = hof.getRecordWriter(context); + + // write out random rows + writeRandomKeyValues(writer, context, ROWSPERSPLIT); + writer.close(context); + + // Make sure that a directory was created for every CF + FileSystem fileSystem = dir.getFileSystem(conf); + + // commit so that the filesystem has one directory per column family + hof.getOutputCommitter(context).commitTask(context); + for (byte[] family : FAMILIES) { + String familyStr = new String(family); + boolean found = false; + for (FileStatus f : fileSystem.listStatus(dir)) { + + if (Bytes.toString(family).equals(f.getPath().getName())) { + // we found a matching directory + found = true; + + // verify that the compression on this file matches the configured + // compression + Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath(); + Reader reader = new HFile.Reader(fileSystem, dataFilePath, null, false, true); + reader.loadFileInfo(); + assertEquals("Incorrect compression used for column family " + familyStr + + "(reader: " + reader + ")", + configuredCompression.get(familyStr), reader.getCompressionAlgorithm()); + break; + } + } + + if (!found) { + fail("HFile for column family " + familyStr + " not found"); + } + } + + } finally { + dir.getFileSystem(conf).delete(dir, true); + } + } + + + /** + * @return + */ + private Compression.Algorithm[] getSupportedCompressionAlgorithms() { + String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); + List supportedAlgos = Lists.newArrayList(); + + for (String algoName : allAlgos) { + try { + Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); + algo.getCompressor(); + supportedAlgos.add(algo); + }catch (Exception e) { 
+        // this algo is not available
+      }
+    }
+
+    return supportedAlgos.toArray(new Compression.Algorithm[0]);
+  }
+
+  /**
+   * Write random values to the writer assuming a table created using
+   * {@link #FAMILIES} as column family descriptors.
+   */
+  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
+      TaskAttemptContext context, int numRows)
+      throws IOException, InterruptedException {
+    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
+    int valLength = 10;
+    byte valBytes[] = new byte[valLength];
+
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+    Random random = new Random();
+    for (int i = 0; i < numRows; i++) {
+
+      Bytes.putInt(keyBytes, 0, i);
+      random.nextBytes(valBytes);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        writer.write(key, kv);
+      }
+    }
+  }
+
   public static void main(String args[]) throws Exception {
     new TestHFileOutputFormat().manualTest(args);
   }
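
Reviewer note (not part of the patch): a minimal sketch of how a bulk-load job would pick up these per-family compression settings. It assumes an existing table whose column families already declare COMPRESSION in their descriptors and a mapper that emits ImmutableBytesWritable/KeyValue pairs; the table name, job name, and output path below are hypothetical. No caller changes are required beyond the existing configureIncrementalLoad() call, since that method now also invokes configureCompression().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "hfile-bulk-load");     // hypothetical job name
    // The job's mapper (not shown) must emit ImmutableBytesWritable/KeyValue pairs.
    HTable table = new HTable(conf, "usertable");   // hypothetical table name
    // Serializes each family's compression into the job configuration
    // (hbase.hfileoutputformat.families.compression) via configureCompression(),
    // in addition to the existing reducer/partitioner/output-format setup.
    HFileOutputFormat.configureIncrementalLoad(job, table);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    job.waitForCompletion(true);
  }
}

For two families cf1 (GZ) and cf2 (NONE), the value stored under hbase.hfileoutputformat.families.compression would look roughly like cf1=gz&cf2=none, with names and values URL-encoded as in configureCompression().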