diff --git a/ql/pom.xml b/ql/pom.xml
index cfcc171..00cfc83 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -679,6 +679,7 @@
org.apache.commons:commons-lang3
org.jodd:jodd-core
org.json:json
+ io.netty:netty-all
org.apache.avro:avro
org.apache.avro:avro-mapred
org.apache.hive.shims:hive-shims-0.20S
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
index ed9d7ac..b0f28e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -42,11 +44,21 @@
* @param overflow put any additional bytes here
* @return true if the output is smaller than input
* @throws IOException
+ * @deprecated Use {@link #compress(ByteBuffer, ByteBuf)} instead.
*/
boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
) throws IOException;
/**
+ * Compress the in buffer to the compressed buffer.
+ * @param in the bytes to compress
+   * @param compressed the dynamically growing buffer that receives the compressed bytes
+   * @return true if the compressed output is smaller than the input
+ * @throws IOException
+ */
+ boolean compress(ByteBuffer in, ByteBuf compressed) throws IOException;
+
+ /**
* Decompress the in buffer to the out buffer.
* @param in the bytes to decompress
* @param out the decompressed bytes
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
index ef1b6b6..4a955ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -41,24 +44,16 @@
* Stores the uncompressed bytes that have been serialized, but not
* compressed yet. When this fills, we compress the entire buffer.
*/
- private ByteBuffer current = null;
+ private ByteBuf current = null;
/**
* Stores the compressed bytes until we have a full buffer and then outputs
- * them to the receiver. If no compression is being done, this (and overflow)
+ * them to the receiver. If no compression is being done, this buffer
* will always be null and the current buffer will be sent directly to the
* receiver.
*/
- private ByteBuffer compressed = null;
+ private ByteBuf compressed = null;
- /**
- * Since the compressed buffer may start with contents from previous
- * compression blocks, we allocate an overflow buffer so that the
- * output of the codec can be split between the two buffers. After the
- * compressed buffer is sent to the receiver, the overflow buffer becomes
- * the new compressed buffer.
- */
- private ByteBuffer overflow = null;
private final int bufferSize;
private final CompressionCodec codec;
private long compressedBytes = 0;
@@ -91,35 +86,40 @@ public void clear() throws IOException {
* @param val the size in the file
* @param original is it uncompressed
*/
- private static void writeHeader(ByteBuffer buffer,
+ private static void writeHeader(ByteBuf buffer,
int position,
int val,
boolean original) {
- buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
- buffer.put(position + 1, (byte) (val >> 7));
- buffer.put(position + 2, (byte) (val >> 15));
+ buffer.setByte(position, (byte) ((val << 1) + (original ? 1 : 0)));
+ buffer.setByte(position + 1, (byte) (val >> 7));
+ buffer.setByte(position + 2, (byte) (val >> 15));
}
private void getNewInputBuffer() throws IOException {
- if (codec == null) {
- current = ByteBuffer.allocate(bufferSize);
- } else {
- current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
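+    // the ByteBuf grows on demand, so no fixed bufferSize capacity is needed up front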
+ current = Unpooled.buffer();
+ if (codec != null) {
writeHeader(current, 0, bufferSize, true);
- current.position(HEADER_SIZE);
+      current.writerIndex(HEADER_SIZE); // start writing after the header
}
}
/**
* Allocate a new output buffer if we are compressing.
*/
- private ByteBuffer getNewOutputBuffer() throws IOException {
- return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ private ByteBuf getNewOutputBuffer() throws IOException {
+ return Unpooled.buffer();
+ }
+
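+  // size of the chunk header, or 0 when no compression codec is configured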
+ private int getHeaderSize() {
+    return codec == null ? 0 : HEADER_SIZE;
+ }
+
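+  // maximum number of readable bytes (header included when compressing) before a spill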
+ private int getBufferLimit() {
+ return codec == null ? bufferSize : bufferSize + HEADER_SIZE;
}
private void flip() throws IOException {
- current.limit(current.position());
- current.position(codec == null ? 0 : HEADER_SIZE);
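+    // the writerIndex already marks the end of the data, so only the reader
+    // side needs to be positioned past the header (if any)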
+ current.readerIndex(getHeaderSize());
}
@Override
@@ -127,11 +127,11 @@ public void write(int i) throws IOException {
if (current == null) {
getNewInputBuffer();
}
- if (current.remaining() < 1) {
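+    // spill once the current buffer holds a full chunk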
+ if (current.readableBytes() == getBufferLimit()) {
spill();
}
uncompressedBytes += 1;
- current.put((byte) i);
+ current.writeByte((byte) i);
}
@Override
@@ -139,15 +139,16 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
if (current == null) {
getNewInputBuffer();
}
- int remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
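+    // copy whatever fits into the current chunk, spilling and refilling as needed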
+ int limit = getBufferLimit();
+ int remaining = Math.min(limit - current.readableBytes(), length);
+ current.writeBytes(bytes, offset, remaining);
uncompressedBytes += remaining;
length -= remaining;
while (length != 0) {
spill();
offset += remaining;
- remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
+ remaining = Math.min(limit - current.readableBytes(), length);
+ current.writeBytes(bytes, offset, remaining);
uncompressedBytes += remaining;
length -= remaining;
}
@@ -156,39 +157,32 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
private void spill() throws java.io.IOException {
// if there isn't anything in the current buffer, don't spill
if (current == null ||
- current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+        current.writerIndex() == getHeaderSize()) {
return;
}
flip();
if (codec == null) {
- receiver.output(current);
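+      // nioBuffer() exposes the readable bytes as a ByteBuffer without copying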
+ receiver.output(current.nioBuffer());
getNewInputBuffer();
} else {
if (compressed == null) {
compressed = getNewOutputBuffer();
- } else if (overflow == null) {
- overflow = getNewOutputBuffer();
}
- int sizePosn = compressed.position();
- compressed.position(compressed.position() + HEADER_SIZE);
- if (codec.compress(current, compressed, overflow)) {
+ int sizePosn = compressed.writerIndex();
+ compressed.writerIndex(compressed.writerIndex() + HEADER_SIZE);
+ if (codec.compress(current.nioBuffer(), compressed)) {
uncompressedBytes = 0;
- // move position back to after the header
- current.position(HEADER_SIZE);
- current.limit(current.capacity());
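+        // the uncompressed bytes were consumed, so start over with a fresh input buffer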
+ getNewInputBuffer();
// find the total bytes in the chunk
- int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
- if (overflow != null) {
- totalBytes += overflow.position();
- }
+ int totalBytes = compressed.writerIndex() - sizePosn - HEADER_SIZE;
+
compressedBytes += totalBytes + HEADER_SIZE;
writeHeader(compressed, sizePosn, totalBytes, false);
// if we have less than the next header left, spill it.
- if (compressed.remaining() < HEADER_SIZE) {
- compressed.flip();
- receiver.output(compressed);
- compressed = overflow;
- overflow = null;
+ if (bufferSize - compressed.readableBytes() < HEADER_SIZE) {
+ compressed.readerIndex(0);
+ receiver.output(compressed.nioBuffer());
+ compressed = getNewOutputBuffer();
}
} else {
compressedBytes += uncompressedBytes + HEADER_SIZE;
@@ -197,29 +191,17 @@ private void spill() throws java.io.IOException {
// compressed buffer first. So back up to where we started,
// flip it and add it to done.
if (sizePosn != 0) {
- compressed.position(sizePosn);
- compressed.flip();
- receiver.output(compressed);
- compressed = null;
- // if we have an overflow, clear it and make it the new compress
- // buffer
- if (overflow != null) {
- overflow.clear();
- compressed = overflow;
- overflow = null;
- }
- } else {
- compressed.clear();
- if (overflow != null) {
- overflow.clear();
- }
+ compressed.writerIndex(sizePosn);
+ compressed.readerIndex(0);
+ receiver.output(compressed.nioBuffer());
}
+ compressed = getNewOutputBuffer();
// now add the current buffer into the done list and get a new one.
- current.position(0);
// update the header with the current length
- writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
- receiver.output(current);
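+      // readerIndex is still past the header here, so readableBytes() is the
+      // payload length; rewind to 0 afterwards so the header is emitted as well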
+ writeHeader(current, 0, current.readableBytes(), true);
+ current.readerIndex(0);
+ receiver.output(current.nioBuffer());
getNewInputBuffer();
}
}
@@ -237,14 +219,13 @@ void getPosition(PositionRecorder recorder) throws IOException {
@Override
public void flush() throws IOException {
spill();
- if (compressed != null && compressed.position() != 0) {
- compressed.flip();
- receiver.output(compressed);
+ if (compressed != null && compressed.isReadable()) {
+ compressed.readerIndex(0);
+ receiver.output(compressed.nioBuffer());
compressed = null;
}
uncompressedBytes = 0;
compressedBytes = 0;
- overflow = null;
current = null;
}
@@ -262,9 +243,6 @@ public long getBufferSize() {
if (compressed != null) {
result += compressed.capacity();
}
- if (overflow != null) {
- result += overflow.capacity();
- }
return result;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index 820f215..bbfb2e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -18,15 +18,17 @@
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
-import org.iq80.snappy.Snappy;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.iq80.snappy.Snappy;
+
class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
Boolean direct = null;
@@ -62,6 +64,25 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
}
@Override
+ public boolean compress(ByteBuffer in, ByteBuf out) throws IOException {
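+    // compress into a worst-case sized scratch array and copy it into the
+    // ByteBuf only if the result is actually smaller than the input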
+ int inBytes = in.remaining();
+ byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+ int outBytes =
+ Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+ compressed, 0);
+ if (outBytes < inBytes) {
+ out.ensureWritable(outBytes);
+ System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+ out.writerIndex(), outBytes);
+ out.writerIndex(out.writerIndex() + outBytes);
+ return true;
+ } else {
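+      // compression did not help; the caller will emit the chunk uncompressed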
+ return false;
+ }
+ }
+
+  @Override
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
if(in.isDirect() && out.isDirect()) {
directDecompress(in, out);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index 03cc3c5..d2e9883 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
@@ -26,7 +28,6 @@
import javax.annotation.Nullable;
-import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -49,6 +50,27 @@ private ZlibCodec(int level, int strategy) {
}
@Override
+ public boolean compress(ByteBuffer in, ByteBuf out) throws IOException {
+ Deflater deflater = new Deflater(level, true);
+ deflater.setStrategy(strategy);
+ int length = in.remaining();
+ deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+ deflater.finish();
+ int outSize = 0;
+ int offset = out.arrayOffset() + out.writerIndex();
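+    // deflate until the input is exhausted, or stop early once the output has
+    // grown to the input size (at which point compression is not worthwhile)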
+ while (!deflater.finished() && (length > outSize)) {
+ out.ensureWritable(length);
+ int size = deflater.deflate(out.array(), offset, out.writableBytes());
+ out.writerIndex(size + out.writerIndex());
+ outSize += size;
+ offset += size;
+ }
+ deflater.end();
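+    // true only if the compressed output is smaller than the input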
+ return length > outSize;
+ }
+
+ @Deprecated
+ @Override
public boolean compress(ByteBuffer in, ByteBuffer out,
ByteBuffer overflow) throws IOException {
Deflater deflater = new Deflater(level, true);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 8cb51b0..cfd3f23 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1830,9 +1830,9 @@ public void testMemoryManagementV12() throws Exception {
stripe.getDataLength() < 5000);
}
// with HIVE-7832, the dictionaries will be disabled after writing the first
- // stripe as there are too many distinct values. Hence only 4 stripes as
+ // stripe as there are too many distinct values. Hence only 6 stripes as
// compared to 25 stripes in version 0.11 (above test case)
- assertEquals(4, i);
+ assertEquals(6, i);
assertEquals(2500, reader.getNumberOfRows());
}