diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dbb5173..5f3b270 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -859,6 +859,10 @@
         "only affect the light weight encoding for integers. This flag will not\n" +
         "change the compression level of higher level compression codec (like ZLIB)."),
 
+    HIVE_ORC_COMPRESSION_STRATEGY("hive.exec.orc.compression.strategy", "SPEED", new StringSet("SPEED", "COMPRESSION"),
+        "Define the compression strategy to use while writing data.\n" +
+        "This changes the compression level of higher level compression codecs (like ZLIB)."),
+
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
         "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
         "data is read remotely (from the client or HS2 machine) and sent to all the tasks."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
index 0e8e124..2da490e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
@@ -19,8 +19,20 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 interface CompressionCodec {
+
+  public enum Modifier {
+    /* speed/compression tradeoffs */
+    FASTEST,
+    FAST,
+    DEFAULT,
+    /* data sensitivity modifiers */
+    TEXT,
+    BINARY
+  };
+
   /**
    * Compress the in buffer to the out buffer.
    * @param in the bytes to compress
@@ -39,4 +51,17 @@ boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
    * @throws IOException
    */
   void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
+
+  /**
+   * Produce a modified compression codec if the underlying algorithm allows
+   * modification.
+   *
+   * This does not modify the current object; it returns a new codec when
+   * modifications are possible, and the same object when they are not.
+   * @param modifiers compression modifiers
+   * @return codec for use after optional modification
+   */
+  CompressionCodec modify(EnumSet<Modifier> modifiers);
+
 }
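The contract is deliberately copy-on-modify. A sketch of hypothetical caller code (not part of the patch; it assumes a caller in the same package, since the interface is package-private):

    import java.util.EnumSet;

    // Ask for a fast, text-oriented variant; each codec decides what that means.
    CompressionCodec tuned = codec.modify(
        EnumSet.of(CompressionCodec.Modifier.FAST, CompressionCodec.Modifier.TEXT));
    // Codecs with nothing to tune (Snappy, below) return themselves, so callers
    // never special-case the result, and the original codec is never mutated.

Because modify() returns a codec rather than mutating one, a single shared codec instance can safely back streams with different tuning.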
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 99500a4..39326c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -101,6 +101,10 @@ public int getMinor() {
     SPEED, COMPRESSION;
   }
 
+  public static enum CompressionStrategy {
+    SPEED, COMPRESSION;
+  }
+
   // Note : these string definitions for table properties are deprecated,
   // and retained only for backward compatibility, please do not add to
   // them, add to OrcTableProperties below instead
@@ -230,6 +234,7 @@ public static Reader createReader(Path path,
     private Version versionValue;
     private WriterCallback callback;
     private EncodingStrategy encodingStrategy;
+    private CompressionStrategy compressionStrategy;
     private float paddingTolerance;
 
     WriterOptions(Configuration conf) {
@@ -254,6 +259,15 @@ public static Reader createReader(Path path,
       } else {
         encodingStrategy = EncodingStrategy.valueOf(enString);
       }
+
+      String compString = conf
+          .get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
+      if (compString == null) {
+        compressionStrategy = CompressionStrategy.SPEED;
+      } else {
+        compressionStrategy = CompressionStrategy.valueOf(compString);
+      }
+
       paddingTolerance =
           conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
               HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
@@ -403,7 +417,8 @@ public static Writer createWriter(Path path,
                           opts.bufferSizeValue, opts.rowIndexStrideValue,
                           opts.memoryManagerValue, opts.blockPaddingValue,
                           opts.versionValue, opts.callback,
-                          opts.encodingStrategy, opts.paddingTolerance,
+                          opts.encodingStrategy, opts.compressionStrategy,
+                          opts.paddingTolerance,
                           opts.blockSizeValue);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index 4613015..820f215 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -25,6 +25,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
 
@@ -99,4 +100,10 @@ public void directDecompress(ByteBuffer in, ByteBuffer out)
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // Snappy has no tunable parameters; return this codec unchanged.
+    return this;
+  }
 }
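Usage sketch for the new knob (illustrative; OrcFile.writerOptions(conf) is the existing public factory that runs the WriterOptions constructor shown above):

    Configuration conf = new Configuration();
    // Validated by the StringSet in HiveConf: only "SPEED" or "COMPRESSION" are accepted.
    conf.set(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname, "COMPRESSION");
    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf);
    // With the key unset, WriterOptions falls back to CompressionStrategy.SPEED,
    // so existing write paths keep their current behavior.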
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 195d60e..6ddfe2d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -37,6 +38,8 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
@@ -139,6 +142,7 @@
   private final OrcFile.WriterCallback callback;
   private final OrcFile.WriterContext callbackContext;
   private final OrcFile.EncodingStrategy encodingStrategy;
+  private final OrcFile.CompressionStrategy compressionStrategy;
 
   WriterImpl(FileSystem fs,
              Path path,
@@ -153,6 +157,7 @@
              OrcFile.Version version,
              OrcFile.WriterCallback callback,
              OrcFile.EncodingStrategy encodingStrategy,
+             CompressionStrategy compressionStrategy,
              float paddingTolerance,
              long blockSizeValue) throws IOException {
     this.fs = fs;
@@ -174,6 +179,7 @@ public Writer getWriter() {
     this.defaultStripeSize = stripeSize;
     this.version = version;
     this.encodingStrategy = encodingStrategy;
+    this.compressionStrategy = compressionStrategy;
     this.addBlockPadding = addBlockPadding;
     this.blockSize = blockSizeValue;
     this.paddingTolerance = paddingTolerance;
@@ -447,10 +453,35 @@ public void addPosition(long position) {
     public OutStream createStream(int column,
                                   OrcProto.Stream.Kind kind
                                   ) throws IOException {
-      StreamName name = new StreamName(column, kind);
+      final StreamName name = new StreamName(column, kind);
+      final EnumSet<Modifier> modifiers;
+
+      switch (kind) {
+        case DATA:
+        case DICTIONARY_DATA:
+          if (getCompressionStrategy() == CompressionStrategy.SPEED) {
+            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
+          } else {
+            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
+          break;
+        default:
+          modifiers = EnumSet.noneOf(Modifier.class); // no preference; avoids passing null to modify()
+          break;
+      }
+
       BufferedStream result = streams.get(name);
       if (result == null) {
-        result = new BufferedStream(name.toString(), bufferSize, codec);
+        result = new BufferedStream(name.toString(), bufferSize,
+            codec.modify(modifiers));
         streams.put(name, result);
       }
       return result.outStream;
@@ -496,6 +527,14 @@ public EncodingStrategy getEncodingStrategy() {
   }
 
   /**
+   * Get the compression strategy to use.
+   * @return compression strategy
+   */
+  public CompressionStrategy getCompressionStrategy() {
+    return compressionStrategy;
+  }
+
+  /**
    * Get the writer's configuration.
    * @return configuration
    */
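Combined with the ZlibCodec changes below, the switch above resolves to the following deflate settings (levels read off the Modifier handling in ZlibCodec.modify; -1 is Deflater.DEFAULT_COMPRESSION):

    // stream kind                          SPEED strategy              COMPRESSION strategy
    // DATA, DICTIONARY_DATA                level 2, DEFAULT_STRATEGY   level -1, DEFAULT_STRATEGY
    // LENGTH, DICTIONARY_COUNT, PRESENT,
    // ROW_INDEX, SECONDARY                 level 1, FILTERED           level 1, FILTERED

The index, length, and present streams are small and highly regular, so they always get the cheapest level; only the bulk DATA streams respond to the configured strategy.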
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index 27fbb42..a7fd91b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
@@ -32,10 +33,24 @@
 
   private Boolean direct = null;
 
+  private final int level;
+  private final int strategy;
+
+  public ZlibCodec() {
+    level = Deflater.DEFAULT_COMPRESSION;
+    strategy = Deflater.DEFAULT_STRATEGY;
+  }
+
+  private ZlibCodec(int level, int strategy) {
+    this.level = level;
+    this.strategy = strategy;
+  }
+
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
                           ByteBuffer overflow) throws IOException {
-    Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+    Deflater deflater = new Deflater(level, true);
+    deflater.setStrategy(strategy);
     int length = in.remaining();
     deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
     deflater.finish();
@@ -113,4 +128,37 @@ public void directDecompress(ByteBuffer in, ByteBuffer out)
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    int l = this.level;
+    int s = this.strategy;
+
+    for (Modifier m : modifiers) {
+      switch (m) {
+      case BINARY:
+        /* FILTERED == less LZ77 matching, more Huffman coding */
+        s = Deflater.FILTERED;
+        break;
+      case TEXT:
+        s = Deflater.DEFAULT_STRATEGY;
+        break;
+      case FASTEST:
+        // deflate_fast looking for 8 byte patterns
+        l = Deflater.BEST_SPEED;
+        break;
+      case FAST:
+        // deflate_fast looking for 16 byte patterns
+        l = Deflater.BEST_SPEED + 1;
+        break;
+      case DEFAULT:
+        // deflate_slow looking for 128 byte patterns
+        l = Deflater.DEFAULT_COMPRESSION;
+        break;
+      default:
+        break;
+      }
+    }
+    return new ZlibCodec(l, s);
+  }
 }
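For reference, a standalone sketch of what a tuned ZlibCodec does under the hood, using only java.util.zip (the class and variable names here are illustrative, not from the patch):

    import java.nio.charset.StandardCharsets;
    import java.util.zip.Deflater;

    public class DeflateTuningDemo {
      public static void main(String[] args) {
        byte[] input = "abcabcabcabcabcabcabcabc".getBytes(StandardCharsets.UTF_8);
        // FASTEST + BINARY, as ZlibCodec.modify would configure them:
        Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); // raw deflate, as ORC uses
        deflater.setStrategy(Deflater.FILTERED);
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[input.length * 2];
        int n = deflater.deflate(out);
        deflater.end();
        System.out.println(input.length + " bytes -> " + n + " bytes");
      }
    }

Because every (level, strategy) pair is baked into an immutable ZlibCodec instance, the per-stream codecs created in WriterImpl.createStream can be used concurrently without sharing Deflater state.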