diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index bc4074e..7457f09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -151,8 +151,8 @@ public class ProducerConfig extends AbstractConfig { /** compression.type */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, or snappy. Compression is of full batches of data, " - + " so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; + private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, snappy, lz4, or lz4hc. " + + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; /** metrics.sample.window.ms */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 0fa6dd2..d608c63 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -219,9 +219,9 @@ public class Compressor { } case LZ4HC: try { - Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory"); - Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor"); - Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); + Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory"); + Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor"); + Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); Object factory = factoryClass.getMethod("fastestInstance").invoke(null); Object compressor = factoryClass.getMethod("highCompressor").invoke(factory); OutputStream stream = (OutputStream) lz4BlockOutputStream diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index a2af988..65bfb4a 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -112,7 +112,7 @@ object ConsoleProducer { .describedAs("broker-list") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip', 'snappy', 'lz4', or 'lz4hc'." + "If specified without value, than it defaults to 'gzip'") .withOptionalArg() .describedAs("compression-codec") diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 17e2c6e..9825ccf 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -123,6 +123,8 @@ object ProducerCompressionTest { val list = new ArrayList[Array[String]]() list.add(Array("gzip")) list.add(Array("snappy")) + list.add(Array("lz4")) + list.add(Array("lz4hc")) list } }