From 9db42790634102bba05e094a8db40050106a242c Mon Sep 17 00:00:00 2001 From: "James D. Oliver" Date: Mon, 13 Oct 2014 12:46:52 -0700 Subject: [PATCH 1/7] KAFKA-1493 --- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../common/message/KafkaLZ4BlockInputStream.java | 192 ++++++++++++++++++++ .../common/message/KafkaLZ4BlockOutputStream.java | 197 +++++++++++++++++++++ .../kafka/common/record/CompressionType.java | 6 +- .../org/apache/kafka/common/record/Compressor.java | 24 +-- config/producer.properties | 4 +- .../scala/kafka/message/CompressionCodec.scala | 7 - .../scala/kafka/message/CompressionFactory.scala | 13 +- .../main/scala/kafka/tools/ConsoleProducer.scala | 2 +- core/src/main/scala/kafka/tools/PerfConfig.scala | 2 +- .../kafka/api/ProducerCompressionTest.scala | 1 - .../kafka/message/MessageCompressionTest.scala | 12 -- .../scala/unit/kafka/message/MessageTest.scala | 2 +- 13 files changed, 406 insertions(+), 58 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java create mode 100644 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index bf4ed66..9095caf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig { /** compression.type */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, snappy, lz4, or lz4hc. " + private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, snappy, or lz4. " + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; /** metrics.sample.window.ms */ diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java new file mode 100644 index 0000000..b6acc3a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java @@ -0,0 +1,192 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.message; + +import net.jpountz.lz4.LZ4Exception; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.util.Utils; + +import java.io.EOFException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.*; + +public class KafkaLZ4BlockInputStream extends FilterInputStream { + + private final LZ4FastDecompressor decompressor; + private byte[] buffer; + private byte[] compressedBuffer; + private int originalLen; + private int o; + private boolean finished; + + public KafkaLZ4BlockInputStream(InputStream in) { + super(in); + this.decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + this.buffer = new byte[0]; + this.compressedBuffer = new byte[HEADER_LENGTH]; + o = originalLen = 0; + finished = false; + } + + @Override + public int available() throws IOException { + return originalLen - o; + } + + @Override + public int read() throws IOException { + if (finished) { + return -1; + } + if (o == originalLen) { + refill(); + } + if (finished) { + return -1; + } + return buffer[o++] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Utils.checkRange(b, off, len); + if (finished) { + return -1; + } + if (o == originalLen) { + refill(); + } + if (finished) { + return -1; + } + len = Math.min(len, originalLen - o); + System.arraycopy(buffer, o, b, off, len); + o += len; + return len; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(long n) throws IOException { + if (finished) { + return -1; + } + if (o == originalLen) { + refill(); + } + if (finished) { + return -1; + } + final int skipped = (int) Math.min(n, originalLen - o); + o += skipped; + return skipped; + } + + private void refill() throws IOException { + readFully(compressedBuffer, HEADER_LENGTH); + for (int i = 0; i < MAGIC_LENGTH; ++i) { + if (compressedBuffer[i] != MAGIC[i]) { + throw new IOException("Stream is corrupted"); + } + } + final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF; + final int compressionMethod = token & 0xF0; + final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F); + if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4) { + throw new IOException("Stream is corrupted"); + } + final int compressedLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1); + originalLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5); + assert HEADER_LENGTH == MAGIC_LENGTH + 9; + if (originalLen > 1 << compressionLevel + || originalLen < 0 + || compressedLen < 0 + || (originalLen == 0 && compressedLen != 0) + || (originalLen != 0 && compressedLen == 0) + || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) { + throw new IOException("Stream is corrupted"); + } + if (buffer.length < originalLen) { + buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)]; + } + switch (compressionMethod) { + case COMPRESSION_METHOD_RAW: + readFully(buffer, originalLen); + break; + case COMPRESSION_METHOD_LZ4: + if (compressedBuffer.length < originalLen) { + compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)]; + } + readFully(compressedBuffer, compressedLen); + try { + final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen); + if (compressedLen != 
compressedLen2) { + throw new IOException("Stream is corrupted"); + } + } catch (LZ4Exception e) { + throw new IOException("Stream is corrupted", e); + } + break; + default: + throw new AssertionError(); + } + + o = 0; + } + + private void readFully(byte[] b, int len) throws IOException { + int read = 0; + while (read < len) { + final int r = in.read(b, read, len - read); + if (r < 0) { + throw new EOFException("Stream ended prematurely"); + } + read += r; + } + assert len == read; + } + + @Override + public boolean markSupported() { + return false; + } + + @SuppressWarnings("sync-override") + @Override + public void mark(int readlimit) { + // unsupported + } + + @SuppressWarnings("sync-override") + @Override + public void reset() throws IOException { + throw new IOException("mark/reset not supported"); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(in=" + in + + ", decompressor=" + decompressor + ")"; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java new file mode 100644 index 0000000..4737af3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java @@ -0,0 +1,197 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.message; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.util.Utils; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/* +* Message format: +* HEADER which consists of: +* 1) magic byte sequence (8 bytes) +* 2) compression method token (1 byte) +* 3) compressed length (4 bytes) +* 4) original message length (4 bytes) +* and compressed message itself +* Block size: 64 Kb +* */ +public class KafkaLZ4BlockOutputStream extends FilterOutputStream { + + static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' }; + static final int MAGIC_LENGTH = MAGIC.length; + + static final int HEADER_LENGTH = + MAGIC_LENGTH // magic bytes + + 1 // token + + 4 // compressed length + + 4; // decompressed length + + static final int COMPRESSION_LEVEL_BASE = 10; + static final int MIN_BLOCK_SIZE = 64; + static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F); + + static final int COMPRESSION_METHOD_RAW = 0x10; + static final int COMPRESSION_METHOD_LZ4 = 0x20; + + private static int compressionLevel(int blockSize) { + if (blockSize < MIN_BLOCK_SIZE) { + throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize); + } else if (blockSize > MAX_BLOCK_SIZE) { + throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize); + } + int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2 + assert (1 << compressionLevel) >= blockSize; + assert blockSize * 2 > (1 << compressionLevel); + compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE); + assert compressionLevel >= 0 && compressionLevel <= 0x0F; + return compressionLevel; + } + + private final int blockSize; + private final int compressionLevel; + private final LZ4Compressor compressor; + private final byte[] buffer; + private final byte[] compressedBuffer; + private final boolean syncFlush; + private boolean finished; + private int o; + + public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean syncFlush) { + super(out); + this.blockSize = blockSize; + this.compressor = LZ4Factory.fastestInstance().fastCompressor(); + this.compressionLevel = compressionLevel(blockSize); + this.buffer = new byte[blockSize]; + final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); + this.compressedBuffer = new byte[compressedBlockSize]; + this.syncFlush = syncFlush; + o = 0; + finished = false; + System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH); + } + + public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) { + this(out, blockSize, false); + } + + private void ensureNotFinished() { + if (finished) { + throw new IllegalStateException("This stream is already closed"); + } + } + + @Override + public void write(int b) throws IOException { + ensureNotFinished(); + if (o == blockSize) { + flushBufferedData(); + } + buffer[o++] = (byte) b; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + Utils.checkRange(b, off, len); + ensureNotFinished(); + + while (o + len > blockSize) { + final int l = blockSize - o; + System.arraycopy(b, off, buffer, o, blockSize - o); + o = blockSize; + flushBufferedData(); + off += l; + len -= l; + } + System.arraycopy(b, off, buffer, o, len); + o += len; + } + + @Override + public void write(byte[] b) throws IOException { + ensureNotFinished(); + 
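// delegate to write(byte[], int, int), which compresses and flushes full blocks +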
write(b, 0, b.length); + } + + @Override + public void close() throws IOException { + if (!finished) { + finish(); + } + if (out != null) { + out.close(); + out = null; + } + } + + private void flushBufferedData() throws IOException { + if (o == 0) { + return; + } + int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH); + final int compressMethod; + if (compressedLength >= o) { + compressMethod = COMPRESSION_METHOD_RAW; + compressedLength = o; + System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o); + } else { + compressMethod = COMPRESSION_METHOD_LZ4; + } + + compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel); + writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1); + writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5); + assert MAGIC_LENGTH + 9 == HEADER_LENGTH; + out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength); + o = 0; + } + + @Override + public void flush() throws IOException { + if (syncFlush) { + flushBufferedData(); + } + out.flush(); + } + + public void finish() throws IOException { + ensureNotFinished(); + flushBufferedData(); + compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel); + writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1); + writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5); + assert MAGIC_LENGTH + 9 == HEADER_LENGTH; + out.write(compressedBuffer, 0, HEADER_LENGTH); + finished = true; + out.flush(); + } + + private static void writeIntLE(int i, byte[] buf, int off) { + buf[off++] = (byte) i; + buf[off++] = (byte) (i >>> 8); + buf[off++] = (byte) (i >>> 16); + buf[off++] = (byte) (i >>> 24); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize + + ", compressor=" + compressor + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 5227b2d..65a7e43 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -20,7 +20,7 @@ package org.apache.kafka.common.record; * The compression type to use */ public enum CompressionType { - NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f); + NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f); public final int id; public final String name; @@ -42,8 +42,6 @@ public enum CompressionType { return SNAPPY; case 3: return LZ4; - case 4: - return LZ4HC; default: throw new IllegalArgumentException("Unknown compression type id: " + id); } @@ -58,8 +56,6 @@ public enum CompressionType { return SNAPPY; else if (LZ4.name.equals(name)) return LZ4; - else if (LZ4HC.name.equals(name)) - return LZ4HC; else throw new IllegalArgumentException("Unknown compression name: " + name); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 0323f5f..7792763 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream; import 
org.apache.kafka.common.utils.Utils; import java.io.InputStream; @@ -218,23 +219,9 @@ public class Compressor { } case LZ4: try { - Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); - OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class) - .newInstance(buffer); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - case LZ4HC: - try { - Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory"); - Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor"); - Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); - Object factory = factoryClass.getMethod("fastestInstance").invoke(null); - Object compressor = factoryClass.getMethod("highCompressor").invoke(factory); - OutputStream stream = (OutputStream) lz4BlockOutputStream - .getConstructor(OutputStream.class, Integer.TYPE, compressorClass) - .newInstance(buffer, 1 << 16, compressor); + Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream"); + OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE) + .newInstance(buffer, 1 << 16); return new DataOutputStream(stream); } catch (Exception e) { throw new KafkaException(e); @@ -266,10 +253,9 @@ public class Compressor { throw new KafkaException(e); } case LZ4: - case LZ4HC: // dynamically load LZ4 class to avoid runtime dependency try { - Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream"); + Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream"); InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class) .newInstance(buffer); return new DataInputStream(stream); diff --git a/config/producer.properties b/config/producer.properties index 39d65d7..47ae3e2 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092 # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync -# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc. -# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally +# specify the compression codec for all data generated: none, gzip, snappy, lz4. 
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively compression.codec=none # message encoder diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index de0a0fa..9439d2b 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -24,7 +24,6 @@ object CompressionCodec { case GZIPCompressionCodec.codec => GZIPCompressionCodec case SnappyCompressionCodec.codec => SnappyCompressionCodec case LZ4CompressionCodec.codec => LZ4CompressionCodec - case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) } } @@ -34,7 +33,6 @@ object CompressionCodec { case GZIPCompressionCodec.name => GZIPCompressionCodec case SnappyCompressionCodec.name => SnappyCompressionCodec case LZ4CompressionCodec.name => LZ4CompressionCodec - case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name)) } } @@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec { val name = "lz4" } -case object LZ4HCCompressionCodec extends CompressionCodec { - val codec = 4 - val name = "lz4hc" -} - case object NoCompressionCodec extends CompressionCodec { val codec = 0 val name = "none" diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala index 8420e13..dd712a8 100644 --- a/core/src/main/scala/kafka/message/CompressionFactory.scala +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream import java.util.zip.GZIPInputStream import java.io.InputStream +import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream} + object CompressionFactory { def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = { @@ -32,11 +34,7 @@ object CompressionFactory { import org.xerial.snappy.SnappyOutputStream new SnappyOutputStream(stream) case LZ4CompressionCodec => - import net.jpountz.lz4.LZ4BlockOutputStream - new LZ4BlockOutputStream(stream) - case LZ4HCCompressionCodec => - import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory} - new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor()) + new KafkaLZ4BlockOutputStream(stream, 1 << 16) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } @@ -49,9 +47,8 @@ object CompressionFactory { case SnappyCompressionCodec => import org.xerial.snappy.SnappyInputStream new SnappyInputStream(stream) - case LZ4CompressionCodec | LZ4HCCompressionCodec => - import net.jpountz.lz4.LZ4BlockInputStream - new LZ4BlockInputStream(stream) + case LZ4CompressionCodec => + new KafkaLZ4BlockInputStream(stream) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index b024a69..397d80d 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -113,7 +113,7 @@ object ConsoleProducer { .describedAs("broker-list") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers 
are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." + + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." + "If specified without value, then it defaults to 'gzip'") .withOptionalArg() .describedAs("compression-codec") diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index c720029..d073acf 100644 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) { .defaultsTo(200) val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4") + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) val helpOpt = parser.accepts("help", "Print usage.") diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index c954851..638b50a 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -125,7 +125,6 @@ object ProducerCompressionTest { list.add(Array("gzip")) list.add(Array("snappy")) list.add(Array("lz4")) - list.add(Array("lz4hc")) list } } diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala index 6f0addc..0bb275d 100644 --- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite { codecs += SnappyCompressionCodec if(isLZ4Available) codecs += LZ4CompressionCodec - if (izLZ4HCAvailable) - codecs += LZ4HCCompressionCodec for(codec <- codecs) testSimpleCompressDecompress(codec) } @@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite { case e: UnsatisfiedLinkError => false } } - - def izLZ4HCAvailable(): Boolean = { - try { - val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16, - net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor()) - true - } catch { - case e: UnsatisfiedLinkError => false - } - } } diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 958c1a6..7b74a0d 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite { def setUp(): Unit = { val keys = Array(null, "key".getBytes, "".getBytes) val vals = Array("value".getBytes, "".getBytes, null) - val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec) + val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec) for(k <- keys; v <- vals; codec <- codecs) messages += 
new MessageTestVal(k, v, codec, new Message(v, k, codec)) } -- 1.7.12.4 (Apple Git-37) From 559c4c491922897b8c9fb186c9ec3c9e242a1ea5 Mon Sep 17 00:00:00 2001 From: "James D. Oliver" Date: Thu, 16 Oct 2014 12:22:37 -0700 Subject: [PATCH 2/7] KAFKA-1493 Implement LZ4 Frame I/O Streams --- .../common/message/KafkaLZ4BlockInputStream.java | 385 ++++++++------- .../common/message/KafkaLZ4BlockOutputStream.java | 516 ++++++++++++++------- 2 files changed, 568 insertions(+), 333 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java index b6acc3a..6cba973 100644 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java @@ -1,9 +1,12 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,179 +17,213 @@ package org.apache.kafka.common.message; -import net.jpountz.lz4.LZ4Exception; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; -import net.jpountz.util.Utils; +import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH; +import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC; -import java.io.EOFException; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.*; - -public class KafkaLZ4BlockInputStream extends FilterInputStream { - - private final LZ4FastDecompressor decompressor; - private byte[] buffer; - private byte[] compressedBuffer; - private int originalLen; - private int o; - private boolean finished; - - public KafkaLZ4BlockInputStream(InputStream in) { - super(in); - this.decompressor = LZ4Factory.fastestInstance().fastDecompressor(); - this.buffer = new byte[0]; - this.compressedBuffer = new byte[HEADER_LENGTH]; - o = originalLen = 0; - finished = false; - } - - @Override - public int available() throws IOException { - return originalLen - o; - } - - @Override - public int read() throws IOException { - if (finished) { - return -1; - } - if (o == originalLen) { - refill(); - } - if (finished) { - return -1; - } - return buffer[o++] & 0xFF; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - Utils.checkRange(b, off, len); - if (finished) { - return -1; - } - if (o == originalLen) { - refill(); - } - if (finished) { - return -1; - } - len = Math.min(len, originalLen - o); - System.arraycopy(buffer, o, b, 
off, len); - o += len; - return len; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public long skip(long n) throws IOException { - if (finished) { - return -1; - } - if (o == originalLen) { - refill(); - } - if (finished) { - return -1; - } - final int skipped = (int) Math.min(n, originalLen - o); - o += skipped; - return skipped; - } - - private void refill() throws IOException { - readFully(compressedBuffer, HEADER_LENGTH); - for (int i = 0; i < MAGIC_LENGTH; ++i) { - if (compressedBuffer[i] != MAGIC[i]) { - throw new IOException("Stream is corrupted"); - } - } - final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF; - final int compressionMethod = token & 0xF0; - final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F); - if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4) { - throw new IOException("Stream is corrupted"); - } - final int compressedLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1); - originalLen = Utils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5); - assert HEADER_LENGTH == MAGIC_LENGTH + 9; - if (originalLen > 1 << compressionLevel - || originalLen < 0 - || compressedLen < 0 - || (originalLen == 0 && compressedLen != 0) - || (originalLen != 0 && compressedLen == 0) - || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) { - throw new IOException("Stream is corrupted"); - } - if (buffer.length < originalLen) { - buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)]; - } - switch (compressionMethod) { - case COMPRESSION_METHOD_RAW: - readFully(buffer, originalLen); - break; - case COMPRESSION_METHOD_LZ4: - if (compressedBuffer.length < originalLen) { - compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)]; - } - readFully(compressedBuffer, compressedLen); - try { - final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen); - if (compressedLen != compressedLen2) { - throw new IOException("Stream is corrupted"); - } - } catch (LZ4Exception e) { - throw new IOException("Stream is corrupted", e); - } - break; - default: - throw new AssertionError(); - } - - o = 0; - } - - private void readFully(byte[] b, int len) throws IOException { - int read = 0; - while (read < len) { - final int r = in.read(b, read, len - read); - if (r < 0) { - throw new EOFException("Stream ended prematurely"); - } - read += r; - } - assert len == read; - } - - @Override - public boolean markSupported() { - return false; - } - - @SuppressWarnings("sync-override") - @Override - public void mark(int readlimit) { - // unsupported - } - - @SuppressWarnings("sync-override") - @Override - public void reset() throws IOException { - throw new IOException("mark/reset not supported"); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "(in=" + in - + ", decompressor=" + decompressor + ")"; +import net.jpountz.lz4.KafkaLZ4BlockOutputStream.BD; +import net.jpountz.lz4.KafkaLZ4BlockOutputStream.FLG; +import net.jpountz.util.Utils; +import net.jpountz.xxhash.XXHash32; +import net.jpountz.xxhash.XXHashFactory; + +/** + * A partial implementation of the v1.4.1 LZ4 Framing format. 
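+ * Reads the frame header (magic, FLG/BD descriptor, descriptor checksum) on
+ * construction, then decompresses data blocks until a zero-length end mark is read.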
+ * + * @see LZ4 Framing Format Spec + */ +public final class KafkaLZ4BlockInputStream extends FilterInputStream { + + public static final String PREMATURE_EOS = "Stream ended prematurely"; + public static final String NOT_SUPPORTED = "Stream unsupported"; + public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch"; + public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted"; + + private final LZ4SafeDecompressor decompressor; + private final XXHash32 checksum; + private final byte[] buffer; + private final byte[] compressedBuffer; + private final int maxBlockSize; + private FLG flg; + private BD bd; + private int bufferOffset; + private int bufferSize; + private boolean finished; + + /** + * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm. + * + * @param out The output stream to compress + * @throws IOException + */ + public KafkaLZ4BlockInputStream(InputStream in) throws IOException { + super(in); + decompressor = LZ4Factory.fastestInstance().safeDecompressor(); + checksum = XXHashFactory.fastestInstance().hash32(); + readHeader(); + maxBlockSize = bd.getBlockMaximumSize(); + buffer = new byte[maxBlockSize]; + compressedBuffer = new byte[maxBlockSize]; + bufferOffset = 0; + bufferSize = 0; + finished = false; + } + + /** + * Reads the magic number and frame descriptor from the underlying {@link InputStream}. + * + * @throws IOException + */ + private void readHeader() throws IOException { + byte[] header = new byte[LZ4_MAX_HEADER_LENGTH]; + + // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags + bufferOffset = 6; + if (in.read(header, 0, bufferOffset) != bufferOffset) { + throw new IOException(PREMATURE_EOS); + } + + if (MAGIC != Util.readUintLE(header, bufferOffset-6)) { + throw new IOException(NOT_SUPPORTED); + } + flg = FLG.fromByte(header[bufferOffset-2]); + bd = BD.fromByte(header[bufferOffset-1]); + // TODO read uncompressed content size, update flg.validate() + // TODO read dictionary id, update flg.validate() + + // check stream descriptor hash + byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF); + header[bufferOffset++] = (byte) in.read(); + if (hash != header[bufferOffset-1]) { + throw new IOException(DESCRIPTOR_HASH_MISMATCH); + } + } + + /** + * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, + * and writes the result to a buffer. 
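+ * Blocks whose size word has the incompressible bit set are copied through
+ * verbatim, without invoking the decompressor.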
+ * + * @throws IOException + */ + private void readBlock() throws IOException { + int blockSize = Util.readUintLE(in); + + // Check for EndMark + if (blockSize == 0) { + finished = true; + // TODO implement content checksum, update flg.validate() + return; + } else if (blockSize > maxBlockSize) { + throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize)); + } + + boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0; + byte[] bufferToRead; + if (compressed) { + bufferToRead = compressedBuffer; + } else { + blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK; + bufferToRead = buffer; + bufferSize = blockSize; + } + + if (in.read(bufferToRead, 0, blockSize) != blockSize) { + throw new IOException(PREMATURE_EOS); + } + + // verify checksum + if (flg.isBlockChecksumSet() && Util.readUintLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) { + throw new IOException(BLOCK_HASH_MISMATCH); + } + + if (compressed) { + try { + bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize); + } catch (LZ4Exception e) { + throw new IOException(e); + } + } + + bufferOffset = 0; + } + + @Override + public int read() throws IOException { + if (finished) { + return -1; + } + if (available() == 0) { + readBlock(); + } + if (finished) { + return -1; + } + int value = buffer[bufferOffset++] & 0xFF; + + return value; + } + + @Override + public int read(byte b[], int off, int len) throws IOException { + Utils.checkRange(b, off, len); + if (finished) { + return -1; + } + if (available() == 0) { + readBlock(); + } + if (finished) { + return -1; + } + len = Math.min(len, available()); + System.arraycopy(buffer, bufferOffset, b, off, len); + bufferOffset += len; + return len; + } + + @Override + public long skip(long n) throws IOException { + if (finished) { + return 0; + } + if (available() == 0) { + readBlock(); + } + if (finished) { + return 0; } + n = Math.min(n, available()); + bufferOffset += n; + return n; + } + + @Override + public int available() throws IOException { + return bufferSize - bufferOffset; + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + throw new RuntimeException("mark not supported"); + } + + @Override + public synchronized void reset() throws IOException { + throw new RuntimeException("reset not supported"); + } + + @Override + public boolean markSupported() { + return false; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java index 4737af3..92f96c2 100644 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java @@ -1,9 +1,12 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,184 +17,379 @@ package org.apache.kafka.common.message; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.util.Utils; - import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -/* -* Message format: -* HEADER which consists of: -* 1) magic byte sequence (8 bytes) -* 2) compression method token (1 byte) -* 3) compressed length (4 bytes) -* 4) original message length (4 bytes) -* and compressed message itself -* Block size: 64 Kb -* */ -public class KafkaLZ4BlockOutputStream extends FilterOutputStream { - - static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' }; - static final int MAGIC_LENGTH = MAGIC.length; +import net.jpountz.util.Utils; +import net.jpountz.xxhash.XXHash32; +import net.jpountz.xxhash.XXHashFactory; - static final int HEADER_LENGTH = - MAGIC_LENGTH // magic bytes - + 1 // token - + 4 // compressed length - + 4; // decompressed length +/** + * A partial implementation of the v1.4.1 LZ4 Framing format. + * + * @see LZ4 Framing Format Spec + */ +public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { - static final int COMPRESSION_LEVEL_BASE = 10; - static final int MIN_BLOCK_SIZE = 64; - static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F); + public static final int MAGIC = 0x184D2204; + public static final int LZ4_MAX_HEADER_LENGTH = 19; + public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000; + + public static final String CLOSED_STREAM = "The stream is already closed"; + + public static final int BLOCKSIZE_64KB = 4; + public static final int BLOCKSIZE_256KB = 5; + public static final int BLOCKSIZE_1MB = 6; + public static final int BLOCKSIZE_4MB = 7; + + private final LZ4Compressor compressor; + private final XXHash32 checksum; + private final FLG flg; + private final BD bd; + private final byte[] buffer; + private final byte[] compressedBuffer; + private final int maxBlockSize; + private int bufferOffset; + private boolean finished; - static final int COMPRESSION_METHOD_RAW = 0x10; - static final int COMPRESSION_METHOD_LZ4 = 0x20; + /** + * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. + * + * @param out The output stream to compress + * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception + * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data + * @throws IOException + */ + public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException { + super(out); + compressor = LZ4Factory.fastestInstance().fastCompressor(); + checksum = XXHashFactory.fastestInstance().hash32(); + bd = new BD(blockSize); + flg = new FLG(blockChecksum); + bufferOffset = 0; + maxBlockSize = bd.getBlockMaximumSize(); + buffer = new byte[maxBlockSize]; + compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)]; + finished = false; + writeHeader(); + } + + /** + * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. + * + * @param out The output stream to compress + * @param blockSize Default: 4. 
The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception + * @throws IOException + */ + public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException { + this(out, blockSize, false); + } + + /** + * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. + * + * @param out The output stream to compress + * @throws IOException + */ + public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException { + this(out, BLOCKSIZE_64KB); + } - private static int compressionLevel(int blockSize) { - if (blockSize < MIN_BLOCK_SIZE) { - throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize); - } else if (blockSize > MAX_BLOCK_SIZE) { - throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize); - } - int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2 - assert (1 << compressionLevel) >= blockSize; - assert blockSize * 2 > (1 << compressionLevel); - compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE); - assert compressionLevel >= 0 && compressionLevel <= 0x0F; - return compressionLevel; + /** + * Writes the magic number and frame descriptor to the underlying {@link OutputStream}. + * + * @throws IOException + */ + private void writeHeader() throws IOException { + Util.writeUintLE(MAGIC, buffer, 0); + bufferOffset = 4; + buffer[bufferOffset++] = flg.toByte(); + buffer[bufferOffset++] = bd.toByte(); + // TODO write uncompressed content size, update flg.validate() + // TODO write dictionary id, update flg.validate() + // compute checksum on all descriptor fields + int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF; + buffer[bufferOffset++] = (byte) hash; + // write out frame descriptor + out.write(buffer, 0, bufferOffset); + bufferOffset = 0; + } + + /** + * Compresses buffered data, optionally computes an XXHash32 checksum, and writes + * the result to the underlying {@link OutputStream}. 
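+ * If compression does not reduce the block size, the block is written raw and
+ * its size word is flagged with the incompressible mask instead.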
+ * + * @throws IOException + */ + private void writeBlock() throws IOException { + if (bufferOffset == 0) { + return; } - - private final int blockSize; - private final int compressionLevel; - private final LZ4Compressor compressor; - private final byte[] buffer; - private final byte[] compressedBuffer; - private final boolean syncFlush; - private boolean finished; - private int o; - - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean syncFlush) { - super(out); - this.blockSize = blockSize; - this.compressor = LZ4Factory.fastestInstance().fastCompressor(); - this.compressionLevel = compressionLevel(blockSize); - this.buffer = new byte[blockSize]; - final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); - this.compressedBuffer = new byte[compressedBlockSize]; - this.syncFlush = syncFlush; - o = 0; - finished = false; - System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH); + + int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0); + byte[] bufferToWrite = compressedBuffer; + int compressMethod = 0; + + // Store block uncompressed if compressed length is greater (incompressible) + if (compressedLength >= bufferOffset) { + bufferToWrite = buffer; + compressedLength = bufferOffset; + compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK; } - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) { - this(out, blockSize, false); + // Write content + Util.writeUintLE(compressedLength | compressMethod, out); + out.write(bufferToWrite, 0, compressedLength); + + // Calculate and write block checksum + if (flg.isBlockChecksumSet()) { + int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0); + Util.writeUintLE(hash, out); } + bufferOffset = 0; + } + + /** + * Similar to the {@link #writeBlock()} method. Writes a 0-length block + * (without block checksum) to signal the end of the block stream. 
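+ * Readers treat this end mark as end-of-stream.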
+ * + * @throws IOException + */ + private void writeEndMark() throws IOException { + Util.writeUintLE(0, out); + // TODO implement content checksum, update flg.validate() + finished = true; + } - private void ensureNotFinished() { - if (finished) { - throw new IllegalStateException("This stream is already closed"); - } + @Override + public void write(int b) throws IOException { + ensureNotFinished(); + if (bufferOffset == maxBlockSize) { + writeBlock(); } - - @Override - public void write(int b) throws IOException { - ensureNotFinished(); - if (o == blockSize) { - flushBufferedData(); - } - buffer[o++] = (byte) b; + buffer[bufferOffset++] = (byte) b; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + Utils.checkRange(b, off, len); + ensureNotFinished(); + + int bufferRemainingLength = maxBlockSize - bufferOffset; + // while b will fill the buffer + while (len > bufferRemainingLength) { + // fill remaining space in buffer + System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength); + bufferOffset = maxBlockSize; + writeBlock(); + // compute new offset and length + off += bufferRemainingLength; + len -= bufferRemainingLength; + bufferRemainingLength = maxBlockSize; } + + System.arraycopy(b, off, buffer, bufferOffset, len); + bufferOffset += len; + } - @Override - public void write(byte[] b, int off, int len) throws IOException { - Utils.checkRange(b, off, len); - ensureNotFinished(); - - while (o + len > blockSize) { - final int l = blockSize - o; - System.arraycopy(b, off, buffer, o, blockSize - o); - o = blockSize; - flushBufferedData(); - off += l; - len -= l; - } - System.arraycopy(b, off, buffer, o, len); - o += len; + @Override + public void flush() throws IOException { + if (!finished) { + writeBlock(); } - - @Override - public void write(byte[] b) throws IOException { - ensureNotFinished(); - write(b, 0, b.length); + if (out != null) { + out.flush(); } + } - @Override - public void close() throws IOException { - if (!finished) { - finish(); - } - if (out != null) { - out.close(); - out = null; - } + /** + * A simple state check to ensure the stream is still open. 
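+ * Throws an {@link IllegalStateException} once {@link #finish()} or {@link #close()} has been called.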
+ */ + private void ensureNotFinished() { + if (finished) { + throw new IllegalStateException(CLOSED_STREAM); } + } + + /** + * Similar to {@link #close()} + * + * + * @throws IOException + */ + public void finish() throws IOException { + ensureNotFinished(); + flush(); + writeEndMark(); + finished = true; + } - private void flushBufferedData() throws IOException { - if (o == 0) { - return; - } - int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH); - final int compressMethod; - if (compressedLength >= o) { - compressMethod = COMPRESSION_METHOD_RAW; - compressedLength = o; - System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o); - } else { - compressMethod = COMPRESSION_METHOD_LZ4; - } - - compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel); - writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1); - writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5); - assert MAGIC_LENGTH + 9 == HEADER_LENGTH; - out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength); - o = 0; + @Override + public void close() throws IOException { + if (!finished) { + finish(); } - - @Override - public void flush() throws IOException { - if (syncFlush) { - flushBufferedData(); - } - out.flush(); + if (out != null) { + out.close(); + out = null; } + } - public void finish() throws IOException { - ensureNotFinished(); - flushBufferedData(); - compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel); - writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1); - writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5); - assert MAGIC_LENGTH + 9 == HEADER_LENGTH; - out.write(compressedBuffer, 0, HEADER_LENGTH); - finished = true; - out.flush(); + public static class FLG { + + private static final int VERSION = 1; + + private final int presetDictionary; + private final int reserved1; + private final int contentChecksum; + private final int contentSize; + private final int blockChecksum; + private final int blockIndependence; + private final int version; + + public FLG() { + this(false); } - - private static void writeIntLE(int i, byte[] buf, int off) { - buf[off++] = (byte) i; - buf[off++] = (byte) (i >>> 8); - buf[off++] = (byte) (i >>> 16); - buf[off++] = (byte) (i >>> 24); + + public FLG(boolean blockChecksum) { + this(0, 0, 0, 0, blockChecksum ? 
1 : 0, 1, VERSION); } - - @Override - public String toString() { - return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize - + ", compressor=" + compressor + ")"; + + private FLG(int presetDictionary, int reserved1, int contentChecksum, + int contentSize, int blockChecksum, int blockIndependence, int version) { + this.presetDictionary = presetDictionary; + this.reserved1 = reserved1; + this.contentChecksum = contentChecksum; + this.contentSize = contentSize; + this.blockChecksum = blockChecksum; + this.blockIndependence = blockIndependence; + this.version = version; + validate(); + } + + public static FLG fromByte(byte flg) { + int presetDictionary = (flg >>> 0) & 1; + int reserved1 = (flg >>> 1) & 1; + int contentChecksum = (flg >>> 2) & 1; + int contentSize = (flg >>> 3) & 1; + int blockChecksum = (flg >>> 4) & 1; + int blockIndependence = (flg >>> 5) & 1; + int version = (flg >>> 6) & 3; + + return new FLG(presetDictionary, reserved1, contentChecksum, + contentSize, blockChecksum, blockIndependence, version); + } + + public byte toByte() { + return (byte) ( + ((presetDictionary & 1) << 0) + | ((reserved1 & 1) << 1) + | ((contentChecksum & 1) << 2) + | ((contentSize & 1) << 3) + | ((blockChecksum & 1) << 4) + | ((blockIndependence & 1) << 5) + | ((version & 3) << 6) ); + } + + private void validate() { + if (presetDictionary != 0) { + throw new RuntimeException("Preset dictionary is unsupported"); + } + if (reserved1 != 0) { + throw new RuntimeException("Reserved1 field must be 0"); + } + if (contentChecksum != 0) { + throw new RuntimeException("Content checksum is unsupported"); + } + if (contentSize != 0) { + throw new RuntimeException("Content size is unsupported"); + } + if (blockIndependence != 1) { + throw new RuntimeException("Dependent block stream is unsupported"); + } + if (version != VERSION) { + throw new RuntimeException(String.format("Version %d is unsupported", version)); + } + } + + public boolean isPresetDictionarySet() { + return presetDictionary == 1; + } + + public boolean isContentChecksumSet() { + return contentChecksum == 1; + } + + public boolean isContentSizeSet() { + return contentSize == 1; + } + + public boolean isBlockChecksumSet() { + return blockChecksum == 1; + } + + public boolean isBlockIndependenceSet() { + return blockIndependence == 1; + } + + public int getVersion() { + return version; + } + } + + public static class BD { + + private final int reserved2; + private final int blockSizeValue; + private final int reserved3; + + public BD() { + this(0, BLOCKSIZE_64KB, 0); + } + + public BD(int blockSizeValue) { + this(0, blockSizeValue, 0); + } + + private BD(int reserved2, int blockSizeValue, int reserved3) { + this.reserved2 = reserved2; + this.blockSizeValue = blockSizeValue; + this.reserved3 = reserved3; + validate(); + } + + public static BD fromByte(byte bd) { + int reserved2 = (bd >>> 0) & 15; + int blockMaximumSize = (bd >>> 4) & 7; + int reserved3 = (bd >>> 7) & 1; + + return new BD(reserved2, blockMaximumSize, reserved3); + } + + private void validate() { + if (reserved2 != 0) { + throw new RuntimeException("Reserved2 field must be 0"); + } + if (blockSizeValue < 4 || blockSizeValue > 7) { + throw new RuntimeException("Block size value must be between 4 and 7"); + } + if (reserved3 != 0) { + throw new RuntimeException("Reserved3 field must be 0"); + } + } + + // 2^(2n+8) + public int getBlockMaximumSize() { + return (1 << ((2 * blockSizeValue) + 8)); + } + + public byte toByte() { + return (byte) ( + ((reserved2 & 15) << 
0) + | ((blockSizeValue & 7) << 4) + | ((reserved3 & 1) << 7) ); } + } + } -- 1.7.12.4 (Apple Git-37) From ea096d4d25a545704da20a742d7b308693112517 Mon Sep 17 00:00:00 2001 From: "James D. Oliver" Date: Thu, 16 Oct 2014 13:44:35 -0700 Subject: [PATCH 3/7] KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction --- .../common/message/KafkaLZ4BlockInputStream.java | 22 ++++---- .../common/message/KafkaLZ4BlockOutputStream.java | 19 ++++--- .../org/apache/kafka/common/record/Compressor.java | 4 +- .../java/org/apache/kafka/common/utils/Utils.java | 60 ++++++++++++++++++++++ .../scala/kafka/message/CompressionFactory.scala | 2 +- 5 files changed, 87 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java index 6cba973..5be72fe 100644 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java @@ -25,14 +25,18 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import net.jpountz.lz4.KafkaLZ4BlockOutputStream.BD; -import net.jpountz.lz4.KafkaLZ4BlockOutputStream.FLG; -import net.jpountz.util.Utils; +import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD; +import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG; +import org.apache.kafka.common.utils.Utils; + +import net.jpountz.lz4.LZ4Exception; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4SafeDecompressor; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; /** - * A partial implementation of the v1.4.1 LZ4 Framing format. + * A partial implementation of the v1.4.1 LZ4 Frame format. * * @see LZ4 Framing Format Spec */ @@ -57,7 +61,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { /** * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm. 
* - * @param out The output stream to compress + * @param in The stream to decompress * @throws IOException */ public KafkaLZ4BlockInputStream(InputStream in) throws IOException { @@ -87,7 +91,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { throw new IOException(PREMATURE_EOS); } - if (MAGIC != Util.readUintLE(header, bufferOffset-6)) { + if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) { throw new IOException(NOT_SUPPORTED); } flg = FLG.fromByte(header[bufferOffset-2]); @@ -110,7 +114,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { * @throws IOException */ private void readBlock() throws IOException { - int blockSize = Util.readUintLE(in); + int blockSize = Utils.readUnsignedIntLE(in); // Check for EndMark if (blockSize == 0) { @@ -136,7 +140,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { } // verify checksum - if (flg.isBlockChecksumSet() && Util.readUintLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) { + if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) { throw new IOException(BLOCK_HASH_MISMATCH); } @@ -169,7 +173,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { @Override public int read(byte b[], int off, int len) throws IOException { - Utils.checkRange(b, off, len); + net.jpountz.util.Utils.checkRange(b, off, len); if (finished) { return -1; } diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java index 92f96c2..073d006 100644 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java @@ -21,12 +21,15 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import net.jpountz.util.Utils; +import org.apache.kafka.common.utils.Utils; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; /** - * A partial implementation of the v1.4.1 LZ4 Framing format. + * A partial implementation of the v1.4.1 LZ4 Frame format. * * @see LZ4 Framing Format Spec */ @@ -78,7 +81,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { /** * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. * - * @param out The output stream to compress + * @param out The stream to compress * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. 
From 53dd1891aeb9a9a4afee8ac477d0de2cdd07fba3 Mon Sep 17 00:00:00 2001
From: "James D. Oliver"
Date: Thu, 16 Oct 2014 21:01:44 -0700
Subject: [PATCH 4/7] KAFKA-1493 Flush stream after writing frame end mark

---
 .../java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
index 073d006..a679870 100644
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -223,8 +223,8 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
    */
   public void finish() throws IOException {
     ensureNotFinished();
-    flush();
     writeEndMark();
+    flush();
     finished = true;
   }
-- 
1.7.12.4 (Apple Git-37)
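
With the end mark now written before the flush, a finished stream leaves a complete frame in the sink rather than an EndMark stranded in the buffer. A hedged round-trip sketch, assuming the single-argument constructors used in this series (class name and payload illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    public class Lz4FrameRoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] payload = "a payload worth compressing".getBytes("UTF-8");

            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink);
            out.write(payload);
            out.close(); // appends the EndMark, then flushes: the frame is complete

            KafkaLZ4BlockInputStream in =
                new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()));
            byte[] decoded = new byte[payload.length];
            int off = 0;
            while (off < decoded.length) {
                // each read returns at most the remainder of the current block
                int n = in.read(decoded, off, decoded.length - off);
                if (n < 0) break;
                off += n;
            }
            // off == payload.length and decoded now matches payload
        }
    }
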
From e246d82ce7769d93384ab681410a3e5aadb41b8c Mon Sep 17 00:00:00 2001
From: "James D. Oliver"
Date: Thu, 16 Oct 2014 21:02:47 -0700
Subject: [PATCH 5/7] KAFKA-1493 Remove unused import

---
 clients/src/main/java/org/apache/kafka/common/record/Compressor.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 2b1b1c6..d684e68 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.InputStream;
-- 
1.7.12.4 (Apple Git-37)


From 426c179fa550ab60c150ce8d8e3db7fb9086d1bc Mon Sep 17 00:00:00 2001
From: "James D. Oliver"
Date: Thu, 16 Oct 2014 21:20:14 -0700
Subject: [PATCH 6/7] KAFKA-1493 Move finish() logic into close()

---
 .../kafka/common/message/KafkaLZ4BlockOutputStream.java | 17 +++--------------
 1 file changed, 3 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
index a679870..e5b9e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -214,24 +214,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
       throw new IllegalStateException(CLOSED_STREAM);
     }
   }
-
-  /**
-   * Similar to {@link #close()}
-   *
-   *
-   * @throws IOException
-   */
-  public void finish() throws IOException {
-    ensureNotFinished();
-    writeEndMark();
-    flush();
-    finished = true;
-  }
 
   @Override
   public void close() throws IOException {
     if (!finished) {
-      finish();
+      writeEndMark();
+      flush();
+      finished = true;
     }
     if (out != null) {
       out.close();
-- 
1.7.12.4 (Apple Git-37)
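
Folding finish() into close() means callers can rely on close() alone to terminate the frame, for example via try-with-resources (a sketch assuming Java 7; helper and variable names illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    public class Lz4CloseDemo {
        static byte[] compress(byte[] payload) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink)) {
                out.write(payload);
            } // close() writes the EndMark, flushes, then closes the wrapped stream
            return sink.toByteArray();
        }
    }
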
Oliver" Date: Thu, 16 Oct 2014 21:02:47 -0700 Subject: [PATCH 5/7] KAFKA-1493 Remove unused import --- clients/src/main/java/org/apache/kafka/common/record/Compressor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 2b1b1c6..d684e68 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream; import org.apache.kafka.common.utils.Utils; import java.io.InputStream; -- 1.7.12.4 (Apple Git-37) From 426c179fa550ab60c150ce8d8e3db7fb9086d1bc Mon Sep 17 00:00:00 2001 From: "James D. Oliver" Date: Thu, 16 Oct 2014 21:20:14 -0700 Subject: [PATCH 6/7] KAFKA-1493 Move finish() logic into close() --- .../kafka/common/message/KafkaLZ4BlockOutputStream.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java index a679870..e5b9e43 100644 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java @@ -214,24 +214,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { throw new IllegalStateException(CLOSED_STREAM); } } - - /** - * Similar to {@link #close()} - * - * - * @throws IOException - */ - public void finish() throws IOException { - ensureNotFinished(); - writeEndMark(); - flush(); - finished = true; - } @Override public void close() throws IOException { if (!finished) { - finish(); + writeEndMark(); + flush(); + finished = true; } if (out != null) { out.close(); -- 1.7.12.4 (Apple Git-37) From 99816983b0d919b220fa26d15713d4d174cd91f6 Mon Sep 17 00:00:00 2001 From: "James D. Oliver" Date: Thu, 16 Oct 2014 21:23:55 -0700 Subject: [PATCH 7/7] KAFKA-1493 Modify test cases to compress a >64kb message to test multi-block lz4 frame compression/decompression --- core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 638b50a..d314b5c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -73,6 +73,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") + props.put(ProducerConfig.LINGER_MS_CONFIG, "200") var producer = new KafkaProducer(props) val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") -- 1.7.12.4 (Apple Git-37)