KAFKA-7697

Possible deadlock in kafka.cluster.Partition

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.2.0, 2.1.1
    • Component/s: None
    • Labels: None

    Description

      After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up within a few minutes (by "locked up" I mean that all request handler threads were busy, and other brokers reported that they couldn't communicate with it). I restarted it a few times and it did the same thing each time. After downgrading to 0.10.2.0, the broker was stable. I attached a threaddump.txt from the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.

      It jumps out that there are two threads that already hold some read lock (can't tell which one) and are trying to acquire a second one (on two different read locks: 0x0000000708184b88 and 0x000000070821f188): kafka-request-handler-1 and kafka-request-handler-4. Both are handling a produce request and, in the process of doing so, are calling Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the same time, both of those locks have writers from other threads waiting on them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks appears to have a writer that holds it (if only because no thread in the dump is deep enough inside inWriteLock to indicate that).

      ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over readers. Is it possible that kafka-request-handler-1 and kafka-request-handler-4 are each trying to read-lock the partition that is currently locked by the other one, and they're both parked waiting for kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they never will, because the former two threads own read locks and aren't giving them up?
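
      The mechanism behind this hypothesis can be shown in isolation. Below is a minimal sketch (plain java.util.concurrent code, not Kafka's; the thread roles in the comments merely label the scenario above): once a writer is queued on a nonfair ReentrantReadWriteLock, a new reader from another thread parks behind it even though the lock is only read-held, so two threads that each hold one partition's read lock and need the other's can wait forever.

          import java.util.concurrent.{CountDownLatch, TimeUnit}
          import java.util.concurrent.locks.ReentrantReadWriteLock

          // Sketch: a queued writer blocks new readers on a nonfair ReentrantReadWriteLock.
          object QueuedWriterBlocksReaders extends App {
            val lock = new ReentrantReadWriteLock()      // nonfair by default, like leaderIsrUpdateLock
            val readerHoldsLock = new CountDownLatch(1)

            // Reader 1 (think: appendRecordsToLeader holding this partition's read lock)
            new Thread(() => {
              lock.readLock().lock()
              readerHoldsLock.countDown()
              Thread.sleep(5000)                         // keeps holding the read lock
              lock.readLock().unlock()
            }).start()
            readerHoldsLock.await()

            // Writer (think: an ISR update waiting for the write lock) queues up behind reader 1
            new Thread(() => lock.writeLock().lock()).start()
            Thread.sleep(200)                            // let the writer enqueue

            // Reader 2 (think: fetchOffsetSnapshot from another request handler) does not share
            // the read lock; it parks behind the queued writer and times out here:
            val acquired = lock.readLock().tryLock(500, TimeUnit.MILLISECONDS)
            println(s"second reader acquired read lock: $acquired")   // prints false
            sys.exit(0)                                  // don't wait for the stuck writer thread
          }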

      Attachments

        1. 2.1.1-hangs.log
          91 kB
          Muchl
        2. 322.tdump
          89 kB
          yanrui
        3. kafka_jstack.txt
          256 kB
          yanrui
        4. kafka.log
          108 kB
          Jackson Westeen
        5. threaddump.txt
          78 kB
          Gian Merlino

        Issue Links

          Activity

            ijuma Ismael Juma added a comment -

            Marking as blocker until we understand the details.

            rsivaram Rajini Sivaram added a comment -

            Changes made under KAFKA-7395 now protect fetch using the Partition's leaderIsrUpdateLock. This results in the read lock being acquired while completing a delayed fetch. This is unsafe since delayed operations can be completed while holding onto another Partition lock. For example the thread dump for request-handler-4 shows:

            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for <0x000000070821f188> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
              at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:832)
              at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:87)
              at kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79)
              at kafka.server.DelayedFetch$$Lambda$912/582152661.apply(Unknown Source)
              at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
              at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79)
              at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
              at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
              at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
              at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:307)
              at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:743)
              at kafka.cluster.Partition$$Lambda$917/80048373.apply(Unknown Source)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
              at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
              at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
              at kafka.server.ReplicaManager$$Lambda$915/220982367.apply(Unknown Source)
              at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
              at scala.collection.TraversableLike$$Lambda$12/1209669119.apply(Unknown Source)
              at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:145)
              at scala.collection.mutable.HashMap$$Lambda$24/477289012.apply(Unknown Source)
              at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:235)
              at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:228)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:145)
              at scala.collection.TraversableLike.map(TraversableLike.scala:233)
              at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
              at scala.collection.AbstractTraversable.map(Traversable.scala:104)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
              at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
              at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
              at java.lang.Thread.run(Thread.java:748)

            A large number of threads, including all request handler threads, appear to be deadlocked: the leaderIsrUpdateLock of two partitions is read-held by threads completing a delayed fetch, while waiting writers on those locks block any further readers.

            For purgatory operations that acquire a lock, we use that lock as the delayed operation lock, but that is not an option here since a fetch can span multiple partitions. So we need some other way to avoid blocking for one Partition lock while holding on to another Partition lock.
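
            One non-blocking shape for this, sketched below with illustrative names only (the actual changes are in the pull requests that follow), is to take the Partition read lock opportunistically and skip the completion attempt when it is contended:

                import java.util.concurrent.locks.ReentrantReadWriteLock

                // Sketch: read under the partition lock only if it can be taken without blocking;
                // otherwise give up and let the delayed operation be retried later.
                class PartitionLockSketch {
                  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

                  def maybeInReadLock[T](body: => T): Option[T] = {
                    if (leaderIsrUpdateLock.readLock().tryLock()) {
                      try Some(body)
                      finally leaderIsrUpdateLock.readLock().unlock()
                    } else
                      None  // contended: don't park the request handler while it holds another Partition lock
                  }
                }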

            githubbot ASF GitHub Bot added a comment -

            rajinisivaram opened a new pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch
            URL: https://github.com/apache/kafka/pull/5997

                1. Committer Checklist (excluded from commit message)
            • [ ] Verify design and implementation
            • [ ] Verify test coverage and CI build status
            • [ ] Verify documentation (including upgrade notes)


            githubbot ASF GitHub Bot added a comment -

            rajinisivaram closed pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch
            URL: https://github.com/apache/kafka/pull/5997

            This is a PR merged from a forked repository.
            As GitHub hides the original diff on merge, it is displayed below for
            the sake of provenance:

            As this is a foreign pull request (from a fork), the diff is supplied
            below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..a5655c77e2d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -836,6 +836,20 @@ class Partition(val topicPartition: TopicPartition,
     localReplica.offsetSnapshot
   }
 
+  def maybeFetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
+                               fetchOnlyFromLeader: Boolean): Option[LogOffsetSnapshot] = {
+    if (leaderIsrUpdateLock.readLock().tryLock()) {
+      try {
+        // decide whether to only fetch from leader
+        val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+        Some(localReplica.offsetSnapshot)
+      } finally {
+        leaderIsrUpdateLock.readLock().unlock()
+      }
+    } else
+      None
+  }
+
   def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
                                  fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
     inReadLock(leaderIsrUpdateLock) {
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 90200991759..d6504e64de9 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -84,34 +84,35 @@ class DelayedFetch(delayMs: Long,
         if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
           val partition = replicaManager.getPartitionOrException(topicPartition,
             expectLeader = fetchMetadata.fetchOnlyLeader)
-          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
-
-          val endOffset = fetchMetadata.fetchIsolation match {
-            case FetchLogEnd => offsetSnapshot.logEndOffset
-            case FetchHighWatermark => offsetSnapshot.highWatermark
-            case FetchTxnCommitted => offsetSnapshot.lastStableOffset
-          }
-
-          // Go directly to the check for Case D if the message offsets are the same. If the log segment
-          // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-          // which would incorrectly be seen as an instance of Case C.
-          if (endOffset.messageOffset != fetchOffset.messageOffset) {
-            if (endOffset.onOlderSegment(fetchOffset)) {
-              // Case C, this can happen when the new fetch operation is on a truncated leader
-              debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
-              return forceComplete()
-            } else if (fetchOffset.onOlderSegment(endOffset)) {
-              // Case C, this can happen when the fetch operation is falling behind the current segment
-              // or the partition has just rolled a new segment
-              debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
-              // We will not force complete the fetch request if a replica should be throttled.
-              if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
-                return forceComplete()
-            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
-              // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
-              val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-              if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
-                accumulatedSize += bytesAvailable
-            }
-          }
+          partition.maybeFetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader).foreach { offsetSnapshot =>
+            val endOffset = fetchMetadata.fetchIsolation match {
+              case FetchLogEnd => offsetSnapshot.logEndOffset
+              case FetchHighWatermark => offsetSnapshot.highWatermark
+              case FetchTxnCommitted => offsetSnapshot.lastStableOffset
+            }
+
+            // Go directly to the check for Case D if the message offsets are the same. If the log segment
+            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+            // which would incorrectly be seen as an instance of Case C.
+            if (endOffset.messageOffset != fetchOffset.messageOffset) {
+              if (endOffset.onOlderSegment(fetchOffset)) {
+                // Case C, this can happen when the new fetch operation is on a truncated leader
+                debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
+                return forceComplete()
+              } else if (fetchOffset.onOlderSegment(endOffset)) {
+                // Case C, this can happen when the fetch operation is falling behind the current segment
+                // or the partition has just rolled a new segment
+                debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
+                // We will not force complete the fetch request if a replica should be throttled.
+                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+                  return forceComplete()
+              } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+                // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
+                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
+                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+                  accumulatedSize += bytesAvailable
+              }
+            }
+          }
         }


            githubbot ASF GitHub Bot added a comment -

            rajinisivaram opened a new pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock
            URL: https://github.com/apache/kafka/pull/5999

            Delayed fetch operations acquire the leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. When attempting to complete a delayed fetch after appending new records, completion should be attempted only after releasing the leaderIsrUpdate lock of the Partition to which records were appended. Otherwise, waiting writers (e.g. checking whether the ISR needs to be shrunk) can cause deadlocks in request handler threads that try to acquire the lock of a different partition while holding on to the lock of another partition.
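
            The ordering described here can be condensed into a short sketch (illustrative names only, not the actual Kafka code; the real diff appears in the merge comment below): append under the partition's read lock, and defer delayed-fetch completion until after that lock is released.

                import java.util.concurrent.locks.ReentrantReadWriteLock

                // Sketch of the lock ordering: never attempt delayed-operation completion
                // (which may take other partitions' read locks) while holding this partition's lock.
                class AppendOrderingSketch(completeDelayedFetch: () => Unit) {
                  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

                  private def inReadLock[T](body: => T): T = {
                    leaderIsrUpdateLock.readLock().lock()
                    try body finally leaderIsrUpdateLock.readLock().unlock()
                  }

                  def appendRecordsToLeader(append: () => Long): Long = {
                    // 1. Append while holding only this partition's read lock.
                    val appendedOffset = inReadLock { append() }
                    // 2. Only now, with no Partition lock held, try to complete delayed fetches.
                    completeDelayedFetch()
                    appendedOffset
                  }
                }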

                1. Committer Checklist (excluded from commit message)
            • [ ] Verify design and implementation
            • [ ] Verify test coverage and CI build status
            • [ ] Verify documentation (including upgrade notes)


            githubbot ASF GitHub Bot added a comment -

            rajinisivaram closed pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock
            URL: https://github.com/apache/kafka/pull/5999

            This is a PR merged from a forked repository.
            As GitHub hides the original diff on merge, it is displayed below for
            the sake of provenance:

            As this is a foreign pull request (from a fork), the diff is supplied
            below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..1f52bd769cf 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -740,8 +740,6 @@ class Partition(val topicPartition: TopicPartition,
         }
 
         val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
-        // probably unblock some follower fetch requests since log end offset has been updated
-        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
         // we may need to increment high watermark since ISR could be down to 1
         (info, maybeIncrementLeaderHW(leaderReplica))
 
@@ -754,6 +752,10 @@ class Partition(val topicPartition: TopicPartition,
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
      tryCompleteDelayedRequests()
+    else {
+      // probably unblock some follower fetch requests since log end offset has been updated
+      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+    }
 
     info
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6e38ca9575b..cfaa147f407 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,14 +19,14 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.{Optional, Properties}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
-import org.easymock.EasyMock
+import org.easymock.{Capture, EasyMock, IAnswer}
 
 import scala.collection.JavaConverters._
 
@@ -671,7 +671,95 @@ class PartitionTest {
     partition.updateReplicaLogReadResult(follower1Replica,
       readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
-  }
+  }
+
+  /**
+   * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
+   * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
+   * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
+   * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
+   * read lock of one Partition while holding on to read lock of another Partition.
+   */
+  @Test
+  def testDelayedFetchAfterAppendRecords(): Unit = {
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val controllerId = 0
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
+    val isr = replicaIds
+    val logConfig = LogConfig(new Properties)
+
+    val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
+    val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
+    val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
+    val partitions = replicas.map { replica =>
+      val tp = replica.topicPartition
+      val partition = new Partition(tp,
+        isOffline = false,
+        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+        localBrokerId = brokerId,
+        time,
+        replicaManager,
+        logManager,
+        zkClient)
+      partition.addReplicaIfNotExists(replica)
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+        leaderEpoch, isr, 1, replicaIds, true), 0)
+      partition
+    }
+
+    // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+    val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
+          val partition = partitions(anotherPartition)
+          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+        }
+      }).anyTimes()
+    EasyMock.replay(replicaManager, zkClient)
+
+    def createRecords(baseOffset: Long): MemoryRecords = {
+      val records = List(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes))
+      val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+      val builder = MemoryRecords.builder(
+        buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
+        baseOffset, time.milliseconds, 0)
+      records.foreach(builder.append)
+      builder.build()
+    }
+
+    val done = new AtomicBoolean()
+    val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
+    try {
+      // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
+      executor.submit(CoreUtils.runnable {
+        while (!done.get) {
+          partitions.foreach(_.maybeShrinkIsr(10000))
+        }
+      })
+      // Append records to partitions, one partition-per-thread
+      val futures = partitions.map { partition =>
+        executor.submit(CoreUtils.runnable {
+          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+        })
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+      done.set(true)
+    } catch {
+      case e: TimeoutException =>
+        val allThreads = TestUtils.allThreadStackTraces()
+        fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
+    } finally {
+      executor.shutdownNow()
+      executor.awaitTermination(5, TimeUnit.SECONDS)
+    }
+  }
 
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bcb05816353..e5ea6a4baae 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -958,6 +958,12 @@ object TestUtils extends Logging {
     assertEquals(0, threadCount)
   }
 
+  def allThreadStackTraces(): String = {
+    Thread.getAllStackTraces.asScala.map { case (thread, stackTrace) =>
+      thread.getName + "\n\t" + stackTrace.toList.map(_.toString).mkString("\n\t")
+    }.mkString("\n")
+  }
+
   /**
    * Create new LogManager instance with default configuration for testing
    */

            candicewan Candice Wan added a comment -

            When is 2.1.1 expected to be released?

            jnadler Jeff Nadler added a comment -

            Hi all! We just had this bug hit a second cluster: a deadlocked node with fast-growing FD consumption. Any chance we can get a release quickly that contains this fix?


            mah82 Maurits Hartman added a comment -

            Same here. After upgrading from v2.0.0 to v2.1.0 we also hit this bug.

            rsivaram Rajini Sivaram added a comment -

            Apache Kafka 2.1.1 containing the fix is currently going through the release process and RC1 is available for testing and voting - see http://mail-archives.apache.org/mod_mbox/kafka-users/201901.mbox/%3C67fc2ed5-0cc3-4fc6-8e14-ba562f6e4c56@www.fastmail.com%3E

            amoratti Andres Moratti added a comment -

            Also hit this bug after upgrading from 2.0.0 to 2.1.0.

            shaharmor Shahar added a comment -

            Is there anything that can be done as a workaround for this issue in the meantime? Is there any configuration that can be changed?

            Will a restart of all Kafka brokers help?

            rsivaram Rajini Sivaram added a comment -

            shaharmor Apache Kafka 2.1.1 containing the fix has been released. So an upgrade is recommended. With 2.1.0, you will need to restart affected brokers whenever they run into the issue.

            jnadler Jeff Nadler added a comment -

            2.1.1 has other issues. I'd recommend proceeding with caution. We had to downgrade all of our clusters back to 2.0.1 to get them stable.

            ijuma Ismael Juma added a comment -

            What issues, jnadler?

            ankit.jhil Ankit Singhal added a comment -

            We also hit the same issue. We had to restart the broker roughly every 6 hours!

            jnadler what is the issue with 2.1.1? We are planning to move to this version.

            rsivaram Shall we move to 2.0.1, since 2.1.1 was just released and we might hit other issues? 2.0.1 seems pretty stable!

            little brother ma little brother ma added a comment - - edited

            We also hit the same issue with 2.1.1!

            The metric "kafka.network:type=RequestChannel,name=RequestQueueSize" is stuck at 1000, and we have configured queued.max.requests=1000.

            "kafka-network-thread-5-ListenerName(PLAINTEXT)-PLAINTEXT-4" #97 prio=5 os_prio=0 tid=0x00007fb7ce0ba800 nid=0x2d5 waiting on condition [0x00007fad6e5f8000]
            java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for <0x00000004530783a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
              at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
              at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:310)
              at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:709)
              at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
              at kafka.network.Processor$$Lambda$877/855310793.apply(Unknown Source)
              at scala.collection.Iterator.foreach(Iterator.scala:937)
              at scala.collection.Iterator.foreach$(Iterator.scala:937)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
              at scala.collection.IterableLike.foreach(IterableLike.scala:70)
              at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
              at kafka.network.Processor.run(SocketServer.scala:595)
              at java.lang.Thread.run(Thread.java:748)

            Locked ownable synchronizers:
                - None

            "kafka-request-handler-15" #87 daemon prio=5 os_prio=0 tid=0x00007fb7ceee6800 nid=0x2cb waiting on condition [0x00007fad71af4000]
            java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for <0x00000004540423f0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
              at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
              at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
              at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
              at kafka.server.ReplicaManager$$Lambda$1567/915411568.apply(Unknown Source)
              at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
              at scala.collection.TraversableLike$$Lambda$12/811760110.apply(Unknown Source)
              at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
              at scala.collection.TraversableLike.map(TraversableLike.scala:233)
              at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
              at scala.collection.AbstractTraversable.map(Traversable.scala:104)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
              at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
              at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:280)
              at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:423)
              at kafka.coordinator.group.GroupCoordinator.$anonfun$doCommitOffsets$1(GroupCoordinator.scala:518)
              at kafka.coordinator.group.GroupCoordinator$$Lambda$1816/513285617.apply$mcV$sp(Unknown Source)
              at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
              at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:197)
              at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:503)
              at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:482)
              at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:365)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
              at java.lang.Thread.run(Thread.java:748)

            Locked ownable synchronizers:
                - <0x0000000794ea4248> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

            The thread dumps of the broker are attached: kafka_jstack.txt

            boge Liu added a comment -

            little brother ma We also hit the same issue with 2.1.1. I checked the source code for version 2.1.1 and found that Partition.scala did not incorporate the changes.

            yanrui yanrui added a comment -

            rsivaram We also encountered the same problem, and the controller-event-thread looks deadlocked.
            Newly created topics have no ISR or leader, and the topicChangeListener cannot make progress.

            rsivaram Rajini Sivaram added a comment -

            little brother ma boge yanrui Can you provide full thread dumps of a broker that encountered this issue with 2.1.1? Thank you!


            jwesteen Jackson Westeen added a comment -

            Currently experiencing this on Kafka 2.1.0; I happen to have a thread dump handy if it's useful! We're noticing it occur seemingly at random, but partition reassignment and heavy produce bursts seem to exacerbate the issue. kill -9 has been the only solution for us so far, and tons of open file descriptors start to accumulate.

            kafka.log

            nsnmurthy N S N MURTHY added a comment -

            rsivaram  We also encountered the same issue with 2.1.0 in our production stack.

            Below is a sample stack trace. If we want to upgrade or downgrade Kafka in our setup, which version should we go to?

            kafka-request-handler-3 tid=53 [WAITING] [DAEMON]
            java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock() ReentrantReadWriteLock.java:727
            kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:249
            kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
            kafka.cluster.Partition.fetchOffsetSnapshot(Optional, boolean) Partition.scala:832
            kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch, IntRef, Object, Tuple2) DelayedFetch.scala:87
            kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch, IntRef, Object, Tuple2) DelayedFetch.scala:79
            kafka.server.DelayedFetch$$Lambda$969.apply(Object)
            scala.collection.mutable.ResizableArray.foreach(Function1) ResizableArray.scala:58
            scala.collection.mutable.ResizableArray.foreach$(ResizableArray, Function1) ResizableArray.scala:51
            scala.collection.mutable.ArrayBuffer.foreach(Function1) ArrayBuffer.scala:47
            kafka.server.DelayedFetch.tryComplete() DelayedFetch.scala:79
            kafka.server.DelayedOperation.maybeTryComplete() DelayedOperation.scala:121
            kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched() DelayedOperation.scala:371
            kafka.server.DelayedOperationPurgatory.checkAndComplete(Object) DelayedOperation.scala:277
            kafka.server.ReplicaManager.tryCompleteDelayedFetch(DelayedOperationKey) ReplicaManager.scala:307
            kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition, MemoryRecords, boolean, int) Partition.scala:743
            kafka.cluster.Partition$$Lambda$856.apply()
            kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
            kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
            kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:729
            kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager, boolean, boolean, short, Tuple2) ReplicaManager.scala:735
            kafka.server.ReplicaManager$$Lambda$844.apply(Object)
            scala.collection.TraversableLike.$anonfun$map$1(Function1, Builder, Object) TraversableLike.scala:233
            scala.collection.TraversableLike$$Lambda$10.apply(Object)
            scala.collection.mutable.HashMap.$anonfun$foreach$1(Function1, DefaultEntry) HashMap.scala:145
            scala.collection.mutable.HashMap$$Lambda$22.apply(Object)
            scala.collection.mutable.HashTable.foreachEntry(Function1) HashTable.scala:235
            scala.collection.mutable.HashTable.foreachEntry$(HashTable, Function1) HashTable.scala:228
            scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
            scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:145
            scala.collection.TraversableLike.map(Function1, CanBuildFrom) TraversableLike.scala:233
            scala.collection.TraversableLike.map$(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:226
            scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
            kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:723
            kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:470
            kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:482
            kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:106
            kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
            java.lang.Thread.run() Thread.java:748

            -Murthy
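
            The stack trace above shows the shape of the cycle: the handler thread takes one partition's leaderIsrUpdateLock read lock in appendRecordsToLeader and, while trying to complete a DelayedFetch, calls fetchOffsetSnapshot, which requests a second partition's read lock. The standalone Scala sketch below (not Kafka code; the thread names, latches, and sleep are only illustrative) reproduces that shape with two nonfair ReentrantReadWriteLocks and two queued writers, and hangs when run.

            import java.util.concurrent.CountDownLatch
            import java.util.concurrent.locks.ReentrantReadWriteLock

            object ReadLockCycleSketch extends App {
              val lockA = new ReentrantReadWriteLock() // nonfair by default, like leaderIsrUpdateLock
              val lockB = new ReentrantReadWriteLock()
              val readersHoldFirstLock = new CountDownLatch(2)
              val writersStarted       = new CountDownLatch(2)

              def reader(name: String, first: ReentrantReadWriteLock, second: ReentrantReadWriteLock) =
                new Thread(() => {
                  first.readLock().lock()          // like appendRecordsToLeader on partition "first"
                  readersHoldFirstLock.countDown()
                  writersStarted.await()
                  Thread.sleep(500)                // crude: give both writers time to park in the lock queues
                  second.readLock().lock()         // like fetchOffsetSnapshot on the other partition: parks behind the queued writer
                  println(s"$name acquired both read locks")  // never reached once the cycle forms
                }, name)

              def writer(name: String, lock: ReentrantReadWriteLock) =
                new Thread(() => {
                  readersHoldFirstLock.await()
                  writersStarted.countDown()
                  lock.writeLock().lock()          // like an ISR update: blocks because a reader still holds this lock's read lock
                  println(s"$name acquired the write lock")   // never reached
                }, name)

              Seq(
                reader("request-handler-1", lockA, lockB),
                reader("request-handler-4", lockB, lockA),
                writer("request-handler-2", lockA),
                writer("scheduler-6", lockB)
              ).foreach(_.start())
            }

            All four threads end up parked: each reader waits for a read lock that is queued behind a writer, and each writer waits for a write lock that cannot be granted while a reader still holds that lock's read lock, which is the same pattern described in the attached dumps.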

            rsivaram Rajini Sivaram added a comment -

            jwesteen nsnmurthy Thank you! The deadlock was fixed in 2.2.0 and 2.1.1. We are keen to see if there are other similar issues still remaining in these two releases, so that we can fix them before the next release. If experiencing this issue in 2.1.0, please upgrade to 2.1.1.

            yanrui yanrui added a comment -

            rsivaram
            I have attached a file called kafka_jstack.txt, which contains full thread dumps of a broker that encountered this issue with 2.1.1.

            yanrui yanrui added a comment -

            rsivaram This problem usually occurs when the connection between Kafka and ZooKeeper breaks; that situation seems to trigger a lot of write-lock operations, and the write lock is not released at the end.

            yanrui yanrui added a comment -

            The deadlock happened again a moment ago on a broker running Kafka 2.1.1; I have attached a file called 322.tdump.

            rsivaram Rajini Sivaram added a comment -

            yanrui Thanks for the thread dumps. It looks like kafka_jstack.txt was using an older release (not 2.1.1, perhaps 2.1.0 or earlier) since it doesn't seem to have the fix for this Jira. Can you confirm that?

            322.tdump shows the issue described in KAFKA-8151 for 2.1.1 due to ZK session expiry. As a workaround, if you are regularly running into this issue, you can increase the broker's ZK session timeout until KAFKA-8151 is fixed. It would be good to know whether you see the issue with 2.2.0 if you are able to recreate it. Thanks.
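
            For reference, the workaround above maps to the broker's ZooKeeper session timeout setting in server.properties. The value below is only an illustration (roughly three times the 6000 ms default in the 2.1.x line), not a recommendation from this issue:

            # server.properties (example value only)
            zookeeper.session.timeout.ms=18000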

            yanrui yanrui added a comment -

            rsivaram Thank you very much; let me increase this parameter and try it for a few days.

            muchl Muchl added a comment -

            rsivaram The deadlock was fixed after the upgrade to 2.1.1, but there is a new problem. I'm not sure whether the two issues are related, but the logs they print when the problem occurs are similar.
            A similar broker hang was encountered in 2.1.1. The problem caused a broker crash in 2.1.0, but in 2.1.1 the broker automatically recovered within a few minutes; the cluster was unavailable during this time.
            I uploaded a log whose file name is 2.1.1-hangs.log. By the time we noticed and logged in to the server, the cluster had already recovered, so full stack dumps have not yet been obtained, but the broker and consumer logs show that there is a problem. Could you give me some help? Thank you!

            rsivaram Rajini Sivaram added a comment -

            muchl This Jira addressed the deadlock which is fixed in 2.1.1. There is a separate Jira  to reduce lock contention (https://issues.apache.org/jira/browse/KAFKA-7538) which could be the issue you ran into in 2.1.1.

            muchl Muchl added a comment -

            rsivaram Thank you very much.
            KAFKA-7538 seems to be the problem I encountered; I will focus on KAFKA-7538.

            shaharmor Shahar added a comment -

            Is it possible to overcome this issue by simply increasing the number of threads?

            Maybe using num.io.threads or num.network.threads?

            Vinayzxzx VinayKumar added a comment -

            Does increasing num.network.threads help improve performance when Kafka is under high load, i.e. handling a larger number of producer/consumer requests?
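
            For reference, both settings mentioned above are broker settings in server.properties; the values below are illustrative only. Note that earlier in this thread the resolution offered for the deadlock itself was upgrading to 2.1.1 rather than tuning thread counts, since additional handler threads can simply end up parked on the same partition locks.

            # server.properties (example values only)
            num.network.threads=6
            num.io.threads=16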


            People

              rsivaram Rajini Sivaram
              gian Gian Merlino
              Jason Gustafson Jason Gustafson
              Votes: 0
              Watchers: 24
