Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6063

StreamsException is thrown after the changing `partitions`

    Details

    • Type: Bug
    • Status: Open
    • Priority: Trivial
    • Resolution: Unresolved
    • Affects Version/s: 0.11.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels:
    • Environment:
      macOS 10.12
      kafka 0.11.0.1

      Description

      Hi.
      "org.apache.kafka.streams.errors.StreamsException" is thrown in following case.

      Create topic

      $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic word-count-input
      

      Create Kafka Streams Application

      public class WordCountApp {
          public static void main(String[] args) {
              Properties config = new Properties();
              config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
      ...
      ...
      

      Ensure that it works fine

      $ java -jar wordcount.jar
      
      KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
              StreamsThread appId: wordcount-application
      ...
      ...
      

      Change "partitions"

      $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic word-count-input
      Adding partitions succeeded!
      

      When I start Application, StreamsException is thrown

      $ java -jar wordcount.jar
      
      KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
              StreamsThread appId: wordcount-applicationn
                      StreamsThread clientId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522
                      StreamsThread threadId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1
                      Active tasks:
                              Running:
                              Suspended:
                              Restoring:
                              New:
                      Standby tasks:
                              Running:
                              Suspended:
                              Restoring:
                              New:
      
      
      Exception in thread "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
      	at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
      	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
      	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
      	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
      

      If I change the application id, Application works again.

      Thank you.

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              Akihito Akihito Nakano
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated: