diff --git config/producer.properties config/producer.properties index 75f1f2c..7d73133 100644 --- config/producer.properties +++ config/producer.properties @@ -37,8 +37,9 @@ broker.list=localhost:9092 # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync -# specify the compression codec for all data generated: 0: no compression, 1: gzip -compression.codec=0 +# specify the compression codec for all data generated: none , gzip, snappy. +# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally +compression.codec=none # message encoder serializer.class=kafka.serializer.StringEncoder diff --git core/src/main/scala/kafka/message/CompressionCodec.scala core/src/main/scala/kafka/message/CompressionCodec.scala index b71d4e9..8762a79 100644 --- core/src/main/scala/kafka/message/CompressionCodec.scala +++ core/src/main/scala/kafka/message/CompressionCodec.scala @@ -26,14 +26,34 @@ object CompressionCodec { case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) } } + def getCompressionCodec(name: String): CompressionCodec = { + name.toLowerCase match { + case NoCompressionCodec.name => NoCompressionCodec + case GZIPCompressionCodec.name => GZIPCompressionCodec + case SnappyCompressionCodec.name => SnappyCompressionCodec + case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name)) + } + } } -sealed trait CompressionCodec { def codec: Int } +sealed trait CompressionCodec { def codec: Int; def name: String } -case object DefaultCompressionCodec extends CompressionCodec { val codec = GZIPCompressionCodec.codec } +case object DefaultCompressionCodec extends CompressionCodec { + val codec = GZIPCompressionCodec.codec + val name = GZIPCompressionCodec.name +} -case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 } +case object GZIPCompressionCodec extends CompressionCodec { + val codec = 1 + val name = "gzip" +} -case object SnappyCompressionCodec extends CompressionCodec { val codec = 2 } +case object SnappyCompressionCodec extends CompressionCodec { + val codec = 2 + val name = "snappy" +} -case object NoCompressionCodec extends CompressionCodec { val codec = 0 } +case object NoCompressionCodec extends CompressionCodec { + val codec = 0 + val name = "none" +} diff --git core/src/main/scala/kafka/producer/ProducerConfig.scala core/src/main/scala/kafka/producer/ProducerConfig.scala index f974221..40ae709 100644 --- core/src/main/scala/kafka/producer/ProducerConfig.scala +++ core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -56,7 +56,15 @@ class ProducerConfig private (val props: VerifiableProperties) * This parameter allows you to specify the compression codec for all data generated * * by this producer. The default is NoCompressionCodec */ - val compressionCodec = CompressionCodec.getCompressionCodec(props.getInt("compression.codec", NoCompressionCodec.codec)) + + val compressionCodec : CompressionCodec = { + try { + CompressionCodec.getCompressionCodec(Integer.parseInt(props.getString("compression.codec", NoCompressionCodec.codec.toString))) + } catch { + case nfe: NumberFormatException => + CompressionCodec.getCompressionCodec(props.getString("compression.codec", NoCompressionCodec.name)) + } + } /** This parameter allows you to set whether compression should be turned * * on for particular topics