diff --git a/build.gradle b/build.gradle index b3bbd77..2f4167f 100644 --- a/build.gradle +++ b/build.gradle @@ -365,6 +365,7 @@ project(':clients') { dependencies { compile "org.slf4j:slf4j-api:1.7.6" compile 'org.xerial.snappy:snappy-java:1.0.5' + compile 'net.jpountz.lz4:lz4:1.2.0' testCompile 'com.novocode:junit-interface:0.9' testRuntime "$slf4jlog4j" diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index c557e44..5227b2d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -20,7 +20,7 @@ package org.apache.kafka.common.record; * The compression type to use */ public enum CompressionType { - NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f); + NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f); public final int id; public final String name; @@ -40,6 +40,10 @@ public enum CompressionType { return GZIP; case 2: return SNAPPY; + case 3: + return LZ4; + case 4: + return LZ4HC; default: throw new IllegalArgumentException("Unknown compression type id: " + id); } @@ -52,6 +56,10 @@ public enum CompressionType { return GZIP; else if (SNAPPY.name.equals(name)) return SNAPPY; + else if (LZ4.name.equals(name)) + return LZ4; + else if (LZ4HC.name.equals(name)) + return LZ4HC; else throw new IllegalArgumentException("Unknown compression name: " + name); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 6ae3d06..0fa6dd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -208,6 +208,29 @@ public class Compressor { } catch (Exception e) { throw 
new KafkaException(e); } + case LZ4: + try { + Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); + OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class) + .newInstance(buffer); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + case LZ4HC: + try { + Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory"); + Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor"); + Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); + Object factory = factoryClass.getMethod("fastestInstance").invoke(null); + Object compressor = factoryClass.getMethod("highCompressor").invoke(factory); + OutputStream stream = (OutputStream) lz4BlockOutputStream + .getConstructor(OutputStream.class, Integer.TYPE, compressorClass) + .newInstance(buffer, 1 << 16, compressor); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } default: throw new IllegalArgumentException("Unknown compression type: " + type); } @@ -234,6 +257,17 @@ public class Compressor { } catch (Exception e) { throw new KafkaException(e); } + case LZ4: + case LZ4HC: + // dynamically load LZ4 class to avoid runtime dependency + try { + Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream"); + InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class) + .newInstance(buffer); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } default: throw new IllegalArgumentException("Unknown compression type: " + type); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index ce1177e..10df9fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -57,10 +57,10 @@ public final class Record { public static final byte CURRENT_MAGIC_VALUE = 0; /** - * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no + * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no * compression */ - public static final int COMPRESSION_CODEC_MASK = 0x03; + public static final int COMPRESSION_CODEC_MASK = 0x07; /** * Compression code for uncompressed records diff --git a/config/producer.properties b/config/producer.properties index 52a7611..39d65d7 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092 # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync -# specify the compression codec for all data generated: none , gzip, snappy. -# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally +# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc. 
+# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectively compression.codec=none # message encoder diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index 8762a79..de0a0fa 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -23,6 +23,8 @@ object CompressionCodec { case NoCompressionCodec.codec => NoCompressionCodec case GZIPCompressionCodec.codec => GZIPCompressionCodec case SnappyCompressionCodec.codec => SnappyCompressionCodec + case LZ4CompressionCodec.codec => LZ4CompressionCodec + case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) } } @@ -31,6 +33,8 @@ object CompressionCodec { case NoCompressionCodec.name => NoCompressionCodec case GZIPCompressionCodec.name => GZIPCompressionCodec case SnappyCompressionCodec.name => SnappyCompressionCodec + case LZ4CompressionCodec.name => LZ4CompressionCodec + case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name)) } } @@ -53,6 +57,16 @@ case object SnappyCompressionCodec extends CompressionCodec { val name = "snappy" } +case object LZ4CompressionCodec extends CompressionCodec { + val codec = 3 + val name = "lz4" +} + +case object LZ4HCCompressionCodec extends CompressionCodec { + val codec = 4 + val name = "lz4hc" +} + case object NoCompressionCodec extends CompressionCodec { val codec = 0 val name = "none" diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala index ca833ee..8420e13 100644 --- a/core/src/main/scala/kafka/message/CompressionFactory.scala +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -31,6 
+31,12 @@ object CompressionFactory { case SnappyCompressionCodec => import org.xerial.snappy.SnappyOutputStream new SnappyOutputStream(stream) + case LZ4CompressionCodec => + import net.jpountz.lz4.LZ4BlockOutputStream + new LZ4BlockOutputStream(stream) + case LZ4HCCompressionCodec => + import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory} + new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor()) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } @@ -43,6 +49,9 @@ object CompressionFactory { case SnappyCompressionCodec => import org.xerial.snappy.SnappyInputStream new SnappyInputStream(stream) + case LZ4CompressionCodec | LZ4HCCompressionCodec => + import net.jpountz.lz4.LZ4BlockInputStream + new LZ4BlockInputStream(stream) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 52c082f..d2a7293 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -54,10 +54,10 @@ object Message { val CurrentMagicValue: Byte = 0 /** - * Specifies the mask for the compression code. 2 bits to hold the compression codec. + * Specifies the mask for the compression code. 3 bits to hold the compression codec. 
* 0 is reserved to indicate no compression */ - val CompressionCodeMask: Int = 0x03 + val CompressionCodeMask: Int = 0x07 /** * Compression code for uncompressed messages diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala index ed22931..6f0addc 100644 --- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -30,6 +30,10 @@ class MessageCompressionTest extends JUnitSuite { val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec) if(isSnappyAvailable) codecs += SnappyCompressionCodec + if(isLZ4Available) + codecs += LZ4CompressionCodec + if (isLZ4HCAvailable) + codecs += LZ4HCCompressionCodec for(codec <- codecs) testSimpleCompressDecompress(codec) } @@ -61,4 +65,23 @@ class MessageCompressionTest extends JUnitSuite { case e: org.xerial.snappy.SnappyError => false } } + + def isLZ4Available(): Boolean = { + try { + val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream()) + true + } catch { + case e: UnsatisfiedLinkError => false + } + } + + def isLZ4HCAvailable(): Boolean = { + try { + val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16, + net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor()) + true + } catch { + case e: UnsatisfiedLinkError => false + } + } } diff --git a/system_test/producer_perf/bin/run-compression-test-all.sh b/system_test/producer_perf/bin/run-compression-test-all.sh new file mode 100755 index 0000000..3c57754 --- /dev/null +++ b/system_test/producer_perf/bin/run-compression-test-all.sh @@ -0,0 +1,60 @@ +#!/bin/bash -Eu +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +trap error HUP INT QUIT TERM ERR SIGINT SIGQUIT + +error() { + echo "Error: Test failed" + shutdown + exit 1 +} + +# Configuration +num_messages=200000 +message_size=200 +batch_size=200 +threads=1 +codecs=(0 1 2 3 4) +codecname=(none gzip snappy lz4 lz4hc) + +base_dir=$(dirname $0)/.. + +rm -rf /tmp/zookeeper +rm -rf /tmp/kafka-logs + +startup() { + echo "start the servers ..." + $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/zookeeper.log & + $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/kafka.log & + sleep 4 +} + +shutdown() { + echo "shutdown the servers ..." + ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null + sleep 2 + ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null +} + +startup +for codec in ${codecs[@]}; do + # Create the topic + topic=codec_${codecname[$codec]} + $base_dir/../../bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic $topic --replication-factor 1 --partitions 1 + + echo "producing $num_messages messages for codec '${codecname[$codec]}' ..." 
+ $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --broker-list localhost:9092 --topics $topic --messages $num_messages --message-size $message_size --batch-size $batch_size --threads $threads --reporting-interval 100000 --compression-codec $codec +done +shutdown diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties index 9f8a633..83a1e06 100644 --- a/system_test/producer_perf/config/server.properties +++ b/system_test/producer_perf/config/server.properties @@ -60,10 +60,10 @@ enable.zookeeper=true # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 +zookeeper.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 # time based topic flush intervals in ms #log.flush.intervals.ms.per.topic=topic:1000