Kafka broker blocked on I/O primitive

      This JIRA is for tracking a problem we ran into on a production cluster.


      The cluster has 15 brokers, with an average ingress throughput of ~4 MB/s and an egress throughput of ~4 MB/s per broker.

      Brokers are running on OpenJDK 8. They are configured with a heap size of 1 GB.

      There are ~1,000 partition replicas per broker. Load is evenly balanced. Each broker instance is under moderate CPU load (50-60%) but is not overloaded. G1 is used for garbage collection and doesn't exhibit any pressure, with mostly short young GCs observed and a heap-after-GC usage of 70%.

      Replication factor is 3.


      One broker in the cluster suddenly became "unresponsive". Requests involving other brokers, ZooKeeper, and producers/consumers were failing with timeouts. The Kafka process, however, was still alive and doing some background work (truncating logs and rolling segments). This lasted for hours. At some point, several thread dumps were taken a few minutes apart. Most of the threads were "blocked". Deadlock was ruled out. The most suspicious stack is the following:

      Thread 7801: (state = BLOCKED)
       - sun.nio.ch.FileChannelImpl.write(java.nio.ByteBuffer) @bci=25, line=202 (Compiled frame)
       - org.apache.kafka.common.record.MemoryRecords.writeFullyTo(java.nio.channels.GatheringByteChannel) @bci=24, line=93 (Compiled frame)
       - org.apache.kafka.common.record.FileRecords.append(org.apache.kafka.common.record.MemoryRecords) @bci=5, line=152 (Compiled frame)
       - kafka.log.LogSegment.append(long, long, long, long, org.apache.kafka.common.record.MemoryRecords) @bci=82, line=136 (Compiled frame)
       - kafka.log.Log.$anonfun$append$2(kafka.log.Log, org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int, java.lang.Object) @bci=1080, line=757 (Compiled frame)
       - kafka.log.Log$$Lambda$614.apply() @bci=24 (Compiled frame)
       - kafka.log.Log.maybeHandleIOException(scala.Function0, scala.Function0) @bci=1, line=1696 (Compiled frame)
       - kafka.log.Log.append(org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int) @bci=29, line=642 (Compiled frame)
       - kafka.log.Log.appendAsLeader(org.apache.kafka.common.record.MemoryRecords, int, boolean) @bci=5, line=612 (Compiled frame)
       - kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(kafka.cluster.Partition, org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=148, line=609 (Compiled frame)
       - kafka.cluster.Partition$$Lambda$837.apply() @bci=16 (Compiled frame)
       - kafka.utils.CoreUtils$.inLock(java.util.concurrent.locks.Lock, scala.Function0) @bci=7, line=250 (Compiled frame)
       - kafka.utils.CoreUtils$.inReadLock(java.util.concurrent.locks.ReadWriteLock, scala.Function0) @bci=8, line=256 (Compiled frame)
       - kafka.cluster.Partition.appendRecordsToLeader(org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=16, line=597 (Compiled frame)
       - kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(kafka.server.ReplicaManager, boolean, boolean, short, scala.Tuple2) @bci=295, line=739 (Compiled frame)
       - kafka.server.ReplicaManager$$Lambda$836.apply(java.lang.Object) @bci=20 (Compiled frame)
       - scala.collection.TraversableLike.$anonfun$map$1(scala.Function1, scala.collection.mutable.Builder, java.lang.Object) @bci=3, line=234 (Compiled frame)
       - scala.collection.TraversableLike$$Lambda$14.apply(java.lang.Object) @bci=9 (Compiled frame)
       - scala.collection.mutable.HashMap.$anonfun$foreach$1(scala.Function1, scala.collection.mutable.DefaultEntry) @bci=16, line=138 (Compiled frame)
       - scala.collection.mutable.HashMap$$Lambda$31.apply(java.lang.Object) @bci=8 (Compiled frame)
       - scala.collection.mutable.HashTable.foreachEntry(scala.Function1) @bci=39, line=236 (Compiled frame)
       - scala.collection.mutable.HashTable.foreachEntry$(scala.collection.mutable.HashTable, scala.Function1) @bci=2, line=229 (Compiled frame)
       - scala.collection.mutable.HashMap.foreachEntry(scala.Function1) @bci=2, line=40 (Compiled frame)
       - scala.collection.mutable.HashMap.foreach(scala.Function1) @bci=7, line=138 (Compiled frame)
       - scala.collection.TraversableLike.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=14, line=234 (Compiled frame)
       - scala.collection.TraversableLike.map$(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=227 (Compiled frame)
       - scala.collection.AbstractTraversable.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=104 (Compiled frame)
       - kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, scala.collection.Map, short) @bci=27, line=723 (Compiled frame)
       - kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, scala.collection.Map, scala.Function1, scala.Option, scala.Function1) @bci=27, line=465 (Compiled frame)
       - kafka.server.KafkaApis.handleProduceRequest(kafka.network.RequestChannel$Request) @bci=370, line=473 (Compiled frame)
       - kafka.server.KafkaApis.handle(kafka.network.RequestChannel$Request) @bci=30, line=104 (Compiled frame)
       - kafka.server.KafkaRequestHandler.run() @bci=140, line=69 (Compiled frame)
       - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
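
      To compare successive dumps, the per-thread states can be tallied from the "(state = ...)" headers in the format shown above. The following is only a minimal sketch: the function name thread_states and the dump file name are hypothetical, and it assumes every thread header carries a state in exactly that form.

```shell
# Tally thread states from a dump read on stdin.
# Assumes headers of the form "Thread <id>: (state = <STATE>)".
# Prints one "STATE COUNT" line per state, sorted by state name.
thread_states() {
  sed -n 's/^.*(state = \([A-Z_]*\)).*$/\1/p' \
    | sort \
    | uniq -c \
    | awk '{ print $2, $1 }'
}

# Hypothetical usage, one dump file per capture:
#   thread_states < dump-1.txt
```

      Running it over each dump in turn would show whether the share of BLOCKED threads changes between captures.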

      The number of open sockets and file descriptors had increased drastically on the broker instance.

      sh-4.2$ sudo lsof -P -p 7393 | grep REG | wc -l
      sh-4.2$ sudo lsof -P -p 7374 | grep ESTABLISHED | wc -l

      Compared to a healthy broker:

      sh-4.2$ sudo lsof -P -p 7393 | grep REG | wc -l
      sh-4.2$ sudo lsof -P -p 7393 | grep ESTABLISHED | wc -l
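
      Both counts can also be collected in a single pass over the lsof output, which makes it easier to sample a broker repeatedly. This is a rough sketch rather than the exact procedure used above: the function name fd_summary is hypothetical, and it matches regular files on the TYPE column (assumed to be the fifth field of the default lsof output) instead of grepping the whole line.

```shell
# Summarize `lsof -P -p <pid>` output read on stdin.
# Counts regular files by the TYPE column (5th field) and established
# TCP connections by the "(ESTABLISHED)" state suffix.
fd_summary() {
  awk '
    $5 == "REG"       { reg++ }
    /\(ESTABLISHED\)/ { est++ }
    END { printf "REG=%d ESTABLISHED=%d\n", reg, est }
  '
}

# Hypothetical usage against a broker PID:
#   sudo lsof -P -p "$pid" | fd_summary
```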

      There was no error/warning in the broker's logs.

      Has this type of behaviour been encountered before, in particular a broker blocking indefinitely on an I/O write?




