diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bc4074e..7457f09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -151,8 +151,8 @@ public class ProducerConfig extends AbstractConfig {
/** compression.type */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
- private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, or snappy. Compression is of full batches of data, "
- + " so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
+ private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, snappy, lz4, or lz4hc. "
+ + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** metrics.sample.window.ms */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0fa6dd2..d608c63 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -219,9 +219,9 @@ public class Compressor {
}
case LZ4HC:
try {
- Class> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
- Class> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
- Class> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
+ Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
+ Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
+ Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
OutputStream stream = (OutputStream) lz4BlockOutputStream
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index a2af988..65bfb4a 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -112,7 +112,7 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
"If specified without value, than it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 17e2c6e..9825ccf 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -123,6 +123,8 @@ object ProducerCompressionTest {
val list = new ArrayList[Array[String]]()
list.add(Array("gzip"))
list.add(Array("snappy"))
+ list.add(Array("lz4"))
+ list.add(Array("lz4hc"))
list
}
}