  1. Kafka
  2. KAFKA-4950

ConcurrentModificationException when iterating over Kafka Metrics


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.1
    • Fix Version/s: 1.0.3, 1.1.2, 2.0.1, 2.1.0
    • Component/s: consumer
    • Labels: None

    Description

      It looks like, when PartitionStates.partitionSet() is called, the internal partition state can change while the resulting HashSet is being built, which leads to a ConcurrentModificationException during the copy operation.

      java.util.ConcurrentModificationException
              at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
              at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
              at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
              at java.util.HashSet.<init>(HashSet.java:119)
              at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
              at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
              at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
              at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
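
The trace above fails inside the HashSet copy constructor while it walks a LinkedHashMap key iterator. A minimal sketch of that fail-fast behaviour follows. It is not Kafka code: the class name, map contents, and the single-threaded mutation are assumptions chosen so the failure is deterministic, whereas in the report the mutation presumably comes from another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical demo class; it only mimics the copy-while-mutating pattern.
class CmeSketch {

    // Copies the key set element by element (as AbstractCollection.addAll does)
    // and structurally modifies the backing map part-way through the copy.
    static boolean copyWhileMutating() {
        Map<String, Integer> states = new LinkedHashMap<>();
        states.put("topic-0", 0);
        states.put("topic-1", 1);

        Set<String> copy = new HashSet<>();
        try {
            for (String partition : states.keySet()) {
                copy.add(partition);
                // Adding a new key changes the map's modification count,
                // so the next call to the iterator's next() fails fast.
                states.put("topic-2", 2);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME raised: " + copyWhileMutating());
    }
}
```

The same check fires when the modification happens on a different thread, although in that case detection is best-effort rather than guaranteed.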
      
      // client code:
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;
      
      import com.codahale.metrics.Gauge;
      import com.codahale.metrics.Metric;
      import com.codahale.metrics.MetricSet;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.MetricName;
      
      import static com.codahale.metrics.MetricRegistry.name;
      
      public class KafkaMetricSet implements MetricSet {
      
          private final KafkaConsumer client;
      
          public KafkaMetricSet(KafkaConsumer client) {
              this.client = client;
          }
      
          @Override
          public Map<String, Metric> getMetrics() {
              final Map<String, Metric> gauges = new HashMap<String, Metric>();
              Map<MetricName, org.apache.kafka.common.Metric> m = client.metrics();
              for (Map.Entry<MetricName, org.apache.kafka.common.Metric> e : m.entrySet()) {
                  gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
                      @Override
                      public Double getValue() {
                          return e.getValue().value(); // exception thrown here 
                      }
                  });
              }
              return Collections.unmodifiableMap(gauges);
          }
      }
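
On affected client versions, one possible client-side workaround is to have the gauge tolerate the exception and report the last value it read successfully. The sketch below is an assumption, not the fix that was applied in Kafka: LastKnownValueGauge is a hypothetical helper, and a DoubleSupplier stands in for the e.getValue().value() call so the example compiles without Kafka or Dropwizard Metrics on the classpath.

```java
import java.util.ConcurrentModificationException;
import java.util.function.DoubleSupplier;

// Hypothetical helper; not part of Kafka or Dropwizard Metrics.
class LastKnownValueGauge {

    private final DoubleSupplier source;
    // NaN until the first successful read.
    private volatile double lastValue = Double.NaN;

    LastKnownValueGauge(DoubleSupplier source) {
        this.source = source;
    }

    // Returns a fresh reading when possible, otherwise the previous one.
    double getValue() {
        try {
            lastValue = source.getAsDouble();
        } catch (ConcurrentModificationException e) {
            // The underlying metric was being modified; keep the previous reading.
        }
        return lastValue;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        LastKnownValueGauge gauge = new LastKnownValueGauge(() -> {
            // First call succeeds, later calls simulate the reported failure.
            if (calls[0]++ > 0) {
                throw new ConcurrentModificationException();
            }
            return 3.0;
        });
        System.out.println(gauge.getValue());
        System.out.println(gauge.getValue());
    }
}
```

In the reporter's getMetrics() method, the anonymous Gauge<Double> could delegate to such a wrapper. The trade-off is that a reading may be stale for one reporting interval.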
      

          People

            Assignee: Sébastien Launay (yabon)
            Reporter: Dumitru Postoronca (dpostoronca)
            Votes: 5
            Watchers: 14
