Kafka / KAFKA-8815

Kafka broker blocked on I/O primitive

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.1
    • Fix Version/s: None
    • Component/s: core, log
    • Labels: None

      Description

      This JIRA tracks a problem we ran into on a production cluster.

      Scenario

      A cluster of 15 brokers with an average ingress throughput of ~4 MB/s and egress of ~4 MB/s per broker.

      Brokers are running on OpenJDK 8. They are configured with a heap size of 1 GB.

      There are around 1,000 partition replicas per broker. Load is evenly balanced. Each broker instance is under fair CPU load, but not overloaded (50-60%). G1 is used for garbage collection and doesn't exhibit any pressure, with mostly short young GCs observed and a heap-after-GC usage of 70%.

      Replication factor is 3.

      Symptom

      One broker on the cluster suddenly became "unresponsive". Requests involving other brokers, ZooKeeper and producers/consumers were failing with timeouts. The Kafka process, however, was still alive and doing some background work (truncating logs and rolling segments). This lasted for hours. At some point, several thread dumps were taken a few minutes apart. Most of the threads were "blocked". Deadlock was ruled out. The most suspicious stack is the following:

      Thread 7801: (state = BLOCKED)
       - sun.nio.ch.FileChannelImpl.write(java.nio.ByteBuffer) @bci=25, line=202 (Compiled frame)
       - org.apache.kafka.common.record.MemoryRecords.writeFullyTo(java.nio.channels.GatheringByteChannel) @bci=24, line=93 (Compiled frame)
       - org.apache.kafka.common.record.FileRecords.append(org.apache.kafka.common.record.MemoryRecords) @bci=5, line=152 (Compiled frame)
       - kafka.log.LogSegment.append(long, long, long, long, org.apache.kafka.common.record.MemoryRecords) @bci=82, line=136 (Compiled frame)
       - kafka.log.Log.$anonfun$append$2(kafka.log.Log, org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int, java.lang.Object) @bci=1080, line=757 (Compiled frame)
       - kafka.log.Log$$Lambda$614.apply() @bci=24 (Compiled frame)
       - kafka.log.Log.maybeHandleIOException(scala.Function0, scala.Function0) @bci=1, line=1696 (Compiled frame)
       - kafka.log.Log.append(org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int) @bci=29, line=642 (Compiled frame)
       - kafka.log.Log.appendAsLeader(org.apache.kafka.common.record.MemoryRecords, int, boolean) @bci=5, line=612 (Compiled frame)
       - kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(kafka.cluster.Partition, org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=148, line=609 (Compiled frame)
       - kafka.cluster.Partition$$Lambda$837.apply() @bci=16 (Compiled frame)
       - kafka.utils.CoreUtils$.inLock(java.util.concurrent.locks.Lock, scala.Function0) @bci=7, line=250 (Compiled frame)
       - kafka.utils.CoreUtils$.inReadLock(java.util.concurrent.locks.ReadWriteLock, scala.Function0) @bci=8, line=256 (Compiled frame)
       - kafka.cluster.Partition.appendRecordsToLeader(org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=16, line=597 (Compiled frame)
       - kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(kafka.server.ReplicaManager, boolean, boolean, short, scala.Tuple2) @bci=295, line=739 (Compiled frame)
       - kafka.server.ReplicaManager$$Lambda$836.apply(java.lang.Object) @bci=20 (Compiled frame)
       - scala.collection.TraversableLike.$anonfun$map$1(scala.Function1, scala.collection.mutable.Builder, java.lang.Object) @bci=3, line=234 (Compiled frame)
       - scala.collection.TraversableLike$$Lambda$14.apply(java.lang.Object) @bci=9 (Compiled frame)
       - scala.collection.mutable.HashMap.$anonfun$foreach$1(scala.Function1, scala.collection.mutable.DefaultEntry) @bci=16, line=138 (Compiled frame)
       - scala.collection.mutable.HashMap$$Lambda$31.apply(java.lang.Object) @bci=8 (Compiled frame)
       - scala.collection.mutable.HashTable.foreachEntry(scala.Function1) @bci=39, line=236 (Compiled frame)
       - scala.collection.mutable.HashTable.foreachEntry$(scala.collection.mutable.HashTable, scala.Function1) @bci=2, line=229 (Compiled frame)
       - scala.collection.mutable.HashMap.foreachEntry(scala.Function1) @bci=2, line=40 (Compiled frame)
       - scala.collection.mutable.HashMap.foreach(scala.Function1) @bci=7, line=138 (Compiled frame)
       - scala.collection.TraversableLike.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=14, line=234 (Compiled frame)
       - scala.collection.TraversableLike.map$(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=227 (Compiled frame)
       - scala.collection.AbstractTraversable.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=104 (Compiled frame)
       - kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, scala.collection.Map, short) @bci=27, line=723 (Compiled frame)
       - kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, scala.collection.Map, scala.Function1, scala.Option, scala.Function1) @bci=27, line=465 (Compiled frame)
       - kafka.server.KafkaApis.handleProduceRequest(kafka.network.RequestChannel$Request) @bci=370, line=473 (Compiled frame)
       - kafka.server.KafkaApis.handle(kafka.network.RequestChannel$Request) @bci=30, line=104 (Compiled frame)
       - kafka.server.KafkaRequestHandler.run() @bci=140, line=69 (Compiled frame)
       - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
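
When several dumps are taken a few minutes apart, it can help to check mechanically which threads report the same state and the same top frame in every dump. The following is a minimal sketch, assuming the dumps use the same "Thread <id>: (state = ...)" layout as the stack above. The function names (parse_dump, state_counts, stuck_threads) are made up for this illustration and are not part of Kafka or any JDK tool. Note that an unchanged top frame is only a hint: an idle thread parked on a queue would look the same.

```python
import re
from collections import Counter

# Header line as printed in the dump above, e.g. "Thread 7801: (state = BLOCKED)".
HEADER = re.compile(r"^\s*Thread (\d+): \(state = (\w+)\)")
# Frame line, e.g. " - sun.nio.ch.FileChannelImpl.write(java.nio.ByteBuffer) @bci=25, ...".
# Only the fully qualified method name (text before the first parenthesis) is kept.
FRAME = re.compile(r"^\s*- (\S+?)\(")


def parse_dump(text):
    """Return {thread_id: (state, [method, ...])} for one thread dump."""
    threads = {}
    current = None
    for line in text.splitlines():
        header = HEADER.match(line)
        if header:
            current = int(header.group(1))
            threads[current] = (header.group(2), [])
            continue
        frame = FRAME.match(line)
        if frame and current is not None:
            threads[current][1].append(frame.group(1))
    return threads


def state_counts(threads):
    """Count threads per reported state (BLOCKED, IN_NATIVE, ...)."""
    return Counter(state for state, _frames in threads.values())


def stuck_threads(dumps):
    """Return {thread_id: (state, top_frame)} for threads that appear in
    every dump with an identical state and an identical top frame."""
    parsed = [parse_dump(dump) for dump in dumps]
    if not parsed:
        return {}
    common = set(parsed[0])
    for threads in parsed[1:]:
        common &= set(threads)
    stuck = {}
    for tid in sorted(common):
        snapshots = set()
        for threads in parsed:
            state, frames = threads[tid]
            snapshots.add((state, frames[0] if frames else None))
        if len(snapshots) == 1:
            stuck[tid] = snapshots.pop()
    return stuck
```

Calling stuck_threads with the text of each dump (read from wherever the dumps were saved) returns the candidate threads, and state_counts(parse_dump(text)) gives the per-state breakdown for a single dump.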
      

       The number of open sockets and file descriptors had drastically increased on the broker instance:

      sh-4.2$ sudo lsof -P -p 7393 | grep REG | wc -l
      9681
      sh-4.2$ sudo lsof -P -p 7374 | grep ESTABLISHED | wc -l
      173390
      

      Compared to a healthy broker:

      sh-4.2$ sudo lsof -P -p 7393 | grep REG | wc -l
      9681
      sh-4.2$ sudo lsof -P -p 7393 | grep ESTABLISHED | wc -l
      402
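
To see where the extra ESTABLISHED connections come from, the same lsof output can be grouped by remote peer instead of only being counted. This is a minimal sketch, assuming the usual lsof column layout in which a TCP line ends with "local:port->remote:port (STATE)". The function name established_by_peer is hypothetical and chosen for this illustration only.

```python
from collections import Counter


def established_by_peer(lsof_text):
    """Count ESTABLISHED TCP connections per remote host in `lsof -P` output."""
    peers = Counter()
    for line in lsof_text.splitlines():
        fields = line.split()
        # TCP lines end with the connection state in parentheses.
        if len(fields) < 2 or fields[-1] != "(ESTABLISHED)":
            continue
        # The NAME column looks like "local_host:port->remote_host:port".
        name = fields[-2]
        if "->" not in name:
            continue
        remote = name.split("->", 1)[1]
        # Strip the port; rsplit keeps bracketed IPv6 hosts such as "[::1]" intact.
        host = remote.rsplit(":", 1)[0]
        peers[host] += 1
    return peers
```

The sum of the returned counts matches what `grep ESTABLISHED | wc -l` reports, and peers.most_common(10) lists the hosts holding the most connections. Adding -n to the lsof invocation keeps hosts as numeric addresses, which makes the grouping more predictable.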
      

      There was no error/warning in the broker's logs.

      I wonder whether this type of behaviour has already been encountered, in particular a thread blocking indefinitely on an I/O write.

            People

            • Assignee: Unassigned
            • Reporter: adupriez Alexandre Dupriez
            • Votes: 0
            • Watchers: 2
