diff --git a/build.gradle b/build.gradle
index b3bbd77..2f4167f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -365,6 +365,7 @@ project(':clients') {
   dependencies {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.0.5'
+    compile 'net.jpountz.lz4:lz4:1.2.0'
 
     testCompile 'com.novocode:junit-interface:0.9'
     testRuntime "$slf4jlog4j"
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index c557e44..5227b2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
 
     public final int id;
     public final String name;
@@ -40,6 +40,10 @@ public enum CompressionType {
                 return GZIP;
             case 2:
                 return SNAPPY;
+            case 3:
+                return LZ4;
+            case 4:
+                return LZ4HC;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -52,6 +56,10 @@ public enum CompressionType {
             return GZIP;
         else if (SNAPPY.name.equals(name))
             return SNAPPY;
+        else if (LZ4.name.equals(name))
+            return LZ4;
+        else if (LZ4HC.name.equals(name))
+            return LZ4HC;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index ce1177e..10df9fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -57,10 +57,10 @@ public final class Record {
     public static final byte CURRENT_MAGIC_VALUE = 0;
 
     /**
-     * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
+     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
      * compression
      */
-    public static final int COMPRESSION_CODEC_MASK = 0x03;
+    public static final int COMPRESSION_CODEC_MASK = 0x07;
 
     /**
      * Compression code for uncompressed records
diff --git a/config/producer.properties b/config/producer.properties
index 52a7611..39d65d7 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
+# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectively
 compression.codec=none
 
 # message encoder
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 8762a79..9363a42 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -23,6 +23,8 @@ object CompressionCodec {
       case NoCompressionCodec.codec => NoCompressionCodec
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
+      case LZ4CompressionCodec.codec => LZ4CompressionCodec
+      case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -31,6 +33,8 @@ object CompressionCodec {
       case NoCompressionCodec.name => NoCompressionCodec
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
+      case LZ4CompressionCodec.name => LZ4CompressionCodec
+      case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
     }
   }
@@ -53,6 +57,16 @@ case object SnappyCompressionCodec extends CompressionCodec {
   val name = "snappy"
 }
 
+case object LZ4CompressionCodec extends CompressionCodec {
+  val codec = 3
+  val name = "lz4"
+}
+
+case object LZ4HCCompressionCodec extends CompressionCodec {
+  val codec = 4
+  val name = "lz4hc"
+}
+
 case object NoCompressionCodec extends CompressionCodec {
   val codec = 0
   val name = "none"
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index ca833ee..8420e13 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -31,6 +31,12 @@ object CompressionFactory {
       case SnappyCompressionCodec =>
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
+      case LZ4CompressionCodec =>
+        import net.jpountz.lz4.LZ4BlockOutputStream
+        new LZ4BlockOutputStream(stream)
+      case LZ4HCCompressionCodec =>
+        import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
+        new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
@@ -43,6 +49,9 @@ object CompressionFactory {
       case SnappyCompressionCodec =>
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
+      case LZ4CompressionCodec | LZ4HCCompressionCodec =>
+        import net.jpountz.lz4.LZ4BlockInputStream
+        new LZ4BlockInputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 52c082f..d2a7293 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -54,10 +54,10 @@ object Message {
   val CurrentMagicValue: Byte = 0
 
   /**
-   * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+   * Specifies the mask for the compression code. 3 bits to hold the compression codec.
    * 0 is reserved to indicate no compression
    */
-  val CompressionCodeMask: Int = 0x03
+  val CompressionCodeMask: Int = 0x07
 
   /**
    * Compression code for uncompressed messages
diff --git a/system_test/producer_perf/bin/run-compression-test-all.sh b/system_test/producer_perf/bin/run-compression-test-all.sh
new file mode 100755
index 0000000..227ee6a
--- /dev/null
+++ b/system_test/producer_perf/bin/run-compression-test-all.sh
@@ -0,0 +1,60 @@
+#!/bin/bash -Eu
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+trap error HUP INT QUIT TERM ERR SIGINT SIGQUIT
+
+error() {
+    echo "Error: Test failed"
+    shutdown
+    exit 1
+}
+
+# Configuration
+num_messages=200000
+message_size=200
+batch_size=200
+threads=1
+codecs=(0 1 2 3 4)
+codecname=(none gzip snappy lz4 lz4hc)
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper
+rm -rf /tmp/kafka-logs
+
+startup() {
+    echo "start the servers ..."
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/../../logs/zookeeper.log &
+    $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/../../logs/kafka.log &
+    sleep 4
+}
+
+shutdown() {
+    echo "shutdown the servers ..."
+    ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+    sleep 2
+    ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+}
+
+startup
+for codec in ${codecs[@]}; do
+    # Create the topic
+    topic=codec_${codecname[$codec]}
+    $base_dir/../../bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic $topic --replication-factor 1 --partitions 1
+
+    echo "producing $num_messages messages for codec '${codecname[$codec]}' ..."
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --broker-list localhost:9092 --topics $topic --messages $num_messages --message-size $message_size --batch-size $batch_size --threads $threads --reporting-interval 100000 --compression-codec $codec
+done
+shutdown
\ No newline at end of file
diff --git a/system_test/producer_perf/bin/run-test.sh b/system_test/producer_perf/bin/run-test.sh
index bb60817..016a668 100755
--- a/system_test/producer_perf/bin/run-test.sh
+++ b/system_test/producer_perf/bin/run-test.sh
@@ -23,8 +23,8 @@ rm -rf /tmp/zookeeper
 rm -rf /tmp/kafka-logs
 
 echo "start the servers ..."
-$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/zookeeper.log &
-$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/kafka.log &
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/../../logs/zookeeper.log &
+$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/../../logs/kafka.log &
 sleep 4
 
 echo "start producing $num_messages messages ..."
diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties
index 9f8a633..c9e923a 100644
--- a/system_test/producer_perf/config/server.properties
+++ b/system_test/producer_perf/config/server.properties
@@ -14,65 +14,104 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-# the id of the broker
-broker.id=0
+############################# Server Basics #############################
 
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
 
-# number of logical partitions on this broker
-num.partitions=1
+############################# Socket Server Settings #############################
 
-# the port the socket server runs on
+# The port the socket server listens on
 port=9092
 
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=
 
-# the directory in which to store log files
-log.dir=/tmp/kafka-logs
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
 
-# the send buffer used by the socket server
+# The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
 
-# the receive buffer used by the socket server
+# The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer.bytes=1048576
 
-# the maximum size of a log segment
-log.segment.bytes=536870912
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
 
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-# the minimum age of a log file to eligible for deletion
-log.retention.hours=168
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=2
 
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
+############################# Log Flush Policy #############################
 
-#set the following properties to use zookeeper
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
-# enable connecting to zookeeper
-enable.zookeeper=true
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
 
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
 
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
 
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
 
-# default time based flush interval in ms
-log.flush.interval.ms=1000
+############################# Zookeeper #############################
 
-# time based topic flasher time rate in ms
-log.flush.scheduler.interval.ms=1000
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
 
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
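As a quick way to exercise the new codec plumbing end to end, the sketch below resolves the "lz4" codec by name and round-trips a payload through the streams returned by CompressionFactory. It is only a sketch: it assumes the patched kafka.message classes and lz4-java 1.2.0 are on the classpath, and that object CompressionCodec exposes its existing name-based lookup (getCompressionCodec) and CompressionFactory its existing apply(codec, stream) overloads; those entry points are not introduced by this diff and their exact names/signatures are assumptions here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import kafka.message.{CompressionCodec, CompressionFactory}

// Sanity-check the lz4 codec added by this patch: resolve it by name,
// compress a payload through the factory's output stream, then read it
// back through the matching input stream.
object Lz4RoundTrip extends App {
  // "lz4" maps to codec id 3; "lz4hc" would map to id 4 (see CompressionCodec.scala above).
  val codec = CompressionCodec.getCompressionCodec("lz4")

  val raw = "hello, lz4".getBytes("UTF-8")

  // Compress.
  val compressed = new ByteArrayOutputStream()
  val out = CompressionFactory(codec, compressed)
  out.write(raw)
  out.close()

  // Decompress.
  val in = CompressionFactory(codec, new ByteArrayInputStream(compressed.toByteArray))
  val restored = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
  in.close()

  assert(restored.sameElements(raw), "round trip mismatch")
  println(s"codec=${codec.name} id=${codec.codec} compressedBytes=${compressed.size()}")
}
```

Note that lz4 and lz4hc differ only on the compression side (LZ4HC plugs a high-compression compressor into LZ4BlockOutputStream); both produce the same block format, which is why the reader-side match in CompressionFactory.scala handles the two codecs with a single LZ4BlockInputStream case.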