KAFKA-3240

Replication issues

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.2.2, 0.9.0.0, 0.9.0.1
    • Fix Version/s: None
    • Component/s: core
    • Environment: FreeBSD 10.2-RELEASE-p9

    Description

      Hi,

      We are trying to replace our 3-broker cluster running on 0.6 with a new cluster on 0.9.0.1 (we have also tried 0.8.2.2 and 0.9.0.0).

      • 3 kafka nodes with one zookeeper instance on each machine
      • FreeBSD 10.2 p9
      • TCP delayed ACK off (sysctl net.inet.tcp.delayed_ack=0)
      • all kafka machines write a ZFS ZIL to a dedicated SSD
      • 3 producers on 3 machines, writing to 1 topic with 3 partitions and replication factor 3
      • acks=all
      • 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case.

      While running ProducerPerformance or rdkafka_performance, we see very strange replication errors. Any hint about what is going on, or any suggestion on how to debug this properly, would be highly appreciated.

      This is what our broker config looks like:

      broker.id=5
      auto.create.topics.enable=false
      delete.topic.enable=true
      listeners=PLAINTEXT://:9092
      port=9092
      host.name=kafka-five.acc
      advertised.host.name=10.5.3.18
      zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
      zookeeper.connection.timeout.ms=6000
      num.replica.fetchers=1
      replica.fetch.max.bytes=100000000
      replica.fetch.wait.max.ms=500
      replica.high.watermark.checkpoint.interval.ms=5000
      replica.socket.timeout.ms=300000
      replica.socket.receive.buffer.bytes=65536
      replica.lag.time.max.ms=1000
      min.insync.replicas=2
      controller.socket.timeout.ms=30000
      controller.message.queue.size=100
      log.dirs=/var/db/kafka
      num.partitions=8
      message.max.bytes=100000000
      auto.create.topics.enable=false
      log.index.interval.bytes=4096
      log.index.size.max.bytes=10485760
      log.retention.hours=168
      log.flush.interval.ms=10000
      log.flush.interval.messages=20000
      log.flush.scheduler.interval.ms=2000
      log.roll.hours=168
      log.retention.check.interval.ms=300000
      log.segment.bytes=536870912
      zookeeper.connection.timeout.ms=1000000
      zookeeper.sync.time.ms=5000
      num.io.threads=8
      num.network.threads=4
      socket.request.max.bytes=104857600
      socket.receive.buffer.bytes=1048576
      socket.send.buffer.bytes=1048576
      queued.max.requests=100000
      fetch.purgatory.purge.interval.requests=100
      producer.purgatory.purge.interval.requests=100
      replica.lag.max.messages=10000000
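
A quick sanity check on a config like the one above is to look for keys that are set more than once. The sketch below is a minimal, simplified parser (it only handles plain `key=value` lines, not line continuations or `:` separators), and the function name `find_duplicate_keys` is made up for illustration. It assumes the broker loads the file through java.util.Properties, where a later assignment of the same key replaces an earlier one.

```python
from collections import defaultdict
from typing import Dict, List


def find_duplicate_keys(text: str) -> Dict[str, List[str]]:
    """Return {key: [values in file order]} for every key set more than once.

    Simplified on purpose: only plain `key=value` lines are understood.
    """
    seen: Dict[str, List[str]] = defaultdict(list)
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and the two comment styles used by .properties files.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            seen[key.strip()].append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


if __name__ == "__main__":
    import sys

    # Usage (path is an example): python check_props.py server.properties
    with open(sys.argv[1], encoding="utf-8") as handle:
        for key, values in find_duplicate_keys(handle.read()).items():
            print(f"{key}: set {len(values)} times, values {values}")
```

Run against the config listed above, this reports two keys: auto.create.topics.enable (false both times) and zookeeper.connection.timeout.ms (6000, then 1000000). If the last-one-wins assumption holds, the effective ZooKeeper connection timeout is 1000000 ms rather than 6000 ms.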
      

      These are the errors we're seeing:

      ERROR [Replica Manager on Broker 5]: Error processing fetch operation on partition [test,0] offset 50727 (kafka.server.ReplicaManager)
      java.lang.IllegalStateException: Invalid message size: 0
      	at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
      	at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
      	at kafka.log.LogSegment.read(LogSegment.scala:126)
      	at kafka.log.Log.read(Log.scala:506)
      	at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
      	at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
      	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
      	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
      	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
      	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
      	at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
      	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
      	at java.lang.Thread.run(Thread.java:745)
      
      

      and

      ERROR Found invalid messages during fetch for partition [test,0] offset 2732 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
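
Both errors complain about an entry whose size field is 0, so one way to narrow this down is to walk a segment file directly and find the first such entry. The sketch below assumes the 0.8/0.9 on-disk layout, where each log entry is an 8-byte offset, a 4-byte size, and then the message bytes, all big-endian. The constant `MIN_MESSAGE_SIZE = 14` (crc, magic, attributes, key length, value length) and the function names are assumptions for illustration, not taken from the Kafka source.

```python
import struct
from typing import BinaryIO, Iterator, Optional, Tuple

# Assumed entry header: 8-byte offset followed by 4-byte message size.
ENTRY_HEADER = struct.Struct(">qi")
# Assumed smallest valid message: crc(4) + magic(1) + attributes(1)
# + key length(4) + value length(4).
MIN_MESSAGE_SIZE = 14


def scan_segment(stream: BinaryIO) -> Iterator[Tuple[int, int, int]]:
    """Yield (file_position, offset, size) per entry, stopping after a bad size."""
    position = 0
    while True:
        header = stream.read(ENTRY_HEADER.size)
        if len(header) < ENTRY_HEADER.size:
            return  # end of file, or a truncated header
        offset, size = ENTRY_HEADER.unpack(header)
        yield position, offset, size
        if size < MIN_MESSAGE_SIZE:
            return  # the next entry boundary cannot be located reliably
        stream.seek(size, 1)  # skip the message body
        position += ENTRY_HEADER.size + size


def first_invalid_entry(stream: BinaryIO) -> Optional[Tuple[int, int, int]]:
    """Return (file_position, offset, size) of the first undersized entry, if any."""
    for position, offset, size in scan_segment(stream):
        if size < MIN_MESSAGE_SIZE:
            return position, offset, size
    return None


if __name__ == "__main__":
    import sys

    # Usage (path is an example): python scan_segment.py <segment>.log
    with open(sys.argv[1], "rb") as segment:
        print(first_invalid_entry(segment))
```

Pointing this at a segment under the test-0 directory in log.dirs on both the leader and a follower would show whether the zero-sized entry is actually on disk, and at which byte position. A run of zero bytes at that position would point at the file contents rather than at the fetch path. The sketch only checks the size field; it does not verify CRCs or look inside compressed message sets.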
      

          People

            Assignee: Unassigned
            Reporter: Jan Omar (jj83)
            Votes: 3
            Watchers: 9
