Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8736

Performance: ThreadCache uses size() for empty cache check

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 2.3.0
    • 2.4.0, 2.3.1
    • streams
    • None

    Description

      While load testing Kafka Streams in 2.3.0, we stumbled across a potential performance improvement. The test showed we were spending 80% of CPU time in ConcurrentSkipListMap.size():

       

      100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
      100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
      96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
      96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
      96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
      96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
      96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87
      96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
      96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
      96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
      96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
      96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43
      96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
      96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
      96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
      96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
      82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169
      82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612
      82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127
      81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35
      81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131
      81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151
      80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
      80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
      80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639

      According to https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--, the size method has to traverse all elements to get a count. It looks like the count is being compared against 0 to determine if the map is empty; In this case, we don't need a full count. Instead, the isEmpty() method should be used, which just looks for one node. We patched this and gained about 25% max throughput, and this method disappeared from thread dumps as a hot spot.

      Update:

      The root cause is an internal change from `TreeMap` to `ConcurrentSkipListMap`. In `TreeMap` using `size()` does not harm performance, because is has constant time runtime. Hence, it is a regression that only affects 2.3.0 release.

      Attachments

        1. size.patch
          2 kB
          Matthew Jarvie

        Activity

          People

            MJarvie Matthew Jarvie
            MJarvie Matthew Jarvie
            Votes:
            1 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: