Description
kafka.server.FetchManager.newContext takes out what is essentially a global fetch lock on kafka.server.FetchSessionCache. That lock covers updates not only to the FetchSessionCache itself but also to the fetch sessions stored within it. This causes heavy lock contention for fetches, because every fetch request must go through this lock.
I have taken an async-profiler lock profile on a high-throughput cluster and see around 25 s of waiting on this lock during a sixty-second profile:
```
--- 25818577497 ns (20.84%), 5805 samples
  [ 0] kafka.server.FetchSessionCache
  [ 1] kafka.server.FetchManager.newContext
  [ 2] kafka.server.KafkaApis.handleFetchRequest
  [ 3] kafka.server.KafkaApis.handle
  [ 4] kafka.server.KafkaRequestHandler.run
  [ 5] java.lang.Thread.run
```
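A rough reading of these numbers follows. It rests on three assumptions about the output, not statements from async-profiler's documentation: the nanosecond figure is wait time summed across all request handler threads, 20.84% is this stack's share of all lock waiting recorded in the profile, and each sample is weighted by its recorded wait.

```latex
\begin{aligned}
t_{\text{wait}} &= 25\,818\,577\,497\ \text{ns} \approx 25.8\ \text{s} \\
\frac{t_{\text{wait}}}{t_{\text{profile}}} &\approx \frac{25.8\ \text{s}}{60\ \text{s}} \approx 0.43 \\
t_{\text{all locks}} &\approx \frac{25.8\ \text{s}}{0.2084} \approx 124\ \text{s} \\
\bar{t}_{\text{sample}} &\approx \frac{25\,818\,577\,497\ \text{ns}}{5805} \approx 4.45\ \text{ms}
\end{aligned}
```

Under those assumptions, the handler threads together spent the equivalent of about 43% of one thread's 60 seconds blocked on this single monitor, and it accounts for roughly a fifth of all lock waiting in the profile.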
The relevant excerpt from FetchManager.newContext in FetchSession.scala:
```scala
cache.synchronized { // cache-wide lock: every fetch request serializes here
  cache.get(reqMetadata.sessionId) match {
    case None => {
      debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
      new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
    }
    case Some(session) => session.synchronized { // per-session lock, taken while still holding the cache lock
      if (session.epoch != reqMetadata.epoch) {
        debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
          s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
        new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
      } else {
        val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
        if (session.isEmpty) {
          debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
            s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
            s"there are no more partitions left.")
          cache.remove(session) // mutates the cache under the cache lock
          new SessionlessFetchContext(fetchData)
        } else {
          cache.touch(session, time.milliseconds()) // records last use, also under the cache lock
          session.epoch = JFetchMetadata.nextEpoch(session.epoch)
          debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
            s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
            s"updated ${partitionsToLogString(updated)}, " +
            s"removed ${partitionsToLogString(removed)}")
          new IncrementalFetchContext(time, reqMetadata, session)
        }
      }
    }
  }
}
```
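To make the serialization concrete, here is a minimal Scala model of the nested locking in the excerpt. `ToySession`, `ToyCache`, `GlobalLockDemo` and the 1 ms sleep standing in for `session.update` plus `cache.touch` are all hypothetical; this sketches the locking pattern, not Kafka's code. Every thread fetches only on its own session. Because the per-session lock is taken inside the cache-wide lock, at most one thread is ever inside the critical section.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for a fetch session; not Kafka's FetchSession.
final class ToySession(val id: Int) {
  var epoch: Int = 0
}

// Hypothetical stand-in for FetchSessionCache that mirrors the locking shape
// of FetchManager.newContext: cache-wide lock first, then the session lock.
final class ToyCache {
  private val sessions = mutable.HashMap.empty[Int, ToySession]
  private val inside = new AtomicInteger(0)    // threads currently in the critical section
  private val maxInside = new AtomicInteger(0) // highest concurrency ever observed

  def add(s: ToySession): Unit = synchronized { sessions(s.id) = s }

  def newContext(sessionId: Int): Option[Int] = synchronized { // like cache.synchronized
    sessions.get(sessionId).map { session =>
      session.synchronized { // nested per-session lock
        val now = inside.incrementAndGet()
        maxInside.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        Thread.sleep(1) // stands in for session.update + cache.touch
        session.epoch += 1
        inside.decrementAndGet()
        session.epoch
      }
    }
  }

  def highWaterMark: Int = maxInside.get
  def totalEpochs: Int = synchronized { sessions.values.map(_.epoch).sum }
}

object GlobalLockDemo {
  // Runs `threads` workers, each issuing `requests` fetches on its own session.
  def run(threads: Int, requests: Int): (Int, Int) = {
    val cache = new ToyCache
    (0 until threads).foreach(i => cache.add(new ToySession(i)))
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(threads)
    (0 until threads).foreach { i =>
      pool.execute(() => {
        try (1 to requests).foreach(_ => cache.newContext(i))
        finally done.countDown()
      })
    }
    done.await()
    pool.shutdown()
    (cache.highWaterMark, cache.totalEpochs)
  }

  def main(args: Array[String]): Unit = {
    val (maxConcurrent, epochs) = run(threads = 8, requests = 20)
    println(s"max threads inside newContext at once: $maxConcurrent, total epochs: $epochs")
  }
}
```

Even with 8 threads on 8 distinct sessions, the high-water mark stays at 1. Dropping the outer `synchronized` in this toy would let different sessions overlap. The shared `HashMap` would then need some other protection, as would the real cache's `remove` and `touch` bookkeeping. The sketch therefore only shows where the serialization comes from, not how to remove it.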
Contention has been made worse by the fix for "KAFKA-9137: Fix incorrect FetchSessionCache eviction logic" (https://github.com/apache/kafka/pull/7640). The cache is now correctly touched, inside the cache lock, whereas previously the touch was skipped.
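An extra `touch` adds cost because, in an LRU-style cache, touching a session usually means moving it within an ordering structure. In `newContext`, that work happens while the cache-wide lock is held. The sketch below assumes a `java.util.TreeMap` ordered by last-used time. `ToyLastUsedKey`, `ToyLruIndex` and `LruDemo` are hypothetical names, and this is not a copy of `FetchSessionCache` internals.

```scala
import java.util.TreeMap
import scala.collection.mutable

// Hypothetical sort key: least recently used first, session id breaks ties.
final case class ToyLastUsedKey(lastUsedMs: Long, id: Int)

object ToyLastUsedKey {
  val ordering: Ordering[ToyLastUsedKey] =
    Ordering.by((k: ToyLastUsedKey) => (k.lastUsedMs, k.id))
}

// Hypothetical LRU index; every operation runs under this object's monitor,
// mirroring how touch runs inside cache.synchronized in newContext.
final class ToyLruIndex {
  private val lastUsed = new TreeMap[ToyLastUsedKey, Int](ToyLastUsedKey.ordering)
  private val keyById = mutable.HashMap.empty[Int, ToyLastUsedKey]

  def add(id: Int, nowMs: Long): Unit = synchronized { insert(id, nowMs) }

  // touch = remove the old key, then insert a new one: two O(log n) TreeMap
  // operations, both performed while the lock is held.
  def touch(id: Int, nowMs: Long): Unit = synchronized {
    keyById.get(id).foreach { old =>
      lastUsed.remove(old)
      insert(id, nowMs)
    }
  }

  // The session that would be evicted first, if any.
  def leastRecentlyUsed: Option[Int] = synchronized {
    Option(lastUsed.firstEntry()).map(_.getValue)
  }

  private def insert(id: Int, nowMs: Long): Unit = {
    val key = ToyLastUsedKey(nowMs, id)
    lastUsed.put(key, id)
    keyById(id) = key
  }
}

object LruDemo {
  def main(args: Array[String]): Unit = {
    val idx = new ToyLruIndex
    idx.add(1, nowMs = 100L)
    idx.add(2, nowMs = 200L)
    println(idx.leastRecentlyUsed) // Some(1)
    idx.touch(1, nowMs = 300L)
    println(idx.leastRecentlyUsed) // Some(2)
  }
}
```

In this toy, skipping `touch` would leave session 1 as the eviction candidate despite recent use, which is the kind of eviction bug a missing touch causes. Performing it correctly adds sorted-map work to every call that reaches it, inside the same critical section.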