From 303d8dde0e829769741228bb5a592e31875e9ba7 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 17:52:04 -0700 Subject: [PATCH] KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer --- .../clients/consumer/internals/SubscriptionState.java | 3 ++- .../test/scala/integration/kafka/api/ConsumerTest.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4d9a425..89b5fb2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -83,7 +83,8 @@ public class SubscriptionState { throw new IllegalStateException("Topic " + topic + " was never subscribed to."); this.subscribedTopics.remove(topic); this.needsPartitionAssignment = true; - for (TopicPartition tp: assignedPartitions()) + final Set existingAssignedPartitions = new HashSet<>(assignedPartitions()); + for (TopicPartition tp: existingAssignedPartitions) if (topic.equals(tp.topic())) clearPartition(tp); } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..ebffe64 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -217,6 +217,22 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.close() } + def testUnsubscribingFromTopic() { + val callback = new TestConsumerReassignmentCallback() + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test + val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumer0.subscribe(topic) + + // the initial subscription should cause a callback execution + while(callback.callsToAssigned == 0) + consumer0.poll(50) + + consumer0.unsubscribe(topic) + assertEquals(0, consumer0.subscriptions.size()) + + consumer0.close() + } + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { var callsToAssigned = 0 var callsToRevoked = 0 -- 2.3.2 (Apple Git-55)