Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 1585451)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(working copy)
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -67,23 +68,40 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
  * Currently, can only write files to a single column family at a
  * time. Multiple column families requires coordinating keys cross family.
  * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles. Calling write(null,null) will forceably roll
+ * attribute on created hfiles. Calling write(null,null) will forcibly roll
  * all HFiles being written.
  * @see KeyValueSortReducer
 */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
-  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
-  private static final String DATABLOCK_ENCODING_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
+
+  // The following constants are private since these are used by
+  // HFileOutputFormat to internally transfer data between job setup and
+  // reducer run using conf.
+  // These should not be changed by the client.
+  private static final String COMPRESSION_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.compression";
+  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.bloomtype";
+  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.blocksize";
+  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+  // This constant is public since the client can modify this when setting
+  // up their conf object and thus refer to this symbol.
+  // It is present for backwards compatibility reasons. Use it only to
+  // override the auto-detection of datablock encoding.
+  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
@@ -95,30 +113,27 @@
     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
     // Invented config.  Add to hbase-*.xml if other than default compression.
-    final String defaultCompression = conf.get("hfile.compression",
+    final String defaultCompressionStr = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
+    final Algorithm defaultCompression =
+        AbstractHFileWriter.compressionByName(defaultCompressionStr);
+
     final boolean compactionExclude = conf.getBoolean(
         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
 
     // create a map from column family to the compression algorithm
-    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
-    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
-    final HFileDataBlockEncoder encoder;
-    if (dataBlockEncodingStr == null) {
-      encoder = NoOpDataBlockEncoder.INSTANCE;
+    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+    final Map<byte[], HFileDataBlockEncoder> datablockEncodingMap =
+        createFamilyDataBlockEncodingMap(conf);
+    final HFileDataBlockEncoder overriddenEncoder;
+    if (dataBlockEncodingStr != null) {
+      overriddenEncoder = getDataBlockEncoderFromString(dataBlockEncodingStr);
     } else {
-      try {
-        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
-            .valueOf(dataBlockEncodingStr));
-      } catch (IllegalArgumentException ex) {
-        throw new RuntimeException(
-            "Invalid data block encoding type configured for the param "
-            + DATABLOCK_ENCODING_CONF_KEY + " : "
-            + dataBlockEncodingStr);
-      }
+      overriddenEncoder = null;
     }
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
@@ -194,21 +209,21 @@
   throws IOException {
       WriterLength wl = new WriterLength();
       Path familydir = new Path(outputdir, Bytes.toString(family));
-      String compression = compressionMap.get(family);
+      Algorithm compression = compressionMap.get(family);
       compression = compression == null ? defaultCompression : compression;
-      String bloomTypeStr = bloomTypeMap.get(family);
-      BloomType bloomType = BloomType.NONE;
-      if (bloomTypeStr != null) {
-        bloomType = BloomType.valueOf(bloomTypeStr);
-      }
-      String blockSizeString = blockSizeMap.get(family);
-      int blockSize = blockSizeString == null ? HFile.DEFAULT_BLOCKSIZE
-          : Integer.parseInt(blockSizeString);
+      BloomType bloomType = bloomTypeMap.get(family);
+      bloomType = bloomType == null ? BloomType.NONE : bloomType;
+      Integer blockSize = blockSizeMap.get(family);
+      blockSize = blockSize == null ? HFile.DEFAULT_BLOCKSIZE : blockSize;
+      HFileDataBlockEncoder encoder = overriddenEncoder;
+      encoder = encoder == null ? datablockEncodingMap.get(family) : encoder;
+      encoder = encoder == null ? NoOpDataBlockEncoder.INSTANCE : encoder;
+
       Configuration tempConf = new Configuration(conf);
       tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
       wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
           .withOutputDir(familydir)
-          .withCompression(AbstractHFileWriter.compressionByName(compression))
+          .withCompression(compression)
           .withBloomType(bloomType)
           .withComparator(KeyValue.COMPARATOR)
           .withDataBlockEncoder(encoder)
@@ -378,6 +393,7 @@
     configureCompression(table, conf);
     configureBloomType(table, conf);
     configureBlockSize(table, conf);
+    configureDataBlockEncoding(table, conf);
 
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
@@ -405,32 +421,95 @@
   }
 
   /**
-   * Run inside the task to deserialize column family to compression algorithm
-   * map from the
-   * configuration.
+   * Runs inside the task to deserialize column family to compression algorithm
+   * map from the configuration.
    *
-   * Package-private for unit tests only.
-   *
    * @return a map from column family to the name of the configured compression
   *          algorithm
   */
-  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  @VisibleForTesting
+  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
+    Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
+      compressionMap.put(e.getKey(), algorithm);
+    }
+    return compressionMap;
   }
 
-  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  /**
+   * Runs inside the task to deserialize column family to bloom type
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured bloom type
+   */
+  @VisibleForTesting
+  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
+    Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      BloomType bloomType = BloomType.valueOf(e.getValue());
+      bloomTypeMap.put(e.getKey(), bloomType);
+    }
+    return bloomTypeMap;
   }
 
-  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  /**
+   * Runs inside the task to deserialize column family to block size
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured block size
+   */
+  @VisibleForTesting
+  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
+    Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Integer blockSize = Integer.parseInt(e.getValue());
+      blockSizeMap.put(e.getKey(), blockSize);
+    }
+    return blockSizeMap;
   }
+
+  /**
+   * Runs inside the task to deserialize column family to data block encoding type map from the
+   * configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to HFileDataBlockEncoder for the configured data block type
+   *         for the family
+   */
+  @VisibleForTesting
+  static Map<byte[], HFileDataBlockEncoder> createFamilyDataBlockEncodingMap(Configuration conf) {
+    Map<byte[], String> stringMap =
+        createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+    Map<byte[], HFileDataBlockEncoder> encoderMap =
+        new TreeMap<byte[], HFileDataBlockEncoder>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      encoderMap.put(e.getKey(), getDataBlockEncoderFromString(e.getValue()));
+    }
+    return encoderMap;
+  }
+
+  private static HFileDataBlockEncoder getDataBlockEncoderFromString(String dataBlockEncodingStr) {
+    HFileDataBlockEncoder encoder;
+    try {
+      encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding.valueOf(dataBlockEncodingStr));
+    } catch (IllegalArgumentException ex) {
+      throw new RuntimeException("Invalid data block encoding type configured for the param "
+          + DATABLOCK_ENCODING_FAMILIES_CONF_KEY + " : " + dataBlockEncodingStr);
+    }
+    return encoder;
+  }
+
   /**
    * Run inside the task to deserialize column family to given conf value map.
    *
-   * @param conf
-   * @param confName
+   * @param conf to read the serialized values from
+   * @param confName conf key to read from the configuration
    * @return a map of column family to the given configuration value
   */
   private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
@@ -455,12 +534,13 @@
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
-   *
-   * Package-private for unit tests only.
-   *
-   * @throws IOException
-   *           on failure to read column family descriptors
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
   */
+  @VisibleForTesting
   static void configureCompression(HTable table, Configuration conf) throws IOException {
     StringBuilder compressionConfigValue = new StringBuilder();
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -479,10 +559,20 @@
       compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
     }
     // Get rid of the last ampersand
-    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
   }
 
-  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+  /**
+   * Serialize column family to block size map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @VisibleForTesting
+  static void configureBlockSize(HTable table, Configuration conf) throws IOException {
     StringBuilder blockSizeConfigValue = new StringBuilder();
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
     if (tableDescriptor == null) {
@@ -502,16 +592,19 @@
           String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
     }
     // Get rid of the last ampersand
-    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
   }
 
   /**
    * Serialize column family to bloom type map to configuration.
    * Invoked while configuring the MR job for incremental load.
-   *
-   * @throws IOException
-   *           on failure to read column family descriptors
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
   */
+  @VisibleForTesting
   static void configureBloomType(HTable table, Configuration conf) throws IOException {
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
     if (tableDescriptor == null) {
@@ -533,6 +626,42 @@
       }
       bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
     }
-    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
   }
+
+  /**
+   * Serialize column family to data block encoding map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @VisibleForTesting
+  static void configureDataBlockEncoding(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        dataBlockEncodingConfigValue.append('&');
+      }
+      dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
+          "UTF-8"));
+      dataBlockEncodingConfigValue.append('=');
+      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+      if (encoding == null) {
+        encoding = DataBlockEncoding.NONE;
+      }
+      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
+    }
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
+  }
+
 }
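Note on the serialized format (not part of the patch): the four configure*/createFamily*Map pairs above all agree on the same conf value layout, visible in configureDataBlockEncoding: URL-encoded "family=value" pairs joined by '&'. The sketch below only illustrates that round trip; the class and method names (FamilyConfValueFormat, encodeFamilyValues, decodeFamilyValues) are invented for the example and do not exist in HBase.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: mimics the "family=value&family=value" layout written by
// configureCompression/configureBloomType/configureBlockSize/configureDataBlockEncoding
// and read back by createFamilyConfValueMap. All names here are hypothetical.
public class FamilyConfValueFormat {

  static String encodeFamilyValues(Map<String, String> familyToValue)
      throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : familyToValue.entrySet()) {
      if (sb.length() > 0) {
        sb.append('&');
      }
      // URL-encoding makes '&' and '=' inside family names unambiguous
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      sb.append('=');
      sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  static Map<String, String> decodeFamilyValues(String confValue)
      throws UnsupportedEncodingException {
    Map<String, String> familyToValue = new LinkedHashMap<String, String>();
    if (confValue == null || confValue.isEmpty()) {
      return familyToValue;
    }
    for (String pair : confValue.split("&")) {
      int idx = pair.indexOf('=');
      // raw '=' and '&' in family names were percent-encoded, so the first '=' is the separator
      String family = URLDecoder.decode(pair.substring(0, idx), "UTF-8");
      String value = URLDecoder.decode(pair.substring(idx + 1), "UTF-8");
      familyToValue.put(family, value);
    }
    return familyToValue;
  }

  public static void main(String[] args) throws UnsupportedEncodingException {
    Map<String, String> m = new LinkedHashMap<String, String>();
    m.put("Family2=asdads&!AASD", "FAST_DIFF");   // same special characters the tests use
    String encoded = encodeFamilyValues(m);
    System.out.println(encoded);                  // e.g. Family2%3Dasdads%26%21AASD=FAST_DIFF
    System.out.println(decodeFamilyValues(encoded));
  }
}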
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java	(revision 1585451)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java	(working copy)
@@ -30,11 +30,9 @@
 import java.lang.reflect.Constructor;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.Random;
 
@@ -53,14 +51,17 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -77,8 +78,6 @@
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -484,36 +483,40 @@
   }
 
   /**
-   * Test for
-   * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
-   * that the compression map is correctly deserialized from configuration
+   * Test for {@link HFileOutputFormat#configureCompression(HTable,
+   * Configuration)} and {@link HFileOutputFormat#createFamilyCompressionMap
+   * (Configuration)}.
+   * Tests that the compression map is correctly serialized into
+   * and deserialized from configuration
   *
   * @throws IOException
   */
   @Test
-  public void testCreateFamilyCompressionMap() throws IOException {
+  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
     for (int numCfs = 0; numCfs <= 3; numCfs++) {
       Configuration conf = new Configuration(this.util.getConfiguration());
-      Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+      Map<String, Compression.Algorithm> familyToCompression =
+          getMockColumnFamiliesForCompression(numCfs);
       HTable table = Mockito.mock(HTable.class);
-      setupMockColumnFamilies(table, familyToCompression);
+      setupMockColumnFamiliesForCompression(table, familyToCompression);
       HFileOutputFormat.configureCompression(table, conf);
 
       // read back family specific compression setting from the configuration
-      Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf);
+      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
+          .createFamilyCompressionMap(conf);
 
       // test that we have a value for all column families that matches with the
       // used mock values
       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
-        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
-            .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+        assertEquals("Compression configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
       }
     }
   }
 
-  private void setupMockColumnFamilies(HTable table,
-      Map<String, Compression.Algorithm> familyToCompression) throws IOException
-  {
+  private void setupMockColumnFamiliesForCompression(HTable table,
+      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
@@ -525,21 +528,11 @@
     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
   }
 
-  private void setupMockStartKeys(HTable table) throws IOException {
-    byte[][] mockKeys = new byte[][] {
-        HConstants.EMPTY_BYTE_ARRAY,
-        Bytes.toBytes("aaa"),
-        Bytes.toBytes("ggg"),
-        Bytes.toBytes("zzz")
-    };
-    Mockito.doReturn(mockKeys).when(table).getStartKeys();
-  }
-
   /**
    * @return a map from column family names to compression algorithms for
   *          testing column family compression. Column family names have special characters
   */
-  private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
+  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression (int numCfs) {
     Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
     // use column family names having special characters
     if (numCfs-- > 0) {
@@ -558,6 +551,235 @@
   }
 
   /**
+   * Test for {@link HFileOutputFormat#configureBloomType(HTable,
+   * Configuration)} and {@link HFileOutputFormat#createFamilyBloomTypeMap
+   * (Configuration)}.
+   * Tests that the bloom type map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 2; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, BloomType> familyToBloomType =
+          getMockColumnFamiliesForBloomType(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForBloomType(table,
+          familyToBloomType);
+      HFileOutputFormat.configureBloomType(table, conf);
+
+      // read back family specific bloom type settings from the configuration
+      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
+          HFileOutputFormat
+              .createFamilyBloomTypeMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
+        assertEquals("BloomType configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForBloomType(HTable table,
+      Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setBloomFilterType(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to bloom filter types for
+   *         testing column family bloom type settings. Column family names have special characters
+   */
+  private Map<String, BloomType>
+      getMockColumnFamiliesForBloomType (int numCfs) {
+    Map<String, BloomType> familyToBloomType =
+        new HashMap<String, BloomType>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
+    }
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family2=asdads&!AASD",
+          BloomType.ROWCOL);
+    }
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family3", BloomType.NONE);
+    }
+    return familyToBloomType;
+  }
+
+  /**
+   * Test for {@link HFileOutputFormat#configureBlockSize(HTable,
+   * Configuration)} and {@link HFileOutputFormat#createFamilyBlockSizeMap
+   * (Configuration)}.
+   * Tests that the block size map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, Integer> familyToBlockSize =
+          getMockColumnFamiliesForBlockSize(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForBlockSize(table,
+          familyToBlockSize);
+      HFileOutputFormat.configureBlockSize(table, conf);
+
+      // read back family specific data block size from the configuration
+      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
+          HFileOutputFormat
+              .createFamilyBlockSizeMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
+        assertEquals("BlockSize configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForBlockSize(HTable table,
+      Map<String, Integer> familyToDataBlockEncoding) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setBlocksize(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to block sizes for
+   *         testing column family block size settings. Column family names have special characters
+   */
+  private Map<String, Integer>
+      getMockColumnFamiliesForBlockSize (int numCfs) {
+    Map<String, Integer> familyToBlockSize =
+        new HashMap<String, Integer>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family1!@#!@#&", 1234);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family2=asdads&!AASD",
+          Integer.MAX_VALUE);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family2=asdads&!AASD",
+          Integer.MAX_VALUE);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family3", 0);
+    }
+    return familyToBlockSize;
+  }
+
+  /**
+   * Test for {@link HFileOutputFormat#configureDataBlockEncoding(HTable,
+   * Configuration)} and {@link HFileOutputFormat#createFamilyDataBlockEncodingMap
+   * (Configuration)}.
+   * Tests that the data block encoding map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
+          getMockColumnFamiliesForDataBlockEncoding(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForDataBlockEncoding(table,
+          familyToDataBlockEncoding);
+      HFileOutputFormat.configureDataBlockEncoding(table, conf);
+
+      // read back family specific data block encoding settings from the configuration
+      Map<byte[], HFileDataBlockEncoder> retrievedFamilyToDataBlockEncodingMap =
+          HFileOutputFormat
+              .createFamilyDataBlockEncodingMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
+        assertEquals("DataBlockEncoding configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes
+                ()).getEncodingOnDisk());
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForDataBlockEncoding(HTable table,
+      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setDataBlockEncoding(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to data block encodings for
+   *         testing column family data block encoding settings. Column family names have special characters
+   */
+  private Map<String, DataBlockEncoding>
+      getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
+    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
+        new HashMap<String, DataBlockEncoding>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
+          DataBlockEncoding.FAST_DIFF);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
+          DataBlockEncoding.PREFIX);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
+    }
+    return familyToDataBlockEncoding;
+  }
+
+  private void setupMockStartKeys(HTable table) throws IOException {
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
   */
@@ -618,7 +840,7 @@
       if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
       assertEquals("Incorrect bloom filter used for column family " + familyStr +
         "(reader: " + reader + ")",
-        hcd.getBloomFilterType(), StoreFile.BloomType.valueOf(Bytes.toString(bloomFilter)));
+        hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
       assertEquals("Incorrect compression used for column family " + familyStr +
         "(reader: " + reader + ")", hcd.getCompression(), reader.getCompressionAlgorithm());
     }
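Usage sketch (not part of the patch): with this change, per-family compression, bloom type, block size and data block encoding are picked up from the table descriptor inside configureIncrementalLoad, so a typical bulk-load driver does not need to change; the new public DATABLOCK_ENCODING_OVERRIDE_CONF_KEY is only needed to force one encoding for every written HFile. The driver below is a minimal illustration; the table name, paths and the mapper are assumptions made for the example, while configureIncrementalLoad and DATABLOCK_ENCODING_OVERRIDE_CONF_KEY are the only pieces taken from HFileOutputFormat.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative bulk-load driver, not from the patch.
public class BulkLoadDriver {

  // Hypothetical mapper: turns "rowkey,value" text lines into KeyValues for family "f".
  public static class LineToKeyValueMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      byte[] row = Bytes.toBytes(parts[0]);
      KeyValue kv = new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"),
          Bytes.toBytes(parts[1]));
      context.write(new ImmutableBytesWritable(row), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional with this patch: force one data block encoding for every HFile,
    // instead of using the per-family encoding read from the table descriptor.
    conf.set(HFileOutputFormat.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");

    Job job = new Job(conf, "hfile-bulkload-example");
    job.setJarByClass(BulkLoadDriver.class);
    job.setMapperClass(LineToKeyValueMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Serializes per-family compression, bloom type, block size and (with this patch)
    // data block encoding into the job conf; also wires up the reducer and partitioner.
    HTable table = new HTable(conf, "my_table");
    HFileOutputFormat.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}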