diff --git a/ql/pom.xml b/ql/pom.xml
index cfcc171..00cfc83 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -679,6 +679,7 @@
                   org.apache.commons:commons-lang3
                   org.jodd:jodd-core
                   org.json:json
+                  io.netty:netty-all
                   org.apache.avro:avro
                   org.apache.avro:avro-mapred
                   org.apache.hive.shims:hive-shims-0.20S
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
index ed9d7ac..b0f28e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
@@ -42,11 +44,21 @@
    * @param overflow put any additional bytes here
    * @return true if the output is smaller than input
    * @throws IOException
+   * @deprecated Use {@link #compress(ByteBuffer, ByteBuf)} instead.
    */
   boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
                   ) throws IOException;
 
   /**
+   * Compress the in buffer to the compressed buffer.
+   * @param in the bytes to compress
+   * @param compressed the compressed dynamic buffer
+   * @return true if the compressed is smaller than input
+   * @throws IOException
+   */
+  boolean compress(ByteBuffer in, ByteBuf compressed) throws IOException;
+
+  /**
    * Decompress the in buffer to the out buffer.
    * @param in the bytes to decompress
    * @param out the decompressed bytes
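Note on the interface change above: the new compress(ByteBuffer, ByteBuf) contract replaces the caller-managed out/overflow ByteBuffer pair with a single Netty ByteBuf that grows on demand, so a codec can emit however many bytes it needs. A minimal caller sketch under that contract (the helper method and names are illustrative, not part of this patch; it assumes an array-backed input buffer, as the codec implementations below do):

    // hypothetical helper, shown only to illustrate the new contract
    static ByteBuffer compressOrPassThrough(CompressionCodec codec, ByteBuffer raw)
        throws IOException {
      ByteBuf packed = Unpooled.buffer();   // dynamically sized; no overflow buffer needed
      if (codec.compress(raw.duplicate(), packed)) {
        return packed.nioBuffer();          // compressed form is smaller, use it
      }
      return raw;                           // compression did not help, keep the original
    }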
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
index ef1b6b6..4a955ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -41,24 +44,16 @@
    * Stores the uncompressed bytes that have been serialized, but not
    * compressed yet. When this fills, we compress the entire buffer.
    */
-  private ByteBuffer current = null;
+  private ByteBuf current = null;
 
   /**
    * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
+   * them to the receiver. If no compression is being done, this buffer
    * will always be null and the current buffer will be sent directly to the
    * receiver.
    */
-  private ByteBuffer compressed = null;
+  private ByteBuf compressed = null;
 
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
   private final int bufferSize;
   private final CompressionCodec codec;
   private long compressedBytes = 0;
@@ -91,35 +86,40 @@ public void clear() throws IOException {
    * @param val the size in the file
    * @param original is it uncompressed
    */
-  private static void writeHeader(ByteBuffer buffer,
+  private static void writeHeader(ByteBuf buffer,
                                   int position,
                                   int val,
                                   boolean original) {
-    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
-    buffer.put(position + 1, (byte) (val >> 7));
-    buffer.put(position + 2, (byte) (val >> 15));
+    buffer.setByte(position, (byte) ((val << 1) + (original ? 1 : 0)));
+    buffer.setByte(position + 1, (byte) (val >> 7));
+    buffer.setByte(position + 2, (byte) (val >> 15));
   }
 
   private void getNewInputBuffer() throws IOException {
-    if (codec == null) {
-      current = ByteBuffer.allocate(bufferSize);
-    } else {
-      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+    current = Unpooled.buffer();
+    if (codec != null) {
       writeHeader(current, 0, bufferSize, true);
-      current.position(HEADER_SIZE);
+      current.writerIndex(HEADER_SIZE); // write after
     }
   }
 
   /**
    * Allocate a new output buffer if we are compressing.
    */
-  private ByteBuffer getNewOutputBuffer() throws IOException {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+  private ByteBuf getNewOutputBuffer() throws IOException {
+    return Unpooled.buffer();
+  }
+
+  private int getHeaderSize() {
+    return codec == null? 0 : HEADER_SIZE;
+  }
+
+  private int getBufferLimit() {
+    return codec == null ? bufferSize : bufferSize + HEADER_SIZE;
   }
 
   private void flip() throws IOException {
-    current.limit(current.position());
-    current.position(codec == null ? 0 : HEADER_SIZE);
+    current.readerIndex(getHeaderSize());
   }
 
   @Override
@@ -127,11 +127,11 @@ public void write(int i) throws IOException {
     if (current == null) {
       getNewInputBuffer();
     }
-    if (current.remaining() < 1) {
+    if (current.readableBytes() == getBufferLimit()) {
       spill();
     }
     uncompressedBytes += 1;
-    current.put((byte) i);
+    current.writeByte((byte) i);
   }
 
   @Override
@@ -139,15 +139,16 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
     if (current == null) {
       getNewInputBuffer();
     }
-    int remaining = Math.min(current.remaining(), length);
-    current.put(bytes, offset, remaining);
+    int limit = getBufferLimit();
+    int remaining = Math.min(limit - current.readableBytes(), length);
+    current.writeBytes(bytes, offset, remaining);
     uncompressedBytes += remaining;
     length -= remaining;
     while (length != 0) {
       spill();
       offset += remaining;
-      remaining = Math.min(current.remaining(), length);
-      current.put(bytes, offset, remaining);
+      remaining = Math.min(limit - current.readableBytes(), length);
+      current.writeBytes(bytes, offset, remaining);
       uncompressedBytes += remaining;
       length -= remaining;
     }
@@ -156,39 +157,32 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
   private void spill() throws java.io.IOException {
     // if there isn't anything in the current buffer, don't spill
     if (current == null ||
-        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+        current.writerIndex() == (getHeaderSize())) {
       return;
     }
     flip();
     if (codec == null) {
-      receiver.output(current);
+      receiver.output(current.nioBuffer());
       getNewInputBuffer();
     } else {
       if (compressed == null) {
         compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
       }
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow)) {
+      int sizePosn = compressed.writerIndex();
+      compressed.writerIndex(compressed.writerIndex() + HEADER_SIZE);
+      if (codec.compress(current.nioBuffer(), compressed)) {
         uncompressedBytes = 0;
-        // move position back to after the header
-        current.position(HEADER_SIZE);
-        current.limit(current.capacity());
+        getNewInputBuffer();
         // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
+        int totalBytes = compressed.writerIndex() - sizePosn - HEADER_SIZE;
+
         compressedBytes += totalBytes + HEADER_SIZE;
         writeHeader(compressed, sizePosn, totalBytes, false);
         // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = overflow;
-          overflow = null;
+        if (bufferSize - compressed.readableBytes() < HEADER_SIZE) {
+          compressed.readerIndex(0);
+          receiver.output(compressed.nioBuffer());
+          compressed = getNewOutputBuffer();
         }
       } else {
         compressedBytes += uncompressedBytes + HEADER_SIZE;
@@ -197,29 +191,17 @@ private void spill() throws java.io.IOException {
         // compressed buffer first. So back up to where we started,
         // flip it and add it to done.
         if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
+          compressed.writerIndex(sizePosn);
+          compressed.readerIndex(0);
+          receiver.output(compressed.nioBuffer());
         }
+        compressed = getNewOutputBuffer();
 
         // now add the current buffer into the done list and get a new one.
-        current.position(0);
         // update the header with the current length
-        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
-        receiver.output(current);
+        writeHeader(current, 0, current.readableBytes(), true);
+        current.readerIndex(0);
+        receiver.output(current.nioBuffer());
         getNewInputBuffer();
       }
     }
@@ -237,14 +219,13 @@ void getPosition(PositionRecorder recorder) throws IOException {
 
   @Override
   public void flush() throws IOException {
     spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      receiver.output(compressed);
+    if (compressed != null && compressed.isReadable()) {
+      compressed.readerIndex(0);
+      receiver.output(compressed.nioBuffer());
       compressed = null;
     }
     uncompressedBytes = 0;
     compressedBytes = 0;
-    overflow = null;
     current = null;
   }
 
@@ -262,9 +243,6 @@ public long getBufferSize() {
     if (compressed != null) {
       result += compressed.capacity();
     }
-    if (overflow != null) {
-      result += overflow.capacity();
-    }
     return result;
   }
 
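Note on the framing above: every chunk that OutStream hands to the receiver is prefixed by the 3-byte ORC compression header written by writeHeader, which packs the chunk length together with an "original" (stored uncompressed) flag. A standalone sketch of that encoding plus a hypothetical inverse, shown only to make the bit layout explicit (readHeader is illustrative and not part of the patch):

    // identical to the patch's writeHeader: low bit = original flag, next 23 bits = length
    static void writeHeader(ByteBuf buffer, int position, int val, boolean original) {
      buffer.setByte(position, (byte) ((val << 1) + (original ? 1 : 0)));
      buffer.setByte(position + 1, (byte) (val >> 7));
      buffer.setByte(position + 2, (byte) (val >> 15));
    }

    // hypothetical inverse: returns {length, originalFlag}
    static int[] readHeader(ByteBuf buffer, int position) {
      int b0 = buffer.getUnsignedByte(position);
      int b1 = buffer.getUnsignedByte(position + 1);
      int b2 = buffer.getUnsignedByte(position + 2);
      return new int[] { (b0 >>> 1) | (b1 << 7) | (b2 << 15), b0 & 1 };
    }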
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index 820f215..bbfb2e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -18,15 +18,17 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
-import org.iq80.snappy.Snappy;
+import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.iq80.snappy.Snappy;
+
 class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
 
   Boolean direct = null;
@@ -62,6 +64,25 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
   }
 
   @Override
+  public boolean compress(ByteBuffer in, ByteBuf out) throws IOException {
+    int inBytes = in.remaining();
+    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+    int outBytes =
+        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0);
+    if (outBytes < inBytes) {
+      out.ensureWritable(outBytes);
+      System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+          out.writerIndex(), outBytes);
+      out.writerIndex(out.writerIndex() + outBytes);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Deprecated
+  @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
     if(in.isDirect() && out.isDirect()) {
       directDecompress(in, out);
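Note on the Snappy path above: the new overload compresses into a temporary array sized by Snappy.maxCompressedLength and then copies into the ByteBuf's backing array, so it assumes heap (array-backed) buffers on both sides. A JUnit-style sketch of how it is expected to behave on compressible input (the fragment and data are illustrative, not part of this patch):

    // hypothetical unit-test fragment
    byte[] data = new byte[4096];
    java.util.Arrays.fill(data, (byte) 'a');   // highly repetitive, so it should compress
    SnappyCodec codec = new SnappyCodec();
    ByteBuf compressed = Unpooled.buffer();
    boolean smaller = codec.compress(ByteBuffer.wrap(data), compressed);
    assertTrue(smaller);
    assertTrue(compressed.readableBytes() < data.length);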
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index 03cc3c5..d2e9883 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
@@ -26,7 +28,6 @@
 
 import javax.annotation.Nullable;
 
-import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
 import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -49,6 +50,27 @@ private ZlibCodec(int level, int strategy) {
   }
 
   @Override
+  public boolean compress(ByteBuffer in, ByteBuf out) throws IOException {
+    Deflater deflater = new Deflater(level, true);
+    deflater.setStrategy(strategy);
+    int length = in.remaining();
+    deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+    deflater.finish();
+    int outSize = 0;
+    int offset = out.arrayOffset() + out.writerIndex();
+    while (!deflater.finished() && (length > outSize)) {
+      out.ensureWritable(length);
+      int size = deflater.deflate(out.array(), offset, out.writableBytes());
+      out.writerIndex(size + out.writerIndex());
+      outSize += size;
+      offset += size;
+    }
+    deflater.end();
+    return length > outSize;
+  }
+
+  @Deprecated
+  @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
                           ByteBuffer overflow) throws IOException {
     Deflater deflater = new Deflater(level, true);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 8cb51b0..cfd3f23 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1830,9 +1830,9 @@ public void testMemoryManagementV12() throws Exception {
           stripe.getDataLength() < 5000);
     }
     // with HIVE-7832, the dictionaries will be disabled after writing the first
-    // stripe as there are too many distinct values. Hence only 6 stripes as
+    // stripe as there are too many distinct values. Hence only 6 stripes as
     // compared to 25 stripes in version 0.11 (above test case)
-    assertEquals(4, i);
+    assertEquals(6, i);
     assertEquals(2500, reader.getNumberOfRows());
   }
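Note on the Zlib path above: the loop deflates directly into the ByteBuf's backing array and stops as soon as the output is no longer smaller than the input, in which case it reports failure so the caller keeps the original bytes. An array-agnostic sketch of the same pattern using only the public Deflater and ByteBuf APIs (illustrative, not the patch code):

    // uses java.util.zip.Deflater and io.netty.buffer.ByteBuf
    static boolean deflateInto(ByteBuffer in, ByteBuf out, int level, int strategy) {
      Deflater deflater = new Deflater(level, true);
      deflater.setStrategy(strategy);
      int length = in.remaining();
      byte[] input = new byte[length];
      in.duplicate().get(input);          // copy so the caller's position is untouched
      deflater.setInput(input);
      deflater.finish();
      byte[] chunk = new byte[4 * 1024];
      int outSize = 0;
      while (!deflater.finished() && outSize < length) {
        int n = deflater.deflate(chunk);
        out.writeBytes(chunk, 0, n);      // the ByteBuf grows as needed
        outSize += n;
      }
      deflater.end();
      return outSize < length;            // true only when compression actually helped
    }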