Description
After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up within a few minutes (by "locked up" I mean that all request handler threads were busy, and other brokers reported that they couldn't communicate with it). I restarted it a few times and it did the same thing each time. After downgrading to 0.10.2.0, the broker was stable. I attached a threaddump.txt from the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
It jumps out that there are two threads that already have some read lock (can't tell which one) and are trying to acquire a second one (on two different read locks: 0x0000000708184b88 and 0x000000070821f188): kafka-request-handler-1 and kafka-request-handler-4. Both are handling a produce request, and in the process of doing so, are calling Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the same time, both of those locks have writers from other threads waiting on them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks appear to have writers that hold them (if only because no threads in the dump are deep enough in inWriteLock to indicate that).
ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over readers. Is it possible that kafka-request-handler-1 and kafka-request-handler-4 are each trying to read-lock the partition that is currently locked by the other one, and they're both parked waiting for kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they never will, because the former two threads own read locks and aren't giving them up?
Attachments
Attachments
- 2.1.1-hangs.log
- 91 kB
- Muchl
- 322.tdump
- 89 kB
- yanrui
- kafka_jstack.txt
- 256 kB
- yanrui
- kafka.log
- 108 kB
- Jackson Westeen
- threaddump.txt
- 78 kB
- Gian Merlino
Issue Links
- duplicates
-
KAFKA-7870 Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.
- Open
-
KAFKA-8537 Kafka issues after 2.1.0 upgrade: java.net.SocketTimeoutException: Failed to connect within 30000 ms
- Open
-
KAFKA-7802 Connection to Broker Disconnected Taking Down the Whole Cluster
- Open
-
KAFKA-7757 Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
- Open
-
KAFKA-7913 Kafka broker halts and messes up the whole cluster
- Open
-
KAFKA-7876 Broker suddenly got disconnected
- Resolved
- links to
Activity
Changes made under KAFKA-7395 now protect fetch using the Partition's leaderIsrUpdateLock. This results in the read lock being acquired while completing a delayed fetch. This is unsafe since delayed operations can be completed while holding onto another Partition lock. For example the thread dump for request-handler-4 shows:
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000070821f188> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:832)
at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:87)
at kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79)
at kafka.server.DelayedFetch$$Lambda$912/582152661.apply(Unknown Source)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79)
at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:307)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:743)
at kafka.cluster.Partition$$Lambda$917/80048373.apply(Unknown Source)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
at kafka.server.ReplicaManager$$Lambda$915/220982367.apply(Unknown Source)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.TraversableLike$$Lambda$12/1209669119.apply(Unknown Source)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:145)
at scala.collection.mutable.HashMap$$Lambda$24/477289012.apply(Unknown Source)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:235)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:228)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:145)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
A whole bunch of threads including all request handler threads seem to be deadlocked as a result of leaderIsrUpdateLock of two partitions that are blocked while completing delayed fetch as a result of waiting writers.
For purgatory operations that acquire a lock, we use that lock as the delayed operation lock, but that is not an option here since fetch could contain multiple partitions. So we need some other way to avoid blocking for a Partition lock while holding onto another Partition lock.
rajinisivaram opened a new pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch
URL: https://github.com/apache/kafka/pull/5997
-
-
- Committer Checklist (excluded from commit message)
-
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
rajinisivaram closed pull request #5997: KAFKA-7697: Avoid blocking for leaderIsrUpdateLock in DelayedFetch
URL: https://github.com/apache/kafka/pull/5997
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..a5655c77e2d 100755
— a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -836,6 +836,20 @@ class Partition(val topicPartition: TopicPartition,
localReplica.offsetSnapshot
}
+ def maybeFetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
+ fetchOnlyFromLeader: Boolean): Option[LogOffsetSnapshot] = {
+ if (leaderIsrUpdateLock.readLock().tryLock()) {
+ try
finally
{ + leaderIsrUpdateLock.readLock().unlock() + }+ } else
+ None
+ }
+
def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
inReadLock(leaderIsrUpdateLock) {
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 90200991759..d6504e64de9 100644
— a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -84,34 +84,35 @@ class DelayedFetch(delayMs: Long,
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
val partition = replicaManager.getPartitionOrException(topicPartition,
expectLeader = fetchMetadata.fetchOnlyLeader)
- val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
+ partition.maybeFetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader).foreach { offsetSnapshot =>
- val endOffset = fetchMetadata.fetchIsolation match
{
- case FetchLogEnd => offsetSnapshot.logEndOffset
- case FetchHighWatermark => offsetSnapshot.highWatermark
- case FetchTxnCommitted => offsetSnapshot.lastStableOffset
- }
+ val endOffset = fetchMetadata.fetchIsolation match
{ + case FetchLogEnd => offsetSnapshot.logEndOffset + case FetchHighWatermark => offsetSnapshot.highWatermark + case FetchTxnCommitted => offsetSnapshot.lastStableOffset + }
- // Go directly to the check for Case D if the message offsets are the same. If the log segment
- // has just rolled, then the high watermark offset will remain the same but be on the old segment,
- // which would incorrectly be seen as an instance of Case C.
- if (endOffset.messageOffset != fetchOffset.messageOffset) {
- if (endOffset.onOlderSegment(fetchOffset))
{
- // Case C, this can happen when the new fetch operation is on a truncated leader
- debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
- return forceComplete()
- }
else if (fetchOffset.onOlderSegment(endOffset)) {
- // Case C, this can happen when the fetch operation is falling behind the current segment
- // or the partition has just rolled a new segment
- debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
- // We will not force complete the fetch request if a replica should be throttled.
- if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+ // Go directly to the check for Case D if the message offsets are the same. If the log segment
+ // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+ // which would incorrectly be seen as an instance of Case C.
+ if (endOffset.messageOffset != fetchOffset.messageOffset)Unknown macro: {+ if (endOffset.onOlderSegment(fetchOffset)) { + // Case C, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.") return forceComplete() - } else if (fetchOffset.messageOffset < endOffset.messageOffset) { - // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) - val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) - if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) - accumulatedSize += bytesAvailable + } else if (fetchOffset.onOlderSegment(endOffset)) { + // Case C, this can happen when the fetch operation is falling behind the current segment + // or the partition has just rolled a new segment + debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.") + // We will not force complete the fetch request if a replica should be throttled. + if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) + return forceComplete() + } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) + val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) + if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) + accumulatedSize += bytesAvailable + } }}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
rajinisivaram opened a new pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock
URL: https://github.com/apache/kafka/pull/5999
Delayed fetch operations acquire leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. When attempting to complete delayed fetch after appending new records, completion should be attempted only after releasing the leaderIsrUpdate of the Partition to which records were appended. Otherwise, waiting writers (e.g. to check if ISR needs to be shrinked) can cause deadlocks in request handler threads when trying to acquire lock of a different partition while holding on to lock of one partition.
-
-
- Committer Checklist (excluded from commit message)
-
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
rajinisivaram closed pull request #5999: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock
URL: https://github.com/apache/kafka/pull/5999
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..1f52bd769cf 100755
— a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -740,8 +740,6 @@ class Partition(val topicPartition: TopicPartition,
}
val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
- // probably unblock some follower fetch requests since log end offset has been updated
- replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderReplica))
@@ -754,6 +752,10 @@ class Partition(val topicPartition: TopicPartition,
// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
+ else
info
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6e38ca9575b..cfaa147f407 100644
— a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,14 +19,14 @@ package kafka.cluster
import java.io.File
import java.nio.ByteBuffer
import java.util.
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.Request
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.
import kafka.server._
-import kafka.utils.
+import kafka.utils.
{CoreUtils, MockScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ReplicaNotAvailableException
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li
import org.junit.
import org.junit.Assert._
import org.scalatest.Assertions.assertThrows
-import org.easymock.EasyMock
+import org.easymock.
import scala.collection.JavaConverters._
@@ -671,7 +671,95 @@ class PartitionTest
{ partition.updateReplicaLogReadResult(follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica)) assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId)) - }+ }
+
+ /**
+ * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
+ * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
+ * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
+ * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
+ * read lock of one Partition while holding on to read lock of another Partition.
+ */
+ @Test
+ def testDelayedFetchAfterAppendRecords(): Unit = {
+ val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+ val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
+ val controllerId = 0
+ val controllerEpoch = 0
+ val leaderEpoch = 5
+ val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
+ val isr = replicaIds
+ val logConfig = LogConfig(new Properties)
+
+ val topicPartitions = (0 until 5).map
+ val logs = topicPartitions.map
{ tp => logManager.getOrCreateLog(tp, logConfig) }+ val replicas = logs.map
{ log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }+ val partitions = replicas.map
{ replica => + val tp = replica.topicPartition + val partition = new Partition(tp, + isOffline = false, + replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + localBrokerId = brokerId, + time, + replicaManager, + logManager, + zkClient) + partition.addReplicaIfNotExists(replica) + partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, + leaderEpoch, isr, 1, replicaIds, true), 0) + partition + }+
+ // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+ val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
+ EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
+ .andAnswer(new IAnswer[Unit] {
+ override def answer(): Unit =
+ }).anyTimes()
+ EasyMock.replay(replicaManager, zkClient)
+
+ def createRecords(baseOffset: Long): MemoryRecords =
+
+ val done = new AtomicBoolean()
+ val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
+ try {
+ // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
+ executor.submit(CoreUtils.runnable {
+ while (!done.get)
+ })
+ // Append records to partitions, one partition-per-thread
+ val futures = partitions.map { partition =>
+ executor.submit(CoreUtils.runnable {
+ (1 to 10000).foreach
+ })
+ }
+ futures.foreach(_.get(10, TimeUnit.SECONDS))
+ done.set(true)
+ } catch
finally
{ + executor.shutdownNow() + executor.awaitTermination(5, TimeUnit.SECONDS) + }+ }
def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bcb05816353..e5ea6a4baae 100755
— a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -958,6 +958,12 @@ object TestUtils extends Logging
+ def allThreadStackTraces(): String = {
+ Thread.getAllStackTraces.asScala.map
.mkString("\n")
+ }
+
/**
- Create new LogManager instance with default configuration for testing
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Hi all! We just had this bug hit a second cluster - deadlocked node with fast growing FD consumption. Any chance we can get a release quickly that contains this fix?
Same here. After upgrading from v2.0.0 to v2.1.0 we also hit this bug.
Apache Kafka 2.1.1 containing the fix is currently going through the release process and RC1 is available for testing and voting - see http://mail-archives.apache.org/mod_mbox/kafka-users/201901.mbox/%3C67fc2ed5-0cc3-4fc6-8e14-ba562f6e4c56@www.fastmail.com%3E
Is there anything that can be done as a workaround for this issue in the mean time? Any configuration that can be changed?
WIll a restart of all Kafka brokers help?
shaharmor Apache Kafka 2.1.1 containing the fix has been released. So an upgrade is recommended. With 2.1.0, you will need to restart affected brokers whenever they run into the issue.
2.1.1 has other issues. I'd recommend proceeding with caution. Had to downgrade all of our clusters back to 2.0.1 to get stable.
We also hit the same issue. Had to restart the broker after almost every 6 hours!
jnadler what is the issue with 2.1.1 ? We are planning to move to this version..
rsivaram Shall we move to 2.0.1 since 2.1.1 is just released and we might hit other issues? 2.0.1 seems pretty stable!
we also hit the same issue with 2.1.1 !
Get the metric "kafka.network:type=RequestChannel,name=RequestQueueSize" value is always 1000, and we config queued.max.requests=1000
kafka-network-thread-5-ListenerName(PLAINTEXT)-PLAINTEXT-4" #97 prio=5 os_prio=0 tid=0x00007fb7ce0ba800 nid=0x2d5 waiting on condition [0x00007fad6e5f8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000004530783a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:310)
at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:709)
at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
at kafka.network.Processor$$Lambda$877/855310793.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
at kafka.network.Processor.run(SocketServer.scala:595)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"kafka-request-handler-15" #87 daemon prio=5 os_prio=0 tid=0x00007fb7ceee6800 nid=0x2cb waiting on condition [0x00007fad71af4000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000004540423f0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
at kafka.server.ReplicaManager$$Lambda$1567/915411568.apply(Unknown Source)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.TraversableLike$$Lambda$12/811760110.apply(Unknown Source)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:280)
at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:423)
at kafka.coordinator.group.GroupCoordinator.$anonfun$doCommitOffsets$1(GroupCoordinator.scala:518)
at kafka.coordinator.group.GroupCoordinator$$Lambda$1816/513285617.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:197)
at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:503)
at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:482)
at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:365)
at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x0000000794ea4248> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
The thread dumps of a broker: kafka_jstack.txt
little brother ma wo also hit the same issue with 2.1.1 .I checked the source code for version 2.1.1 and found that partition. Scala did not incorporate the changes
rsivaram We also encountered the same problem,and controller-event-thread looks like deadlock.
Newly created topic which isr and leader is none,the topicChangeListener can not work
little brother ma boge yanrui Can you provide full thread dumps of a broker that encountered this issue with 2.1.1? Thank you!
Currently experiencing this on Kafka 2.1.0, happen to have a thread dump handy if it's useful! We're noticing it occur seemingly randomly, however partition reassignment and heavy produce bursts seem to exacerbate this issue. kill -9 has been the only solution for us thus far, tons of open file descriptors start to accumulate.
rsivaram We also encountered the same issue with 2.1.0 in our production stack.
Below is the sample stack trace. If we want to up grade/down grade Kafka in our setup, which version we can go.
kafka-request-handler-3 tid=53 [WAITING] [DAEMON]
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock() ReentrantReadWriteLock.java:727
kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:249
kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
kafka.cluster.Partition.fetchOffsetSnapshot(Optional, boolean) Partition.scala:832
kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch, IntRef, Object, Tuple2) DelayedFetch.scala:87
kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch, IntRef, Object, Tuple2) DelayedFetch.scala:79
kafka.server.DelayedFetch$$Lambda$969.apply(Object)
scala.collection.mutable.ResizableArray.foreach(Function1) ResizableArray.scala:58
scala.collection.mutable.ResizableArray.foreach$(ResizableArray, Function1) ResizableArray.scala:51
scala.collection.mutable.ArrayBuffer.foreach(Function1) ArrayBuffer.scala:47
kafka.server.DelayedFetch.tryComplete() DelayedFetch.scala:79
kafka.server.DelayedOperation.maybeTryComplete() DelayedOperation.scala:121
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched() DelayedOperation.scala:371
kafka.server.DelayedOperationPurgatory.checkAndComplete(Object) DelayedOperation.scala:277
kafka.server.ReplicaManager.tryCompleteDelayedFetch(DelayedOperationKey) ReplicaManager.scala:307
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition, MemoryRecords, boolean, int) Partition.scala:743
kafka.cluster.Partition$$Lambda$856.apply()
kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:729
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager, boolean, boolean, short, Tuple2) ReplicaManager.scala:735
kafka.server.ReplicaManager$$Lambda$844.apply(Object)
scala.collection.TraversableLike.$anonfun$map$1(Function1, Builder, Object) TraversableLike.scala:233
scala.collection.TraversableLike$$Lambda$10.apply(Object)
scala.collection.mutable.HashMap.$anonfun$foreach$1(Function1, DefaultEntry) HashMap.scala:145
scala.collection.mutable.HashMap$$Lambda$22.apply(Object)
scala.collection.mutable.HashTable.foreachEntry(Function1) HashTable.scala:235
scala.collection.mutable.HashTable.foreachEntry$(HashTable, Function1) HashTable.scala:228
scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:145
scala.collection.TraversableLike.map(Function1, CanBuildFrom) TraversableLike.scala:233
scala.collection.TraversableLike.map$(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:226
scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:723
kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:470
kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:482
kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:106
kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
java.lang.Thread.run() Thread.java:748
-Murthy
jwesteen nsnmurthy Thank you! The deadlock was fixed in 2.2.0 and 2.1.1. We are keen to see if there are other similar issues still remaining in these two releases, so that we can fix them before the next release. If experiencing this issue in 2.1.0, please upgrade to 2.1.1.
rsivaram
I have put file called kafka_jstack.txt which contain full thread dumps of a broker that encountered this issue with 2.1.1 in the attachment
rsivaramThis problem usually occurs with kafka and zk broken chains,it seems to be this situation triggers a lot of write lock operations and the write lock is not released at the end.
use 2.1.1 kafka which deadlock happen a moment ago,I have put file called 322.tdump in the attachment
yanrui Thanks for the thread dumps. It looks like kafka_jstack.txt was using an older release (not 2.1.1, perhaps 2.1.0 or earlier) since it doesn't seem to have the fix for this Jira. Can you confirm that?
322.dump shows the issue described in KAFKA-8151 for 2.1.1 due to ZK session expiry. As a workaround, you can increase ZK session timeouts for the broker until KAFKA-8151 is fixed if you are regularly running into this issue. It will be good to know if you see the issue with 2.2.0 if you are able to recreate. Thanks.
rsivaramThank you very much, let me increase this parameters and try it for a few days
rsivaram The problem was fixed after the upgrade 2.1.1, but there was a new problem.I'm not sure if the two questions are related, but the logs they print when the problem occurs are similar.
A similar broker hangs was encountered in 2.1.1 . the problem cause broker crash in 2.1.0, but will automatically recovered in a few minutes in 2.1.1, and the cluster was unavailable during this time.
I uploaded a log whose file name is 2.1.1-hangs.log 2.1.1-hangs.log . When we find and log in to the server, the cluster was restored. All the stack information has not yet been obtained, but we can see that there is a problem from the logs of the broker and consumer. Could you give me some help,Thank you !
muchl This Jira addressed the deadlock which is fixed in 2.1.1. There is a separate Jira to reduce lock contention (https://issues.apache.org/jira/browse/KAFKA-7538) which could be the issue you ran into in 2.1.1.
rsivaram Thank you very much.
KAFKA-7538 This seems to be the problem I encountered.I will focus on the KAFKA-7538.
Is it possible to overcome this issue by simply increasing the number of threads?
Maybe using num.io.threads or num.network.threads?
Does increasing num.network.threads help improve the performance when kafka has high load in terms of more no. of producer/consumer requests?
Marking as blocker until we understand the details.