Kafka / KAFKA-187

Add Snappy compression as a codec, refactor CompressionUtils, and add a startup option to select the default codec

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      My thought is to add a new trait CompressionDependencies to KafkaProject.scala, with snappy as the first library.

      Refactor CompressionUtils for better code reuse, and provide a way on startup to select the default codec instead of always defaulting to GZIP.

      Attachments

      1. kafka-187.patch
        5 kB
        Joe Stein
      2. kafka-187.refactored.patch
        13 kB
        Joe Stein
      3. KAFKA-187_v3.patch
        14 kB
        Jun Rao

          Activity

          Neha Narkhede added a comment -

          Leaving a comment here, as Joe didn't appear to be in the JIRA list. Assigning this JIRA to Joe.

          Neha Narkhede added a comment -

          Thanks for the patch, Joe! A couple of comments:

          Neha Narkhede added a comment -

          1. There is quite a lot of overlap between the GZIP and Snappy code in CompressionUtils. Would you be up for refactoring it so that they share the same code path?
          2. One thing to think about is whether Snappy should be a compile-time and runtime dependency of the core Kafka project, especially since GZIP will be the default and Snappy will only be used if it is explicitly configured. Is there any way of defining optional runtime dependencies?
          3. I think we will have to wait for the unit test to get fixed before accepting this patch. Have you tried running the system and performance tests yet?
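For what it's worth, one way to get a single shared code path is to have each codec contribute only its stream wrappers, so the byte-copying compress/decompress loop is written once. This is a hypothetical sketch, not the actual patch: the names (StreamFactory, wrapOut, wrapIn) are made up, and only GZIP is wired in so the sketch needs no third-party jar; a real patch would add a Snappy branch alongside it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical sketch: each codec supplies only its stream wrappers; the
// compress/decompress loops below are shared by every codec.
object StreamFactory {
  // A real patch would add a "snappy" branch returning a SnappyOutputStream.
  def wrapOut(codec: String, out: OutputStream): OutputStream = codec match {
    case "gzip" => new GZIPOutputStream(out)
    case _      => throw new IllegalArgumentException("unknown codec: " + codec)
  }

  def wrapIn(codec: String, in: InputStream): InputStream = codec match {
    case "gzip" => new GZIPInputStream(in)
    case _      => throw new IllegalArgumentException("unknown codec: " + codec)
  }

  // The single shared code path: identical for every codec.
  def compress(codec: String, data: Array[Byte]): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = wrapOut(codec, buf)
    out.write(data)
    out.close()
    buf.toByteArray
  }

  def decompress(codec: String, data: Array[Byte]): Array[Byte] = {
    val in = wrapIn(codec, new ByteArrayInputStream(data))
    val buf = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024)
    var n = in.read(chunk)
    while (n >= 0) { buf.write(chunk, 0, n); n = in.read(chunk) }
    in.close()
    buf.toByteArray
  }
}
```

With this shape, adding a codec is one new branch in each wrapper rather than a second copy of the compress loop.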

          Joe Stein added a comment -

          >> 1. There is quite a lot of overlap in the code for the GZIP and Snappy codec in CompressionUtils. Wonder if you were up for refactoring it so that they use the same code path?

          Agreed, I am. Should I open a new ticket for refactoring CompressionUtils, or make it part of this ticket?

          >> 2. One thing to think about is whether Snappy should be a compile and run time dependency in the core kafka project. Especially since GZIP will be default and Snappy will only be used if it is explicitly configured. I wonder if there is any way of defining optional run time dependencies?

          Yeah, we could do something at startup that changes the default behavior to a specific codec. Incorporating this into the refactoring of that class should not be a big deal: the default case would check an object instead of implementing GZIP (like it does now), and depending on that object it would call either the gzip or snappy function I create (which will also be used by the case match). Same JIRA as this? A new JIRA for refactoring that includes this? A third JIRA?

          >> 3. I think we will have to wait for the unit test to get fixed before accepting this patch. Have you tried running the system and performance tests yet?

          Sounds good; I should be able to chip away at that tomorrow or early this week. I did try running the performance tests but ran into some errors. It is possible I was doing something wrong, so I want to set it up again before sending email about that. Same for the system tests; I will try those in the next few days too.

          Neha Narkhede added a comment -

          1. I think we might as well do the refactoring as part of this patch. Copy-pasting code doesn't seem like a good idea.
          2. Yes, same JIRA as this one. We might as well think through all the issues with supporting multiple codecs cleanly now rather than later.
          3. I've updated the perf patch. Do you want to submit a patch for the unit test?

          Joe Stein added a comment -

          1) Sounds good to me. I opened another JIRA but will comment there that it is dead and will just do the work here; not a problem.
          2) Sounds good.
          3) Done: KAFKA-192.

          The changes were simple enough for me to just redo them after refactoring.

          Neha Narkhede added a comment -

          >> provide a way on startup to select what the default codec is instead of the default always gziping

          Today, we have a config named "compression.codec" that picks the compression codec. It is a numeric value: 0 for no compression, 1 for GZIP. Now that we are supporting multiple codecs, it makes sense for this to be a string value, one of "none", "gzip", or "snappy". The default is still "none".

          As part of 2., I was raising a different question: in the general case, should Snappy always be a core dependency of Kafka or not? I don't know the right answer here; maybe we need to think more.
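A string-valued compression.codec config could be parsed roughly as follows. This is only a sketch of the idea, not the committed code; the fromConfig helper is hypothetical, while the case objects mirror the numeric wire values (0, 1, 2) discussed in this thread.

```scala
// Codec case objects carrying both the numeric wire value and a config name.
sealed trait CompressionCodec { def codec: Int; def name: String }
case object NoCompressionCodec extends CompressionCodec { val codec = 0; val name = "none" }
case object GZIPCompressionCodec extends CompressionCodec { val codec = 1; val name = "gzip" }
case object SnappyCompressionCodec extends CompressionCodec { val codec = 2; val name = "snappy" }

object CompressionCodec {
  val all = List(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec)

  // Hypothetical helper: map a "compression.codec" string to a codec object.
  // A caller would use NoCompressionCodec when the property is absent.
  def fromConfig(value: String): CompressionCodec =
    all.find(_.name == value.toLowerCase)
       .getOrElse(throw new IllegalArgumentException(value + " is an unknown compression codec"))
}
```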

          Joe Stein added a comment -

          Refactored CompressionUtils, added Snappy compression, and added a test case for its use. I have not run the perf test to compare gzip vs. snappy yet.

          Jun Rao added a comment -

          It would be good if the snappy jar were only a compile-time dependency, not a runtime dependency. That way, people not using Snappy don't have to include the jar. Could we verify this?
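The reason a compile-time-only dependency can work is that the JVM resolves a class the first time code referencing it actually executes, so the snappy classes are never loaded unless the Snappy branch runs. Below is a hypothetical illustration (not Kafka code): it forces the class to load via Class.forName as a stand-in for constructing a SnappyOutputStream, so only selecting codec 2 without the jar fails; the other codecs are unaffected.

```scala
// Hypothetical sketch of lazy class loading for an optional codec dependency.
object CodecLoader {
  def newCompression(codec: Int): String = codec match {
    case 0 => "none"
    case 1 => "gzip"
    case 2 =>
      // Stand-in for `new SnappyOutputStream(...)`: this line only runs --
      // and only fails without the snappy jar -- when codec 2 is selected.
      Class.forName("org.xerial.snappy.SnappyOutputStream")
      "snappy"
    case _ => throw new IllegalArgumentException("unknown codec " + codec)
  }
}
```

(Strictly speaking, referencing the class directly in compiled code raises NoClassDefFoundError at resolution time, as in the stack trace below, while Class.forName raises ClassNotFoundException; the lazy-loading behavior is the same.)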

          Joe Stein added a comment - - edited

          Yes, this patch allows for that. Here is how to verify.

          Apply the patch, then build:

          ./sbt update
          ./sbt package

          Then remove the snappy jar:

          rm -f core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar

          Then launch everything:

          bin/zookeeper-server-start.sh config/zookeeper.properties
          bin/kafka-server-start.sh config/server.properties
          bin/kafka-producer-shell.sh --props config/producer.properties --topic test
          bin/kafka-consumer-shell.sh --topic test --props config/consumer.properties

          Send messages; things are good.

          Shut down the producer shell, change the codec to 1 (GZIP) in config/producer.properties, and restart it:

          bin/kafka-producer-shell.sh --props config/producer.properties --topic test

          Send messages; things are still good.

          Shut down the producer shell again, change the codec to 2 (Snappy) in config/producer.properties, and restart it. It starts up fine, but then try to send a message:

          Exception in thread "main" java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyOutputStream
          at kafka.message.SnappyCompression.<init>(CompressionUtils.scala:61)
          at kafka.message.CompressionFactory$.apply(CompressionUtils.scala:82)
          at kafka.message.CompressionUtils$.compress(CompressionUtils.scala:111)
          at kafka.message.MessageSet$.createByteBuffer(MessageSet.scala:71)
          at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:45)
          at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:108)
          at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:107)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
          at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:107)
          at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
          at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
          at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
          at kafka.producer.Producer.configSend(Producer.scala:167)
          at kafka.producer.Producer.send(Producer.scala:106)
          at kafka.tools.ProducerShell$.main(ProducerShell.scala:68)
          at kafka.tools.ProducerShell.main(ProducerShell.scala)
          Caused by: java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream
          at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
          at java.security.AccessController.doPrivileged(Native Method)
          at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
          at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
          ... 23 more

          Jun Rao added a comment -

          Thanks, Joe. That's what I was looking for.

          Jeffrey Damick added a comment -

          So this raises a bigger question: how do clients signal that they can't handle codec 'xyz'? Is it up to the consumer, or could/should the broker re-encode it?
          Granted, this probably isn't an issue for Snappy, since there appear to be several implementations, but in general is there a need for an 'accept'-style header?

          My thought is that if you leave it up to the client, you could run into an issue where client A and client B support none of the same compression codecs, so you are stuck with uncompressed messages.

          Jun Rao added a comment -

          This is not a problem for java/scala clients. But it could be a problem for non-java clients. I think this is tied to some of the discussions that we had on non-java language support (see "different language binding support" thread in http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/201109.mbox/thread). Ideally, we'd rather each language not re-implement a thick client.

          Jeffrey Damick added a comment -

          Right, I agree it overlaps with that thread, but is compression support a thick- or thin-client attribute? Maybe we should move this to the mailing list?

          Jun Rao added a comment -

          Patch v2 looks good to me. Attaching Patch v3 with minor changes in CompressionCodec to avoid duplicating constants. If nobody objects, I will commit this patch later today.

          Neha Narkhede added a comment -

          What is the difference between Joe's v2 patch and the v3 patch? Can you please describe the changes when uploading a new patch?

          Jun Rao added a comment -

          The following are the changes that I made.

          Index: core/src/main/scala/kafka/message/CompressionCodec.scala
          ===================================================================
          --- core/src/main/scala/kafka/message/CompressionCodec.scala (revision 1200967)
          +++ core/src/main/scala/kafka/message/CompressionCodec.scala (working copy)
          @@ -20,8 +20,9 @@
           object CompressionCodec {
             def getCompressionCodec(codec: Int): CompressionCodec = {
               codec match {
          -      case 0 => NoCompressionCodec
          -      case 1 => GZIPCompressionCodec
          +      case NoCompressionCodec.codec => NoCompressionCodec
          +      case GZIPCompressionCodec.codec => GZIPCompressionCodec
          +      case SnappyCompressionCodec.codec => SnappyCompressionCodec
                 case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
               }
             }
          Neha Narkhede added a comment -

          +1 on the latest patch. I like the changes.

          Jun Rao added a comment -

          Thanks, Joe. Just committed this.


            People

            • Assignee: Joe Stein
            • Reporter: Joe Stein
