From 36df92333e460146da8290e09094d940e456b25c Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 17:52:04 -0700 Subject: [PATCH 1/2] KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer --- .../clients/consumer/internals/SubscriptionState.java | 3 ++- .../test/scala/integration/kafka/api/ConsumerTest.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4d9a425..89b5fb2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -83,7 +83,8 @@ public class SubscriptionState { throw new IllegalStateException("Topic " + topic + " was never subscribed to."); this.subscribedTopics.remove(topic); this.needsPartitionAssignment = true; - for (TopicPartition tp: assignedPartitions()) + final Set existingAssignedPartitions = new HashSet<>(assignedPartitions()); + for (TopicPartition tp: existingAssignedPartitions) if (topic.equals(tp.topic())) clearPartition(tp); } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..f20e453 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -217,6 +217,22 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.close() } + def testUnsubscribeTopic() { + val callback = new TestConsumerReassignmentCallback() + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test + val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumer0.subscribe(topic) + + // the initial subscription should cause a callback execution + while(callback.callsToAssigned == 0) + consumer0.poll(50) + + consumer0.unsubscribe(topic) + assertEquals(0, consumer0.subscriptions.size()) + + consumer0.close() + } + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { var callsToAssigned = 0 var callsToRevoked = 0 -- 2.3.2 (Apple Git-55) From 1ac6061d736854738306c9dc436148c0c498b8a0 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 21:55:53 -0700 Subject: [PATCH 2/2] Add a more specific unit test --- .../consumer/internals/SubscriptionStateTest.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 319751c..c47f3fb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -73,6 +73,27 @@ public class SubscriptionStateTest { assertAllPositions(tp0, null); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); } + + @Test + public void topicUnsubscription() { + final String topic = "test"; + state.subscribe(topic); + assertEquals(1, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + assertTrue(state.partitionsAutoAssigned()); + state.changePartitionAssignment(asList(tp0)); + state.committed(tp0, 1); + state.fetched(tp0, 1); + state.consumed(tp0, 1); + assertAllPositions(tp0, 1L); + state.changePartitionAssignment(asList(tp1)); + assertAllPositions(tp0, null); + assertEquals(Collections.singleton(tp1), state.assignedPartitions()); + + state.unsubscribe(topic); + assertEquals(0, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + } @Test(expected = IllegalArgumentException.class) public void cantChangeFetchPositionForNonAssignedPartition() { -- 2.3.2 (Apple Git-55)