Details
Description
It looks like the when calling PartitionStates.partitionSet(), while the resulting Hashmap is being built, the internal state of the allocations can change, which leads to ConcurrentModificationException during the copy operation.
java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) at java.util.AbstractCollection.addAll(AbstractCollection.java:343) at java.util.HashSet.<init>(HashSet.java:119) at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
// client code: import java.util.Collections; import java.util.HashMap; import java.util.Map; import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.MetricName; import static com.codahale.metrics.MetricRegistry.name; public class KafkaMetricSet implements MetricSet { private final KafkaConsumer client; public KafkaMetricSet(KafkaConsumer client) { this.client = client; } @Override public Map<String, Metric> getMetrics() { final Map<String, Metric> gauges = new HashMap<String, Metric>(); Map<MetricName, org.apache.kafka.common.Metric> m = client.metrics(); for (Map.Entry<MetricName, org.apache.kafka.common.Metric> e : m.entrySet()) { gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() { @Override public Double getValue() { return e.getValue().value(); // exception thrown here } }); } return Collections.unmodifiableMap(gauges); } }