Kafka / KAFKA-1456

Add LZ4 and LZ4C as a compression codec

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.2.0
    • Component/s: None
    • Attachments (all uploaded by James Oliver):
      1. KAFKA-1456_2014-05-19_23:24:27.patch (15 kB)
      2. KAFKA-1456_2014-05-19_18:19:32.patch (15 kB)
      3. KAFKA-1456_2014-05-19_16:39:01.patch (10 kB)
      4. KAFKA-1456_2014-05-19_15:01:10.patch (16 kB)
      5. KAFKA-1456.patch (17 kB)

      Activity

      James Oliver added a comment -

      Created reviewboard https://reviews.apache.org/r/21663/diff/
      against branch origin/trunk

      James Oliver added a comment -

      Updated reviewboard https://reviews.apache.org/r/21663/diff/
      against branch origin/trunk

      James Oliver added a comment -

      Updated reviewboard https://reviews.apache.org/r/21663/diff/
      against branch origin/trunk

      James Oliver added a comment -

      Updated reviewboard https://reviews.apache.org/r/21663/diff/
      against branch trunk

      James Oliver added a comment -

      Updated reviewboard https://reviews.apache.org/r/21663/diff/
      against branch trunk

      Joe Stein added a comment -

      committed to trunk

      Stephan Lachowsky added a comment (edited) -

      Hello all,

      First off, I would really like to see this functionality get added, but I'd like to make sure the wire protocol is done properly before it is picked up by a release and there is no going back.

      Here are, in my estimation, the issues with the current implementation:

      • LZ4 and LZ4HC generate the same output format, so they shouldn't have different compression codec enums; this is a producer configuration issue only. The same caveat applies to the compressor "level" parameter: it is basically a producer CPU/compression trade-off.
      • The LZ4Block format used by net.jpountz.lz4.LZ4BlockInputStream and net.jpountz.lz4.LZ4BlockOutputStream is a block-based format that isn't well documented outside of the Java code. I would recommend using something documented, like the format defined by the LZ4 author: http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html
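Stephan's first point can be sketched in Java as follows: the wire carries a single LZ4 codec id, and the fast-versus-HC choice lives only in producer configuration. This is a hypothetical illustration, not the committed patch. GZIP=1 and SNAPPY=2 match Kafka's existing codec ids, while LZ4=3, the class names, and the "compression.lz4.level" property are assumptions made up for this sketch ("compression.codec" mirrors the existing producer setting).

```java
import java.util.Locale;
import java.util.Properties;

public final class Lz4CodecSketch {

    // Codec ids carried on the wire. GZIP=1 and SNAPPY=2 follow Kafka's existing
    // codecs; LZ4=3 is an assumed id. There is deliberately no LZ4HC entry,
    // because HC output is ordinary LZ4 data.
    enum Codec {
        NONE(0), GZIP(1), SNAPPY(2), LZ4(3);

        final int id;

        Codec(int id) {
            this.id = id;
        }

        static Codec fromId(int id) {
            for (Codec c : values()) {
                if (c.id == id) {
                    return c;
                }
            }
            throw new IllegalArgumentException("unknown codec id: " + id);
        }
    }

    // Producer-only CPU/ratio trade-off; it is never written to the wire.
    enum Lz4Level { FAST, HIGH }

    static final class ProducerCompression {
        final Codec codec;
        final Lz4Level level;

        ProducerCompression(Codec codec, Lz4Level level) {
            this.codec = codec;
            this.level = level;
        }
    }

    // "compression.codec" mirrors the existing producer setting;
    // "compression.lz4.level" is a hypothetical key for this sketch.
    static ProducerCompression fromConfig(Properties props) {
        Codec codec = Codec.valueOf(
            props.getProperty("compression.codec", "none").toUpperCase(Locale.ROOT));
        Lz4Level level = Lz4Level.valueOf(
            props.getProperty("compression.lz4.level", "fast").toUpperCase(Locale.ROOT));
        return new ProducerCompression(codec, level);
    }

    // Consumer side: only the id is needed to pick a decompressor,
    // so data written at either level decodes the same way.
    static Codec codecForDecompression(int wireId) {
        return Codec.fromId(wireId);
    }
}
```

This works because in lz4-java the compressors returned by LZ4Factory#fastCompressor() and LZ4Factory#highCompressor() both emit blocks that the same decompressor reads, so a consumer never needs to know which one the producer used.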
      Joe Stein added a comment -

      Looks like a good point. I have been looking through some other codecs as well; we should add a few more and update this one appropriately.

      James Oliver, can you work on that?

      Jay Kreps added a comment -

      Guys, does this add a runtime dependency on this jar for all clients?

      Joe Stein added a comment -

      It shouldn't, since the imports are all within the case class, like snappy's are.

      James Oliver added a comment -

      The Java client uses reflection to invoke the codec so there's no runtime dependency, and the Scala code only imports the codec inside of a case clause. We should be OK on that note.

      Stephan Lachowsky, yep, you're correct regarding the enum; there should only be one, with the capability of passing custom configuration.

      Does a Java implementation for the well-documented streaming format exist? Or would we need to implement it ourselves?
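The reflection approach James describes can be illustrated with a minimal Java sketch, assuming codecs are selected by name: the lz4-java class is referenced only as a string and loaded with Class.forName when "lz4" is actually chosen, so the gzip path never touches it. ReflectiveCodecs, wrap and compress are hypothetical names; only net.jpountz.lz4.LZ4BlockOutputStream and its OutputStream constructor come from lz4-java.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.util.zip.GZIPOutputStream;

public final class ReflectiveCodecs {

    // Referenced only as a string, so this class compiles, loads and runs
    // the gzip path even when lz4-java is absent from the classpath.
    private static final String LZ4_OUTPUT_STREAM = "net.jpountz.lz4.LZ4BlockOutputStream";

    static OutputStream wrap(String codec, OutputStream out) throws IOException {
        switch (codec) {
            case "none":
                return out;
            case "gzip":
                return new GZIPOutputStream(out);               // JDK class, always present
            case "lz4":
                return newByReflection(LZ4_OUTPUT_STREAM, out); // class resolved only here
            default:
                throw new IllegalArgumentException("unknown codec: " + codec);
        }
    }

    private static OutputStream newByReflection(String className, OutputStream out)
            throws IOException {
        try {
            Class<?> cls = Class.forName(className);
            return (OutputStream) cls.getConstructor(OutputStream.class).newInstance(out);
        } catch (ClassNotFoundException e) {
            throw new IOException(className + " is not on the classpath", e);
        } catch (InvocationTargetException e) {
            throw new IOException("constructor of " + className + " failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IOException("cannot instantiate " + className, e);
        }
    }

    // Convenience wrapper: compresses a byte array with the named codec.
    static byte[] compress(String codec, byte[] data) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (OutputStream out = wrap(codec, buf)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }
}
```

The catch is that every code path has to follow this pattern: a single direct reference to an lz4-java class in code that runs for every codec is enough to make the JVM try to load it.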

      Steven Schlansker added a comment -

      lz4-java tracks the lack of the common streaming format too, as:
      https://github.com/jpountz/lz4-java/issues/21

      If you end up writing code for this it would be very nice to contribute it back to lz4-java.
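To give a sense of what implementing the documented streaming format involves, here is a Java sketch of just the frame header, assuming the layout the LZ4 author published as the LZ4 frame format (check it against the linked 2013 post for any differences): a little-endian magic number 0x184D2204, a FLG byte, a BD byte, and a header checksum that is the second byte of xxHash32 (seed 0) over the descriptor bytes only. Lz4FrameHeader and its methods are hypothetical names, the xxHash32 routine is a from-scratch port, and content size and dictionary id fields are omitted.

```java
public final class Lz4FrameHeader {

    static final int MAGIC = 0x184D2204; // written little-endian: 04 22 4D 18

    private static final int P1 = 0x9E3779B1, P2 = 0x85EBCA77, P3 = 0xC2B2AE3D,
                             P4 = 0x27D4EB2F, P5 = 0x165667B1;

    private static int readLE32(byte[] b, int i) {
        return (b[i] & 0xFF) | (b[i + 1] & 0xFF) << 8
             | (b[i + 2] & 0xFF) << 16 | (b[i + 3] & 0xFF) << 24;
    }

    private static int round(int acc, int input) {
        return Integer.rotateLeft(acc + input * P2, 13) * P1;
    }

    /** 32-bit xxHash of b[off, off + len); int arithmetic wraps mod 2^32 as required. */
    static int xxh32(byte[] b, int off, int len, int seed) {
        int end = off + len;
        int i = off;
        int h;
        if (len >= 16) {
            int v1 = seed + P1 + P2, v2 = seed + P2, v3 = seed, v4 = seed - P1;
            for (; i + 16 <= end; i += 16) {
                v1 = round(v1, readLE32(b, i));
                v2 = round(v2, readLE32(b, i + 4));
                v3 = round(v3, readLE32(b, i + 8));
                v4 = round(v4, readLE32(b, i + 12));
            }
            h = Integer.rotateLeft(v1, 1) + Integer.rotateLeft(v2, 7)
              + Integer.rotateLeft(v3, 12) + Integer.rotateLeft(v4, 18);
        } else {
            h = seed + P5;
        }
        h += len;
        for (; i + 4 <= end; i += 4) {              // remaining 4-byte words
            h = Integer.rotateLeft(h + readLE32(b, i) * P3, 17) * P4;
        }
        for (; i < end; i++) {                      // remaining single bytes
            h = Integer.rotateLeft(h + (b[i] & 0xFF) * P5, 11) * P1;
        }
        h ^= h >>> 15;                              // final avalanche
        h *= P2;
        h ^= h >>> 13;
        h *= P3;
        h ^= h >>> 16;
        return h;
    }

    /** 7-byte header; blockMaxSizeCode 4..7 means 64 KB, 256 KB, 1 MB, 4 MB. */
    static byte[] header(boolean independentBlocks, boolean blockChecksum,
                         boolean contentChecksum, int blockMaxSizeCode) {
        if (blockMaxSizeCode < 4 || blockMaxSizeCode > 7) {
            throw new IllegalArgumentException("block max size code must be 4..7");
        }
        int flg = 0x40;                             // version 01 in bits 7-6
        if (independentBlocks) flg |= 0x20;         // bit 5
        if (blockChecksum)     flg |= 0x10;         // bit 4
        if (contentChecksum)   flg |= 0x04;         // bit 2
        int bd = blockMaxSizeCode << 4;             // bits 6-4
        byte[] descriptor = { (byte) flg, (byte) bd };
        // Header checksum covers the descriptor only, not the magic number.
        int hc = (xxh32(descriptor, 0, descriptor.length, 0) >>> 8) & 0xFF;
        return new byte[] {
            (byte) MAGIC, (byte) (MAGIC >>> 8), (byte) (MAGIC >>> 16), (byte) (MAGIC >>> 24),
            (byte) flg, (byte) bd, (byte) hc
        };
    }
}
```

After the header, the format continues with data blocks, each prefixed by a 4-byte little-endian size whose highest bit marks an uncompressed block, and ends with a zero-valued end mark (followed by a content checksum if that flag is set). The block payloads themselves could still come from lz4-java's block compressors.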

      Joel Koshy added a comment -

      Actually I think there is a run-time dependency.

      • rm -f ./core/build/dependant-libs-2.8.0/lz4-1.2.0.jar
      • Start your Kafka broker and ZooKeeper
      • ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --compression-codec gzip
        [2014-06-04 10:41:27,722] ERROR Failed to send messages (kafka.producer.async.DefaultEventHandler)
        java.lang.NoClassDefFoundError: net/jpountz/lz4/LZ4BlockInputStream
                at kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$create(ByteBufferMessageSet.scala:41)
                at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102)
                at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:313)
                at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:301)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.Iterator$class.foreach(Iterator.scala:631)
                at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
                at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
                at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
                at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
                at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
                at scala.collection.mutable.HashMap.map(HashMap.scala:39)
                at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$groupMessagesToSet(DefaultEventHandler.scala:301)
                at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:104)
                at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.Iterator$class.foreach(Iterator.scala:631)
                at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
                at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
                at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
                at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
                at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
                at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
                at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
                at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
                at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
                at scala.collection.immutable.Stream.foreach(Stream.scala:254)
                at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
                at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
        Caused by: java.lang.ClassNotFoundException: net.jpountz.lz4.LZ4BlockInputStream
                at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
                at java.security.AccessController.doPrivileged(Native Method)
                at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
                at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
                ... 33 more
        

      However, this applies to snappy as well.

      Albert Strasheim added a comment -

      Hi all. Did Stephan's concerns get addressed? Pretty please (with sugar on top) don't leave the LZ4 compression format as it stands in github.com/jpountz/lz4-java right now.

      Joe Stein added a comment -

      I believe they are being addressed in another ticket, as this one already had the patch committed to trunk by the time he mentioned them (which is why I resolved it). James Oliver, are you going to be able to handle them? Did you already create another ticket for it? If not, please do (and link it to this one). It would be great to have this all settled for the 0.8.2 release, which I think is feasible.

      James Oliver added a comment -

      Sure, I'll open another ticket to address the outstanding issues this afternoon.

      Joe Stein, did the KAFKA-1471 patch get committed to trunk? If not, I can roll those changes into the next ticket as well.

      James Oliver added a comment -

      Created KAFKA-1493


        People

        • Assignee: Unassigned
        • Reporter: Joe Stein
        • Votes: 3
        • Watchers: 9
