Description
I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown.
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145) at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197) at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725) at kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616) at kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at junit.framework.TestCase.runTest(TestCase.java:168) at junit.framework.TestCase.runBare(TestCase.java:134) at junit.framework.TestResult$1.protect(TestResult.java:110) at junit.framework.TestResult.runProtected(TestResult.java:128) at junit.framework.TestResult.run(TestResult.java:113) at junit.framework.TestCase.run(TestCase.java:124) at junit.framework.TestSuite.runTest(TestSuite.java:232) at junit.framework.TestSuite.run(TestSuite.java:227) at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) at org.scalatest.tools.Runner$.run(Runner.scala:883) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Attachments
Issue Links
- blocks
-
KAFKA-1893 Allow regex subscriptions in the new consumer
- Resolved