From 36df92333e460146da8290e09094d940e456b25c Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 17:52:04 -0700 Subject: [PATCH 1/4] KAFKA-2381: Possible ConcurrentModificationException while unsubscribing from a topic in new consumer --- .../clients/consumer/internals/SubscriptionState.java | 3 ++- .../test/scala/integration/kafka/api/ConsumerTest.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4d9a425..89b5fb2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -83,7 +83,8 @@ public class SubscriptionState { throw new IllegalStateException("Topic " + topic + " was never subscribed to."); this.subscribedTopics.remove(topic); this.needsPartitionAssignment = true; - for (TopicPartition tp: assignedPartitions()) + final Set existingAssignedPartitions = new HashSet<>(assignedPartitions()); + for (TopicPartition tp: existingAssignedPartitions) if (topic.equals(tp.topic())) clearPartition(tp); } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..f20e453 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -217,6 +217,22 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.close() } + def testUnsubscribeTopic() { + val callback = new TestConsumerReassignmentCallback() + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test + val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumer0.subscribe(topic) + + // the initial subscription should cause a callback execution + while(callback.callsToAssigned == 0) + consumer0.poll(50) + + consumer0.unsubscribe(topic) + assertEquals(0, consumer0.subscriptions.size()) + + consumer0.close() + } + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { var callsToAssigned = 0 var callsToRevoked = 0 -- 2.3.2 (Apple Git-55) From 1ac6061d736854738306c9dc436148c0c498b8a0 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 21:55:53 -0700 Subject: [PATCH 2/4] Add a more specific unit test --- .../consumer/internals/SubscriptionStateTest.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 319751c..c47f3fb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -73,6 +73,27 @@ public class SubscriptionStateTest { assertAllPositions(tp0, null); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); } + + @Test + public void topicUnsubscription() { + final String topic = "test"; + state.subscribe(topic); + assertEquals(1, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + assertTrue(state.partitionsAutoAssigned()); + state.changePartitionAssignment(asList(tp0)); + state.committed(tp0, 1); + state.fetched(tp0, 1); + state.consumed(tp0, 1); + assertAllPositions(tp0, 1L); + state.changePartitionAssignment(asList(tp1)); + assertAllPositions(tp0, null); + assertEquals(Collections.singleton(tp1), state.assignedPartitions()); + + state.unsubscribe(topic); + assertEquals(0, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + } @Test(expected = IllegalArgumentException.class) public void cantChangeFetchPositionForNonAssignedPartition() { -- 2.3.2 (Apple Git-55) From 98c07409f0ed3b5cf0f60785e420b411ded98f65 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Jul 2015 21:59:38 -0700 Subject: [PATCH 3/4] Move closing of consumer to finally --- .../test/scala/integration/kafka/api/ConsumerTest.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index f20e453..cca6e94 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -221,16 +221,19 @@ class ConsumerTest extends IntegrationTestHarness with Logging { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - consumer0.subscribe(topic) - // the initial subscription should cause a callback execution - while(callback.callsToAssigned == 0) - consumer0.poll(50) + try { + consumer0.subscribe(topic) - consumer0.unsubscribe(topic) - assertEquals(0, consumer0.subscriptions.size()) + // the initial subscription should cause a callback execution + while (callback.callsToAssigned == 0) + consumer0.poll(50) - consumer0.close() + consumer0.unsubscribe(topic) + assertEquals(0, consumer0.subscriptions.size()) + } finally { + consumer0.close() + } } private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { -- 2.3.2 (Apple Git-55) From 63f6fbe8efda56965655a7fbc2629fed294b0cb4 Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 28 Jul 2015 09:17:28 -0700 Subject: [PATCH 4/4] Address review comment --- .../org/apache/kafka/clients/consumer/internals/SubscriptionState.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 89b5fb2..8a2cb12 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,7 +84,7 @@ public class SubscriptionState { throw new IllegalStateException("Topic " + topic + " was never subscribed to."); this.subscribedTopics.remove(topic); this.needsPartitionAssignment = true; - final Set existingAssignedPartitions = new HashSet<>(assignedPartitions()); + final List existingAssignedPartitions = new ArrayList<>(assignedPartitions()); for (TopicPartition tp: existingAssignedPartitions) if (topic.equals(tp.topic())) clearPartition(tp); -- 2.3.2 (Apple Git-55)