Kafka / KAFKA-411

Message error in high-concurrency environment

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Cannot Reproduce
    • Affects Version/s: 0.7
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      In a high-concurrency environment, these errors keep appearing in the Kafka broker:

      ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
      kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
      at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
      at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at kafka.message.MessageSet.foreach(MessageSet.scala:87)
      at kafka.log.Log.append(Log.scala:205)
      at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
      at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
      at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
      at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
      at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
      at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
      at kafka.network.Processor.handle(SocketServer.scala:296)
      at kafka.network.Processor.read(SocketServer.scala:319)
      at kafka.network.Processor.run(SocketServer.scala:214)
      at java.lang.Thread.run(Thread.java:722)

      ERROR Closing socket for /192.168.75.15 because of error (kafka.network.Processor)
      kafka.common.InvalidTopicException: topic name can't be empty
      at kafka.log.LogManager.getLogPool(LogManager.scala:159)
      at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)

        Activity

        jian fan added a comment - edited

        In a high-concurrency environment, the TCP server drops some packets when the TCP buffer overflows. LogManager.createLog then creates logs for topics that do not exist. But one thing is very strange: the log directories should be named like a-0, a-1, a-2, and so on, yet File.mkdir() also creates a log directory named just a. This looks like a bug around the File.mkdir() call in LogManager.createLog.
        jian fan added a comment -

        The error is:

        [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
        [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
        java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at kafka.utils.Utils$.openChannel(Utils.scala:324)
        at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
        at kafka.log.Log.loadSegments(Log.scala:144)
        at kafka.log.Log.<init>(Log.scala:116)
        at kafka.log.LogManager.createLog(LogManager.scala:159)
        at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:679)
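The "(Is a directory)" failure above can be reproduced in isolation: once a directory exists where the broker expects a segment file, RandomAccessFile refuses to open it. A minimal sketch, with an illustrative path rather than a real Kafka data directory:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;

public class IsADirectoryDemo {
    public static void main(String[] args) throws Exception {
        // Create a directory at the path where a log segment *file* is
        // expected, mirroring /data/kafka/axx-0/00000000000000000000.kafka.
        File segment = new File(System.getProperty("java.io.tmpdir"),
                                "axx-0-demo/00000000000000000000.kafka");
        segment.mkdirs();  // mkdirs() happily creates it as a directory
        try (RandomAccessFile f = new RandomAccessFile(segment, "rw")) {
            throw new AssertionError("opening a directory should fail");
        } catch (FileNotFoundException e) {
            // On Linux the message is the path followed by "(Is a directory)"
            System.out.println(e.getMessage());
        }
    }
}
```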

        jian fan added a comment -

        In a high-concurrency environment, the TCP server drops some packets when the TCP buffer overflows. So there is some chance that "topic" contains one or more characters that decode from bytes including NUL (0).
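One way NUL bytes can surface in a decoded topic name can be sketched in isolation. This is a hedged illustration, assuming a size-prefixed wire format loosely like Kafka 0.7's and a zero-filled receive buffer; the topic name and lengths here are made up:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TruncatedTopicDemo {
    public static void main(String[] args) {
        // The producer writes a 2-byte length prefix followed by the topic
        // bytes. Simulate a receive buffer where the tail of the payload was
        // never delivered: ByteBuffer.allocate zero-fills, so the missing
        // bytes read back as 0x00.
        ByteBuffer buf = ByteBuffer.allocate(2 + 5);
        buf.putShort((short) 5);                         // declared length: 5
        buf.put("ax".getBytes(StandardCharsets.UTF_8));  // only 2 bytes arrived
        buf.rewind();

        short len = buf.getShort();
        byte[] raw = new byte[len];
        buf.get(raw);  // the 3 undelivered bytes come back as 0x00
        String topic = new String(raw, StandardCharsets.UTF_8);

        assert topic.length() == 5;
        assert topic.indexOf('\0') == 2;  // topic now embeds NUL characters
    }
}
```

A topic name like this then flows into LogManager as an ordinary string, which is consistent with the bogus log directories reported above.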

        Jun Rao added a comment -

        Thanks for the patch. It may not be the right fix though since it fixes the symptom, but not the cause. For each produce request, the broker does the following: (1) read all bytes of the request into a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the request are ready, deserialize the bytes into a ProducerRequest (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request by adding topic data to logs.

        What you observed is that in step 3, a topic name is corrupted somehow. However, this means that the corresponding ProducerRequest is corrupted. Assuming there is no corruption at the network layer (very unlikely), the corruption must have happened in step 1 or step 2. So, instead of patching a corrupted topic name, we should understand why a ProducerRequest can be corrupted and fix the cause. BTW, whatever caused the corrupted topic could be causing the corrupted messages too.

        jian fan added a comment -

        I have located the problem. It was caused by a Cisco router. Under high load, our Cisco router (2960S) drops some packets due to its limited capacity. So socket.recv() should be fine; we just need to handle the log directory being corrupted by a topic name with a null byte in this scenario.
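The guard this implies can be sketched as follows. Note that validateTopic is a hypothetical helper written for illustration, not Kafka's actual API: the idea is simply to reject empty topic names and names carrying NUL bytes before they reach LogManager and become directory paths.

```java
public class TopicNameGuard {
    // Hypothetical validation helper: reject topic names that would produce
    // a bogus log directory (empty names, or names carrying NUL bytes from
    // a corrupted request buffer).
    static void validateTopic(String topic) {
        if (topic == null || topic.isEmpty())
            throw new IllegalArgumentException("topic name can't be empty");
        if (topic.indexOf('\0') >= 0)
            throw new IllegalArgumentException("topic name contains a NUL byte");
    }

    public static void main(String[] args) {
        validateTopic("axx");  // a normal topic name passes
        boolean rejected = false;
        try {
            validateTopic("ax\u0000\u0000");  // corrupted name is rejected
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        assert rejected;
    }
}
```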

        Jun Rao added a comment -

        Not sure if this is still an issue in 0.8. Move it to 0.8.1 for now.

        Guozhang Wang added a comment -

        Hello jian fan, would you try current trunk HEAD to see if this is still a problem for you?

        Neha Narkhede added a comment -

        Not sure if this is a problem at all


          People

          • Assignee: Unassigned
          • Reporter: jian fan
          • Votes: 0
          • Watchers: 4