Kafka / KAFKA-4950

ConcurrentModificationException when iterating over Kafka Metrics

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.1
    • Fix Version/s: 1.0.3, 1.1.2, 2.0.1, 2.1.0
    • Component/s: consumer
    • Labels:
      None

      Description

      It looks like when PartitionStates.partitionSet() is called, the internal state of the allocations can change while the resulting HashSet is being built, which leads to a ConcurrentModificationException during the copy operation.

      java.util.ConcurrentModificationException
              at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
              at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
              at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
              at java.util.HashSet.<init>(HashSet.java:119)
              at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
              at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
              at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
              at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
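
The failure mode in the trace above can be illustrated in isolation. The following is a minimal sketch, not Kafka's code: `CopyDuringMutationDemo`, its `state` map, and the `snapshot()`/`assign()` methods are hypothetical names. It simulates, on a single thread, a structural change landing while the keys of a `LinkedHashMap` are being copied, and shows one possible guard (copying under the same lock that writers hold).

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class CopyDuringMutationDemo {

    // Hypothetical stand-in for a LinkedHashMap that holds per-partition state.
    private final Map<String, Integer> state = new LinkedHashMap<>();

    CopyDuringMutationDemo() {
        state.put("topic-0", 0);
        state.put("topic-1", 1);
    }

    // Simulates a structural change arriving in the middle of a key copy.
    // Returns true if the fail-fast iterator detected the change.
    boolean copyWhileMutating() {
        Set<String> copy = new HashSet<>();
        Iterator<String> it = state.keySet().iterator();
        copy.add(it.next());          // the copy has started
        state.put("topic-2", 2);      // stands in for another thread adding a key
        try {
            copy.add(it.next());      // the iterator notices the modification here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // One possible guard: readers copy under the same lock that writers take.
    synchronized Set<String> snapshot() {
        return new HashSet<>(state.keySet());
    }

    synchronized void assign(String partition, int value) {
        state.put(partition, value);
    }
}
```

In a real consumer the interleaving depends on thread timing, so the exception would appear only intermittently; the sketch forces it so that the mechanism is visible.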
      
      // client code:
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;
      
      import com.codahale.metrics.Gauge;
      import com.codahale.metrics.Metric;
      import com.codahale.metrics.MetricSet;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.MetricName;
      
      import static com.codahale.metrics.MetricRegistry.name;
      
      public class KafkaMetricSet implements MetricSet {
      
          private final KafkaConsumer<?, ?> client;
      
          public KafkaMetricSet(KafkaConsumer<?, ?> client) {
              this.client = client;
          }
      
          @Override
          public Map<String, Metric> getMetrics() {
              final Map<String, Metric> gauges = new HashMap<String, Metric>();
              // Wrap every Kafka metric in a Codahale gauge that reads its value lazily.
              Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
              for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
                  gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
                      @Override
                      public Double getValue() {
                          // Evaluated whenever the registry reads the gauge, not when it is registered.
                          return e.getValue().value(); // exception thrown here
                      }
                  });
              }
              return Collections.unmodifiableMap(gauges);
          }
      }
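
Until a fixed client version is in use, a caller could shield its own gauges from the exception. The following is a hedged sketch of one such client-side workaround, not part of Kafka or the Metrics library: `LastGoodValueGauge` is a hypothetical helper that catches `ConcurrentModificationException` and falls back to the last value it read successfully (or `NaN` if there has been none).

```java
import java.util.ConcurrentModificationException;
import java.util.function.DoubleSupplier;

final class LastGoodValueGauge {

    // The underlying read, e.g. a lambda that calls a Kafka metric's value().
    private final DoubleSupplier source;

    // Last value read without an exception; NaN until the first successful read.
    private volatile double lastGood = Double.NaN;

    LastGoodValueGauge(DoubleSupplier source) {
        this.source = source;
    }

    double getValue() {
        try {
            double v = source.getAsDouble();
            lastGood = v;
            return v;
        } catch (ConcurrentModificationException e) {
            // The read raced with a change to the underlying state; reuse the previous value.
            return lastGood;
        }
    }
}
```

In the `KafkaMetricSet` above, the body of `getValue()` could delegate to such a helper constructed with `() -> e.getValue().value()`. This only hides the symptom: a reading may be stale by one reporting interval, and the underlying race remains.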
      

            People

            • Assignee:
              yabon Sébastien Launay
              Reporter:
              dpostoronca Dumitru Postoronca
            • Votes:
              5
              Watchers:
              15
