From 7d63ee0e5cec82724c56f91f42a2ad57a9b860e9 Mon Sep 17 00:00:00 2001
From: chenyechao
Date: Mon, 11 Mar 2019 21:07:26 +0800
Subject: [PATCH] HBASE-21810 bulkload support set hfile compression on client

---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 16 +++++++--
 .../hbase/mapreduce/TestHFileOutputFormat2.java    | 42 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 31e7e5a..e8f3c1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -112,9 +112,11 @@ public class HFileOutputFormat2
   // This constant is public since the client can modify this when setting
   // up their conf object and thus refer to this symbol.
   // It is present for backwards compatibility reasons. Use it only to
-  // override the auto-detection of datablock encoding.
+  // override the auto-detection of datablock encoding and compression.
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.compression";
 
   /**
    * Keep locality while generating HFiles for bulkload. See HBASE-12596
@@ -122,7 +124,7 @@ public class HFileOutputFormat2
   public static final String LOCALITY_SENSITIVE_CONF_KEY =
       "hbase.bulkload.locality.sensitive.enabled";
   private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+  public static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
   public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
 
@@ -150,6 +152,13 @@ public class HFileOutputFormat2
         Compression.Algorithm.NONE.getName());
     final Algorithm defaultCompression = AbstractHFileWriter
         .compressionByName(defaultCompressionStr);
+    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
+    final Algorithm overriddenCompression;
+    if (compressionStr != null) {
+      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
+    } else {
+      overriddenCompression = null;
+    }
     final boolean compactionExclude = conf.getBoolean(
         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
 
@@ -291,7 +300,8 @@ public class HFileOutputFormat2
           throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
-        Algorithm compression = compressionMap.get(family);
+        Algorithm compression = overriddenCompression;
+        compression = compression == null ? compressionMap.get(family) : compression;
         compression = compression == null ? defaultCompression : compression;
         BloomType bloomType = bloomTypeMap.get(family);
         bloomType = bloomType == null ? BloomType.NONE : bloomType;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index d88af6e..dedaba0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -1288,5 +1288,47 @@ public class TestHFileOutputFormat2 {
     }
   }
 
+  @Test
+  public void testConfigureCompression() throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
+    TaskAttemptContext context = null;
+    Path dir = util.getDataTestDir("testConfigureCompression");
+    String hfileoutputformatCompression = "gz";
+
+    try {
+      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAME.getNameAsString());
+      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
+
+      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
+
+      Job job = Job.getInstance(conf);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = createTestTaskAttemptContext(job);
+      HFileOutputFormat2 hof = new HFileOutputFormat2();
+      writer = hof.getRecordWriter(context);
+      final byte[] b = Bytes.toBytes("b");
+
+      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
+      writer.write(new ImmutableBytesWritable(), kv);
+      writer.close(context);
+      writer = null;
+      FileSystem fs = dir.getFileSystem(conf);
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        HFile.Reader reader =
+            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
+        assertEquals(hfileoutputformatCompression, reader.getCompressionAlgorithm().getName());
+      }
+    } finally {
+      if (writer != null && context != null) {
+        writer.close(context);
+      }
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+
+  }
+
 }
-- 
1.8.3.1
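
Usage note (not part of the patch): with this change applied, a client can force the compression codec for every HFile a bulkload job writes from its own Configuration, without altering the table's column-family descriptors. Below is a minimal driver sketch under stated assumptions: the table name "usertable", the output path "/tmp/hfiles", and the BulkLoadDriver class are illustrative placeholders, and the mapper/input wiring is omitted; HFileOutputFormat2.configureIncrementalLoad(Job, Table, RegionLocator) and the other calls are existing HBase client APIs.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadDriver {  // hypothetical driver, for illustration only
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The key added by this patch: force every HFile the job writes to use
        // gz, regardless of the compression the table's families declare.
        conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, "gz");

        Job job = Job.getInstance(conf, "bulkload-with-client-side-compression");
        job.setJarByClass(BulkLoadDriver.class);
        // Mapper and input-format setup omitted; the job must emit
        // ImmutableBytesWritable keys with KeyValue/Cell values.
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("usertable"));
             RegionLocator locator = connection.getRegionLocator(table.getName())) {
          // Standard incremental-load wiring; the override set above is read
          // inside HFileOutputFormat2 when each per-family writer is created.
          HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

As the createRecordWriter hunk shows, the precedence is: the override key if set, then the per-family compression from compressionMap, then the job-wide default read from "hfile.compression" (Algorithm.NONE if unset).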