Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6870

Concurrency conflicts in SampledStat

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.1.1, 2.0.0
    • Component/s: None
    • Labels:
      None

      Description

      The samples stored in SampledStat is not thread-safe. However, ReplicaFetcherThreads used to handle replica to specified brokers may update (when the samples is empty, we will add a new sample to it) and iterate the samples concurrently, and then cause the ConcurrentModificationException.

      [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
      java.util.ConcurrentModificationException
              at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
              at java.util.ArrayList$Itr.next(ArrayList.java:859)
              at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
              at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
              at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
              at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
              at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
              at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
              at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
              at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
              at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
              at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
              at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
              at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
              at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
              at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      

      Before https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35 the ConcurrentModificationException doesn't exist since all changes to samples is "add" currently. Using the get(index) is able to avoid the ConcurrentModificationException.

      In short, we can just make samples thread-safe. Or just replace the foreach loop by get(index) if we have concerns about the performance of thread-safe list...

       

       

       

       

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                chia7712 Chia-Ping Tsai
                Reporter:
                chia7712 Chia-Ping Tsai
                Reviewer:
                Rajini Sivaram
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: