diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index db5f5e2..5872d5f 100644
--- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -29,6 +29,10 @@ public static void registerAllExtensions(
      * LZO = 3;
      */
     LZO(3, 3),
+    /**
+     * BZIP2 = 4;
+     */
+    BZIP2(4, 4),
     ;
 
     /**
@@ -47,6 +51,10 @@ public static void registerAllExtensions(
      * LZO = 3;
      */
     public static final int LZO_VALUE = 3;
+    /**
+     * BZIP2 = 4;
+     */
+    public static final int BZIP2_VALUE = 4;
 
     public final int getNumber() { return value; }
 
@@ -57,6 +65,7 @@ public static CompressionKind valueOf(int value) {
         case 1: return ZLIB;
         case 2: return SNAPPY;
         case 3: return LZO;
+        case 4: return BZIP2;
         default: return null;
       }
     }
@@ -16808,8 +16817,9 @@ public Builder setMagicBytes(
       "org.apache.hadoop.hive.ql.io.orc.Compres" +
       "sionKind\022\034\n\024compressionBlockSize\030\003 \001(\004\022\023" +
       "\n\007version\030\004 \003(\rB\002\020\001\022\026\n\016metadataLength\030\005 " +
-      "\001(\004\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010" +
-      "\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
+      "\001(\004\022\016\n\005magic\030\300> \001(\t*E\n\017CompressionKind\022\010" +
+      "\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003\022" +
+      "\t\n\005BZIP2\020\004"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Bzip2Codec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Bzip2Codec.java
new file mode 100644
index 0000000..a52e7e0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Bzip2Codec.java
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A CompressionCodec for ORC backed by the Commons Compress bzip2 streams.
+ */
+public class Bzip2Codec implements CompressionCodec {
+
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // Compress the whole input chunk into an in-memory buffer.
+    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+    BZip2CompressorOutputStream bout = new BZip2CompressorOutputStream(compressed);
+    bout.write(in.array(), in.arrayOffset() + in.position(), inBytes);
+    bout.close();
+
+    byte[] cbr = compressed.toByteArray();
+    int outBytes = cbr.length;
+    // Only report success if compression actually saved space; otherwise
+    // the caller stores the chunk uncompressed.
+    if (outBytes < inBytes) {
+      int remaining = out.remaining();
+      if (remaining >= outBytes) {
+        System.arraycopy(cbr, 0, out.array(), out.arrayOffset() +
+            out.position(), outBytes);
+        out.position(out.position() + outBytes);
+      } else {
+        // The output buffer is too small; spill the tail into overflow.
+        System.arraycopy(cbr, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+        System.arraycopy(cbr, remaining, overflow.array(),
+            overflow.arrayOffset(), outBytes - remaining);
+        overflow.position(outBytes - remaining);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    int inOffset = in.position();
+    BZip2CompressorInputStream bin = new BZip2CompressorInputStream(
+        new ByteArrayInputStream(in.array(), in.arrayOffset() + inOffset,
+            in.limit() - inOffset));
+    // Read until the bzip2 stream is exhausted.
+    while (true) {
+      int count = bin.read(out.array(),
+          out.arrayOffset() + out.position(),
+          out.remaining());
+      if (count == -1) {
+        break;
+      }
+      out.position(count + out.position());
+    }
+    bin.close();
+    out.flip();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionKind.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionKind.java
index 07c6116..f235f10 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionKind.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionKind.java
@@ -23,5 +23,5 @@
  * can be applied to ORC files.
  */
 public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO
+  NONE, ZLIB, SNAPPY, LZO, BZIP2
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index f591605..11021d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -347,6 +347,8 @@ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
         break;
       case LZO:
         break;
+      case BZIP2:
+        break;
       default:
         throw new IllegalArgumentException("Unknown compression");
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index adc8991..9a45a0d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -190,6 +190,8 @@ static CompressionCodec createCodec(CompressionKind kind) {
       return new ZlibCodec();
     case SNAPPY:
       return new SnappyCodec();
+    case BZIP2:
+      return new Bzip2Codec();
     case LZO:
       try {
         Class lzo =
@@ -1917,6 +1919,7 @@ private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi)
     case ZLIB: return OrcProto.CompressionKind.ZLIB;
     case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
     case LZO: return OrcProto.CompressionKind.LZO;
+    case BZIP2: return OrcProto.CompressionKind.BZIP2;
     default:
       throw new IllegalArgumentException("Unknown compression " + kind);
   }
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index d52d0b6..1e27730 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -177,6 +177,7 @@ enum CompressionKind {
   ZLIB = 1;
   SNAPPY = 2;
   LZO = 3;
+  BZIP2 = 4;
 }
 
 // Serialized length must be less that 255 bytes
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBzip2.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBzip2.java
new file mode 100644
index 0000000..0ac6eff
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBzip2.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBzip2 {
+
+  @Test
+  public void testBzip2() throws IOException {
+    ByteBuffer in = ByteBuffer.allocate(1000);
+    ByteBuffer out = ByteBuffer.allocate(1000);
+    ByteBuffer orig = ByteBuffer.allocate(1000);
+
+    // Highly repetitive input, so the bzip2 output is smaller than the input.
+    String input = "TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT";
+
+    in.put(input.getBytes());
+    in.flip();
+    CompressionCodec codec = new Bzip2Codec();
+    // The compressed form fits in "out" here, so the overflow buffer is unused.
+    assertTrue(codec.compress(in, out, null));
+    out.flip();
+    codec.decompress(out, orig);
+    byte[] b = new byte[orig.remaining()];
+    orig.get(b);
+    assertEquals(input, new String(b));
+  }
+}
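
Reviewer note: a minimal sketch of exercising the new codec end to end through the writer path, assuming this patch is applied and commons-compress is on the classpath. The file path and the Row helper class are illustrative only, and the createWriter/createReader overloads shown are the pre-WriterOptions ones from this era of the code; adjust to the branch under test.

package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class Bzip2RoundTrip {

  // Illustrative row type; any reflectable struct works here.
  static class Row {
    long x;
    Row(long x) { this.x = x; }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/bzip2-test.orc"); // illustrative path
    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

    // WriterImpl.createCodec() now resolves BZIP2 to the new Bzip2Codec.
    Writer writer = OrcFile.createWriter(fs, path, conf, inspector,
        100000, CompressionKind.BZIP2, 10000, 10000);
    for (long i = 0; i < 100000; ++i) {
      writer.addRow(new Row(i));
    }
    writer.close();

    // The file footer should now report BZIP2 as its compression kind.
    Reader reader = OrcFile.createReader(fs, path);
    System.out.println("compression = " + reader.getCompression());
  }
}

Note that the codec buffers each chunk in memory on both the compress and decompress paths; chunks are bounded by the ORC compression block size, so the extra copies stay small relative to a stripe.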