KAFKA-9401

High lock contention for kafka.server.FetchManager.newContext


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.8.0
    • Component/s: core
    • Labels: None

    Description

      kafka.server.FetchManager.newContext takes out what is essentially a global fetch lock on kafka.server.FetchSessionCache. The lock is held not only while updating the FetchSessionCache itself but also while updating the fetch sessions stored within it. This causes heavy lock contention, because every fetch request must go through this lock.

      I took an async-profiler lock profile on a high-throughput cluster and saw about 25 seconds of waiting on this lock during a 60-second profile.

      --- 25818577497 ns (20.84%), 5805 samples
       [ 0] kafka.server.FetchSessionCache
       [ 1] kafka.server.FetchManager.newContext
       [ 2] kafka.server.KafkaApis.handleFetchRequest
       [ 3] kafka.server.KafkaApis.handle
       [ 4] kafka.server.KafkaRequestHandler.run
       [ 5] java.lang.Thread.run
       

      FetchSession.scala:

        cache.synchronized {
          cache.get(reqMetadata.sessionId) match {
            case None => {
              debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
              new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
            }
            case Some(session) => session.synchronized {
              if (session.epoch != reqMetadata.epoch) {
                debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
                  s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
                new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
              } else {
                val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
                if (session.isEmpty) {
                  debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
                    s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
                    s"there are no more partitions left.")
                  cache.remove(session)
                  new SessionlessFetchContext(fetchData)
                } else {
                  cache.touch(session, time.milliseconds())
                  session.epoch = JFetchMetadata.nextEpoch(session.epoch)
                  debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
                    s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
                    s"updated ${partitionsToLogString(updated)}, " +
                    s"removed ${partitionsToLogString(removed)}")
                  new IncrementalFetchContext(time, reqMetadata, session)
                }
              }
            }
          }
        }
      
      

      Contention was made worse by the fix for "KAFKA-9137: Fix incorrect FetchSessionCache eviction logic" (https://github.com/apache/kafka/pull/7640), because the cache is now touched correctly, whereas previously the touch was being skipped.
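
      One general way to relieve this kind of contention is to shard the cache so that each session ID maps to one of several independently locked maps, and requests for different sessions rarely wait on the same monitor. The following is only a minimal sketch of that idea, not the change that resolved this issue. Session, SessionShard, ShardedSessionCache, and bumpEpoch are hypothetical names invented for illustration and are not part of Kafka's API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a fetch session; NOT Kafka's FetchSession class.
final class Session(val id: Int) {
  var epoch: Int = 1
}

// One shard: a plain map guarded by its own monitor (hypothetical class).
final class SessionShard {
  private val sessions = mutable.Map.empty[Int, Session]

  def put(session: Session): Unit = synchronized {
    sessions.put(session.id, session)
  }

  // Looks up the session and bumps its epoch while holding only this
  // shard's lock. Returns None for an unknown session or a stale epoch.
  def bumpEpoch(sessionId: Int, expectedEpoch: Int): Option[Int] = synchronized {
    sessions.get(sessionId).filter(_.epoch == expectedEpoch).map { s =>
      s.epoch += 1
      s.epoch
    }
  }
}

// Routes each session ID to one of numShards independently locked shards
// (hypothetical class; the shard count is an assumption of this sketch).
final class ShardedSessionCache(numShards: Int) {
  require(numShards > 0, "numShards must be positive")

  private val shards = Array.fill(numShards)(new SessionShard)

  // floorMod keeps the index non-negative even for negative session IDs.
  private def shardFor(sessionId: Int): SessionShard =
    shards(java.lang.Math.floorMod(sessionId, numShards))

  def put(session: Session): Unit =
    shardFor(session.id).put(session)

  def bumpEpoch(sessionId: Int, expectedEpoch: Int): Option[Int] =
    shardFor(sessionId).bumpEpoch(sessionId, expectedEpoch)
}
```

      With a cache built as new ShardedSessionCache(8), two requests contend only when their session IDs fall into the same shard. The sketch deliberately leaves out eviction and last-used tracking (the cache.touch and cache.remove calls in the excerpt above), which are harder to keep consistent once state is split across shards.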

       


          People

            Assignee: Gaurav Narula (gnarula)
            Reporter: Lucas Bradstreet (lucasbradstreet)