diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
index 5e2d880..d2249d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
@@ -23,6 +23,8 @@
 import javax.annotation.Nullable;
 
+import org.jboss.netty.buffer.ChannelBuffer;
+
 interface CompressionCodec {
 
   public enum Modifier {
@@ -47,6 +49,15 @@
   boolean compress(ByteBuffer in, ByteBuffer out,
                    ByteBuffer overflow
                   ) throws IOException;
 
   /**
+   * Compress the in buffer to the out buffer.
+   * @param in the bytes to compress
+   * @param compressed the dynamically growing buffer for the compressed output
+   * @return true if the compressed output is smaller than the input
+   * @throws IOException
+   */
+  boolean compress(ByteBuffer in, ChannelBuffer compressed) throws IOException;
+
+  /**
    * Decompress the in buffer to the out buffer.
    * @param in the bytes to decompress
    * @param out the decompressed bytes
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
index ef1b6b6..1684445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
 class OutStream extends PositionedOutputStream {
 
   interface OutputReceiver {
@@ -40,25 +43,18 @@
   /**
    * Stores the uncompressed bytes that have been serialized, but not
    * compressed yet. When this fills, we compress the entire buffer.
+   * The buffer is guaranteed to hold at most bufferSize bytes.
    */
-  private ByteBuffer current = null;
+  private ChannelBuffer current = null;
 
   /**
    * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
+   * them to the receiver. If no compression is being done, this buffer
    * will always be null and the current buffer will be sent directly to the
    * receiver.
    */
-  private ByteBuffer compressed = null;
+  private ChannelBuffer compressed = null;
 
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
   private final int bufferSize;
   private final CompressionCodec codec;
   private long compressedBytes = 0;
@@ -91,35 +87,34 @@ public void clear() throws IOException {
    * @param val the size in the file
    * @param original is it uncompressed
    */
-  private static void writeHeader(ByteBuffer buffer,
+  private static void writeHeader(ChannelBuffer buffer,
                                   int position,
                                   int val,
                                   boolean original) {
-    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
-    buffer.put(position + 1, (byte) (val >> 7));
-    buffer.put(position + 2, (byte) (val >> 15));
+    buffer.setByte(position, (byte) ((val << 1) + (original ? 1 : 0)));
+    buffer.setByte(position + 1, (byte) (val >> 7));
+    buffer.setByte(position + 2, (byte) (val >> 15));
   }
 
   private void getNewInputBuffer() throws IOException {
     if (codec == null) {
-      current = ByteBuffer.allocate(bufferSize);
+      current = ChannelBuffers.dynamicBuffer();
     } else {
-      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+      current = ChannelBuffers.dynamicBuffer();
       writeHeader(current, 0, bufferSize, true);
-      current.position(HEADER_SIZE);
+      current.writerIndex(HEADER_SIZE); // start writing after the header
     }
   }
 
   /**
    * Allocate a new output buffer if we are compressing.
    */
-  private ByteBuffer getNewOutputBuffer() throws IOException {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+  private ChannelBuffer getNewOutputBuffer() throws IOException {
+    return ChannelBuffers.dynamicBuffer();
   }
 
   private void flip() throws IOException {
-    current.limit(current.position());
-    current.position(codec == null ? 0 : HEADER_SIZE);
+    current.readerIndex(codec == null ? 0 : HEADER_SIZE);
   }
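Note (not part of the patch): writeHeader() emits ORC's 3-byte chunk header. The chunk length is shifted left one bit, the low bit flags an uncompressed ("original") chunk, and the result is stored little-endian across three bytes, so a chunk length must stay under 2^23. A minimal standalone sketch of the encoding and its inverse:

    // illustrative only; mirrors the arithmetic in writeHeader()
    static void encodeHeader(byte[] buf, int pos, int len, boolean original) {
      buf[pos] = (byte) ((len << 1) + (original ? 1 : 0));
      buf[pos + 1] = (byte) (len >> 7);
      buf[pos + 2] = (byte) (len >> 15);
    }

    static int decodeLength(byte[] buf, int pos) {
      int raw = (buf[pos] & 0xff) | (buf[pos + 1] & 0xff) << 8
          | (buf[pos + 2] & 0xff) << 16;
      return raw >>> 1; // low bit is the "original" flag
    }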
@@ -127,11 +122,11 @@ public void write(int i) throws IOException {
     if (current == null) {
       getNewInputBuffer();
     }
-    if (current.remaining() < 1) {
+    if (current.readableBytes() == bufferSize) {
       spill();
     }
     uncompressedBytes += 1;
-    current.put((byte) i);
+    current.writeByte((byte) i);
   }
 
   @Override
@@ -139,15 +134,19 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
     if (current == null) {
       getNewInputBuffer();
     }
-    int remaining = Math.min(current.remaining(), length);
-    current.put(bytes, offset, remaining);
+    if (current.readableBytes() == bufferSize) {
+      spill();
+    }
+    // current is guaranteed to hold at most bufferSize bytes
+    int remaining = Math.min(bufferSize - current.readableBytes(), length);
+    current.writeBytes(bytes, offset, remaining);
     uncompressedBytes += remaining;
     length -= remaining;
     while (length != 0) {
       spill();
       offset += remaining;
-      remaining = Math.min(current.remaining(), length);
-      current.put(bytes, offset, remaining);
+      remaining = Math.min(bufferSize - current.readableBytes(), length);
+      current.writeBytes(bytes, offset, remaining);
       uncompressedBytes += remaining;
       length -= remaining;
     }
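Note (not part of the patch): the rewritten write paths rely on Netty 3's split reader/writer indexes, which is what lets flip() collapse to a single readerIndex() call and removes the limit()/capacity() bookkeeping. A self-contained sketch of the semantics being assumed, using only Netty 3 ChannelBuffer API:

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;

    public class IndexSketch {
      public static void main(String[] args) {
        ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
        buf.writerIndex(3);                      // reserve room for a header
        buf.writeBytes(new byte[]{1, 2, 3, 4});  // grows on demand, no overflow buffer
        buf.readerIndex(3);                      // the new "flip": skip the header
        System.out.println(buf.readableBytes()); // 4 = writerIndex - readerIndex
      }
    }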
@@ -156,39 +155,32 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
   private void spill() throws java.io.IOException {
     // if there isn't anything in the current buffer, don't spill
     if (current == null ||
-        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+        !current.readable()) {
       return;
     }
     flip();
     if (codec == null) {
-      receiver.output(current);
+      receiver.output(current.toByteBuffer());
       getNewInputBuffer();
     } else {
       if (compressed == null) {
         compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
-      }
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow)) {
+      }
+      int sizePosn = compressed.writerIndex();
+      compressed.writerIndex(compressed.writerIndex() + HEADER_SIZE);
+      if (codec.compress(current.toByteBuffer(), compressed)) {
         uncompressedBytes = 0;
-        // move position back to after the header
-        current.position(HEADER_SIZE);
-        current.limit(current.capacity());
+        getNewInputBuffer();
 
         // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
+        int totalBytes = compressed.writerIndex() - sizePosn - HEADER_SIZE;
+
         compressedBytes += totalBytes + HEADER_SIZE;
         writeHeader(compressed, sizePosn, totalBytes, false);
         // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = overflow;
-          overflow = null;
+        if (bufferSize - compressed.readableBytes() < HEADER_SIZE) {
+          compressed.readerIndex(0);
+          receiver.output(compressed.toByteBuffer());
+          compressed = getNewOutputBuffer();
         }
       } else {
         compressedBytes += uncompressedBytes + HEADER_SIZE;
@@ -197,29 +189,17 @@ private void spill() throws java.io.IOException {
         // compressed buffer first. So back up to where we started,
         // flip it and add it to done.
         if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
-        }
+          compressed.writerIndex(sizePosn);
+          compressed.readerIndex(0);
+          receiver.output(compressed.toByteBuffer());
+        }
+        compressed = getNewOutputBuffer();
 
         // now add the current buffer into the done list and get a new one.
-        current.position(0);
         // update the header with the current length
-        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
-        receiver.output(current);
+        writeHeader(current, 0, current.readableBytes(), true);
+        current.readerIndex(0);
+        receiver.output(current.toByteBuffer());
         getNewInputBuffer();
       }
     }
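Note (not part of the patch): spill() now has two exits per chunk. If codec.compress() shrank the data, the chunk goes out through `compressed` with the "original" flag clear; otherwise the uncompressed bytes in `current` are re-headered and emitted directly. Either way, the receiver only ever sees header-prefixed ByteBuffers. A hedged sketch of a receiver, assuming the nested OutputReceiver interface exposes the single output(ByteBuffer) method called above:

    // hypothetical test double, not from the patch
    class CollectingReceiver implements OutStream.OutputReceiver {
      final java.util.List<java.nio.ByteBuffer> chunks =
          new java.util.ArrayList<java.nio.ByteBuffer>();
      public void output(java.nio.ByteBuffer buffer) throws java.io.IOException {
        chunks.add(buffer); // one header-prefixed chunk per call
      }
    }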
@@ -237,14 +217,13 @@ void getPosition(PositionRecorder recorder) throws IOException {
   @Override
   public void flush() throws IOException {
     spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      receiver.output(compressed);
+    if (compressed != null && compressed.readable()) {
+      compressed.readerIndex(0);
+      receiver.output(compressed.toByteBuffer());
       compressed = null;
     }
     uncompressedBytes = 0;
     compressedBytes = 0;
-    overflow = null;
     current = null;
   }
@@ -262,9 +241,6 @@ public long getBufferSize() {
     if (compressed != null) {
       result += compressed.capacity();
     }
-    if (overflow != null) {
-      result += overflow.capacity();
-    }
     return result;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index 820f215..01b2a4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
 import org.iq80.snappy.Snappy;
+import org.jboss.netty.buffer.ChannelBuffer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -60,6 +61,23 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
       return false;
     }
   }
+
+  public boolean compress(ByteBuffer in, ChannelBuffer out) throws IOException {
+    int inBytes = in.remaining();
+    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+    int outBytes =
+        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0);
+    if (outBytes < inBytes) {
+      out.ensureWritableBytes(outBytes);
+      System.arraycopy(compressed, 0, out.array(),
+          out.arrayOffset() + out.writerIndex(), outBytes);
+      out.writerIndex(out.writerIndex() + outBytes);
+      return true;
+    } else {
+      return false;
+    }
+  }
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
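Note (not part of the patch): the new Snappy path keeps the existing contract that compress() returns false when the codec fails to shrink the input, so OutStream falls back to writing the chunk with the "original" flag. A standalone sketch of that decision using the same org.iq80.snappy API:

    import org.iq80.snappy.Snappy;

    public class SnappySketch {
      public static void main(String[] args) {
        byte[] input = new byte[4096]; // zeros compress very well
        byte[] out = new byte[Snappy.maxCompressedLength(input.length)];
        int outBytes = Snappy.compress(input, 0, input.length, out, 0);
        // mirrors the patch: only keep the result when it is actually smaller
        boolean useCompressed = outBytes < input.length;
        System.out.println(outBytes + " bytes, compressed=" + useCompressed);
      }
    }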
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index 03cc3c5..aeee369 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -26,10 +26,10 @@
 import javax.annotation.Nullable;
 
-import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
 import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.jboss.netty.buffer.ChannelBuffer;
 
 class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
@@ -47,6 +47,27 @@ private ZlibCodec(int level, int strategy) {
     this.level = level;
     this.strategy = strategy;
   }
+
+  public boolean compress(ByteBuffer in, ChannelBuffer out) throws IOException {
+    Deflater deflater = new Deflater(level, true);
+    deflater.setStrategy(strategy);
+    int length = in.remaining();
+    deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+    deflater.finish();
+    int outSize = 0;
+    int offset = out.arrayOffset() + out.writerIndex();
+    while (!deflater.finished() && (length > outSize)) {
+      int size = deflater.deflate(out.array(), offset, out.writableBytes());
+      out.writerIndex(size + out.writerIndex());
+      outSize += size;
+      offset += size;
+      if (out.writableBytes() == 0) {
+        out.ensureWritableBytes(length);
+      }
+    }
+    deflater.end();
+    return length > outSize;
+  }
 
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
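Note (not part of the patch): the ZlibCodec loop streams raw deflate output (new Deflater(level, true), i.e. no zlib wrapper) into the growing buffer and gives up once the output reaches the input size. A self-contained round-trip sketch of the same java.util.zip calls; the Inflater must also be constructed with nowrap=true to read the stream back:

    import java.util.zip.DataFormatException;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    public class ZlibSketch {
      public static void main(String[] args) throws DataFormatException {
        byte[] input = "hello hello hello hello".getBytes();
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[64];
        int outSize = 0;
        while (!deflater.finished()) {
          outSize += deflater.deflate(out, outSize, out.length - outSize);
        }
        deflater.end();

        Inflater inflater = new Inflater(true); // raw deflate, matching above
        inflater.setInput(out, 0, outSize);
        byte[] back = new byte[input.length];
        inflater.inflate(back);
        inflater.end();
        System.out.println(new String(back)); // hello hello hello hello
      }
    }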