From ae0b5f1e108aa1245b72497c37f8a96df180a2eb Mon Sep 17 00:00:00 2001
From: Ivan Lyutov
Date: Thu, 9 Oct 2014 18:32:54 +0300
Subject: [PATCH] KAFKA-1493 - implemented input/output lz4 streams for kafka
 message compression, added compression format description, minor typo fix.

---
 .../common/message/KafkaLZ4BlockInputStream.java   | 192 ++++++++++++++++++++
 .../common/message/KafkaLZ4BlockOutputStream.java  | 197 +++++++++++++++++++++
 .../kafka/common/record/CompressionType.java       |   6 +-
 .../org/apache/kafka/common/record/Compressor.java |  24 +--
 config/producer.properties                         |   4 +-
 .../scala/kafka/message/CompressionCodec.scala     |   7 -
 .../scala/kafka/message/CompressionFactory.scala   |  13 +-
 .../kafka/message/MessageCompressionTest.scala     |  12 --
 8 files changed, 402 insertions(+), 53 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java

diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..b6acc3a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.util.Utils;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.*;
+
+public class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    private final LZ4FastDecompressor decompressor;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
+    private int originalLen;
+    private int o;
+    private boolean finished;
+
+    public KafkaLZ4BlockInputStream(InputStream in) {
+        super(in);
+        this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+        this.buffer = new byte[0];
+        this.compressedBuffer = new byte[HEADER_LENGTH];
+        o = originalLen = 0;
+        finished = false;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return originalLen - o;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (o == originalLen) {
+            refill();
+        }
+        if (finished) {
+            return -1;
+        }
+        return buffer[o++] & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        Utils.checkRange(b, off, len);
+        if (finished) {
+            return -1;
+        }
+        if (o == originalLen) {
+            refill();
+        }
+        if (finished) {
+            return -1;
+        }
+        len = Math.min(len, originalLen - o);
+        System.arraycopy(buffer, o, b, off, len);
+        o += len;
+        return len;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (o == originalLen) {
+            refill();
+        }
+        if (finished) {
+            return -1;
+        }
+        final int skipped = (int) Math.min(n, originalLen - o);
+        o += skipped;
+        return skipped;
+    }
+
+    private void refill() throws IOException {
+        readFully(compressedBuffer, HEADER_LENGTH);
+        for (int i = 0; i < MAGIC_LENGTH; ++i) {
+            if (compressedBuffer[i] != MAGIC[i]) {
+                throw new IOException("Stream is corrupted");
+            }
+        }
+        final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
+        final int compressionMethod = token & 0xF0;
+        final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
+        if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4) {
+            throw new IOException("Stream is corrupted");
+        }
+        final int compressedLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1);
+        originalLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
+        assert HEADER_LENGTH == MAGIC_LENGTH + 9;
+        if (originalLen > 1 << compressionLevel
+                || originalLen < 0
+                || compressedLen < 0
+                || (originalLen == 0 && compressedLen != 0)
+                || (originalLen != 0 && compressedLen == 0)
+                || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
+            throw new IOException("Stream is corrupted");
+        }
+        if (originalLen == 0 && compressedLen == 0) {
+            // a zero-length block is the end marker written by KafkaLZ4BlockOutputStream.finish()
+            finished = true;
+            o = 0;
+            return;
+        }
+        if (buffer.length < originalLen) {
+            buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
+        }
+        switch (compressionMethod) {
+        case COMPRESSION_METHOD_RAW:
+            readFully(buffer, originalLen);
+            break;
+        case COMPRESSION_METHOD_LZ4:
+            if (compressedBuffer.length < originalLen) {
+                compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
+            }
+            readFully(compressedBuffer, compressedLen);
+            try {
+                final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
+                if (compressedLen != compressedLen2) {
+                    throw new IOException("Stream is corrupted");
+                }
+            } catch (LZ4Exception e) {
+                throw new IOException("Stream is corrupted", e);
+            }
+            break;
+        default:
+            throw new AssertionError();
+        }
+
+        o = 0;
+    }
+
+    private void readFully(byte[] b, int len) throws IOException {
+        int read = 0;
+        while (read < len) {
+            final int r = in.read(b, read, len - read);
+            if (r < 0) {
+                throw new EOFException("Stream ended prematurely");
+            }
+            read += r;
+        }
+        assert len == read;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @SuppressWarnings("sync-override")
+    @Override
+    public void mark(int readlimit) {
+        // unsupported
+    }
+
+    @SuppressWarnings("sync-override")
+    @Override
+    public void reset() throws IOException {
+        throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(in=" + in
+                + ", decompressor=" + decompressor + ")";
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..4737af3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.util.Utils;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/*
+ * Message format:
+ * HEADER, which consists of:
+ *   1) magic byte sequence (8 bytes)
+ *   2) compression method token (1 byte)
+ *   3) compressed length (4 bytes)
+ *   4) original message length (4 bytes)
+ * followed by the compressed message itself.
+ * Block size: 64 KB
+ */
+public class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+    static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
+    static final int MAGIC_LENGTH = MAGIC.length;
+
+    static final int HEADER_LENGTH =
+            MAGIC_LENGTH // magic bytes
+            + 1          // token
+            + 4          // compressed length
+            + 4;         // decompressed length
+
+    static final int COMPRESSION_LEVEL_BASE = 10;
+    static final int MIN_BLOCK_SIZE = 64;
+    static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F);
+
+    static final int COMPRESSION_METHOD_RAW = 0x10;
+    static final int COMPRESSION_METHOD_LZ4 = 0x20;
+
+    private static int compressionLevel(int blockSize) {
+        if (blockSize < MIN_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize);
+        } else if (blockSize > MAX_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize);
+        }
+        int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
+        assert (1 << compressionLevel) >= blockSize;
+        assert blockSize * 2 > (1 << compressionLevel);
+        compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
+        assert compressionLevel >= 0 && compressionLevel <= 0x0F;
+        return compressionLevel;
+    }
+
+    private final int blockSize;
+    private final int compressionLevel;
+    private final LZ4Compressor compressor;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final boolean syncFlush;
+    private boolean finished;
+    private int o;
+
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean syncFlush) {
+        super(out);
+        this.blockSize = blockSize;
+        this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+        this.compressionLevel = compressionLevel(blockSize);
+        this.buffer = new byte[blockSize];
+        final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
+        this.compressedBuffer = new byte[compressedBlockSize];
+        this.syncFlush = syncFlush;
+        o = 0;
+        finished = false;
+        System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH);
+    }
+
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) {
+        this(out, blockSize, false);
+    }
+
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException("This stream is already closed");
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (o == blockSize) {
+            flushBufferedData();
+        }
+        buffer[o++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        Utils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        while (o + len > blockSize) {
+            final int l = blockSize - o;
+            System.arraycopy(b, off, buffer, o, blockSize - o);
+            o = blockSize;
+            flushBufferedData();
+            off += l;
+            len -= l;
+        }
+        System.arraycopy(b, off, buffer, o, len);
+        o += len;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        ensureNotFinished();
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!finished) {
+            finish();
+        }
+        if (out != null) {
+            out.close();
+            out = null;
+        }
+    }
+
+    private void flushBufferedData() throws IOException {
+        if (o == 0) {
+            return;
+        }
+        int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH);
+        final int compressMethod;
+        if (compressedLength >= o) {
+            compressMethod = COMPRESSION_METHOD_RAW;
+            compressedLength = o;
+            System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o);
+        } else {
+            compressMethod = COMPRESSION_METHOD_LZ4;
+        }
+
+        compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel);
+        writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1);
+        writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5);
+        assert MAGIC_LENGTH + 9 == HEADER_LENGTH;
+        out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength);
+        o = 0;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (syncFlush) {
+            flushBufferedData();
+        }
+        out.flush();
+    }
+
+    public void finish() throws IOException {
+        ensureNotFinished();
+        flushBufferedData();
+        compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel);
+        writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1);
+        writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5);
+        assert MAGIC_LENGTH + 9 == HEADER_LENGTH;
+        out.write(compressedBuffer, 0, HEADER_LENGTH);
+        finished = true;
+        out.flush();
+    }
+
+    private static void writeIntLE(int i, byte[] buf, int off) {
+        buf[off++] = (byte) i;
+        buf[off++] = (byte) (i >>> 8);
+        buf[off++] = (byte) (i >>> 16);
+        buf[off++] = (byte) (i >>> 24);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize
+                + ", compressor=" + compressor + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 5227b2d..65a7e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
 
     public final int id;
     public final String name;
@@ -42,8 +42,6 @@ public enum CompressionType {
                 return SNAPPY;
             case 3:
                 return LZ4;
-            case 4:
-                return LZ4HC;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -58,8 +56,6 @@ public enum CompressionType {
             return SNAPPY;
         else if (LZ4.name.equals(name))
             return LZ4;
-        else if (LZ4HC.name.equals(name))
-            return LZ4HC;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0323f5f..7792763 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.InputStream;
@@ -218,23 +219,9 @@ public class Compressor {
             }
         case LZ4:
             try {
-                Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
-                    .newInstance(buffer);
-                return new DataOutputStream(stream);
-            } catch (Exception e) {
-                throw new KafkaException(e);
-            }
-        case LZ4HC:
-            try {
-                Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
-                Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
-                Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
-                Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
-                OutputStream stream = (OutputStream) lz4BlockOutputStream
-                    .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
-                    .newInstance(buffer, 1 << 16, compressor);
+                Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+                OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
+                    .newInstance(buffer, 1 << 16);
                 return new DataOutputStream(stream);
             } catch (Exception e) {
                 throw new KafkaException(e);
@@ -266,10 +253,9 @@ public class Compressor {
                 throw new KafkaException(e);
             }
         case LZ4:
-        case LZ4HC:
             // dynamically load LZ4 class to avoid runtime dependency
             try {
-                Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+                Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
                 InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
                     .newInstance(buffer);
                 return new DataInputStream(stream);
diff --git a/config/producer.properties b/config/producer.properties
index 39d65d7..53043b8 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
 compression.codec=none
 
 # message encoder
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index de0a0fa..9439d2b 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -24,7 +24,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
       case LZ4CompressionCodec.codec => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -34,7 +33,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
       case LZ4CompressionCodec.name => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
     }
   }
@@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec {
   val name = "lz4"
 }
 
-case object LZ4HCCompressionCodec extends CompressionCodec {
-  val codec = 4
-  val name = "lz4hc"
-}
-
 case object NoCompressionCodec extends CompressionCodec {
   val codec = 0
   val name = "none"
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index 8420e13..dd712a8 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream
 import java.util.zip.GZIPInputStream
 import java.io.InputStream
 
+import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+
 object CompressionFactory {
 
   def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
@@ -32,11 +34,7 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
       case LZ4CompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockOutputStream
-        new LZ4BlockOutputStream(stream)
-      case LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
-        new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
+        new KafkaLZ4BlockOutputStream(stream, 1 << 16)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
@@ -49,9 +47,8 @@ object CompressionFactory {
       case SnappyCompressionCodec =>
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
-      case LZ4CompressionCodec | LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockInputStream
-        new LZ4BlockInputStream(stream)
+      case LZ4CompressionCodec =>
+        new KafkaLZ4BlockInputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 6f0addc..0bb275d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite {
       codecs += SnappyCompressionCodec
     if(isLZ4Available)
       codecs += LZ4CompressionCodec
-    if (izLZ4HCAvailable)
-      codecs += LZ4HCCompressionCodec
     for(codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
@@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite {
       case e: UnsatisfiedLinkError => false
     }
   }
-
-  def izLZ4HCAvailable(): Boolean = {
-    try {
-      val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16,
-        net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
-      true
-    } catch {
-      case e: UnsatisfiedLinkError => false
-    }
-  }
 }
-- 
1.8.4.msysgit.0
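
A minimal round-trip sketch for reviewers (not part of the patch): it drives the two new classes the same way Compressor and CompressionFactory do, with the 64 KB (1 << 16) block size. The wrapper class name Lz4RoundTrip and the sample payload are invented for illustration; note that a payload this small will not shrink, so its block is stored with the RAW method rather than LZ4.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

public class Lz4RoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] payload = "some kafka message payload".getBytes("UTF-8");

        // Compress with the same 64 KB block size that Compressor passes.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        KafkaLZ4BlockOutputStream lz4Out = new KafkaLZ4BlockOutputStream(sink, 1 << 16);
        lz4Out.write(payload);
        lz4Out.close(); // flushes the last block and writes the zero-length end marker

        // On the wire each block is: 8-byte magic, 1-byte token (method | level),
        // 4-byte compressed length (LE), 4-byte original length (LE), block data.
        byte[] wire = sink.toByteArray();

        // Decompress and verify.
        KafkaLZ4BlockInputStream lz4In = new KafkaLZ4BlockInputStream(new ByteArrayInputStream(wire));
        byte[] roundTripped = new byte[payload.length];
        int read = 0;
        while (read < roundTripped.length) {
            int n = lz4In.read(roundTripped, read, roundTripped.length - read);
            if (n < 0) {
                break;
            }
            read += n;
        }
        lz4In.close();

        if (!Arrays.equals(payload, roundTripped)) {
            throw new IllegalStateException("Round trip failed");
        }
    }
}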