Kafka / KAFKA-9999

Internal topic creation failure should be non-fatal and trigger explicit rebalance

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: admin, streams
    • Labels: None

    Description

      In a system test failure, we found a case where a topic already exists but the admin client keeps trying to recreate it:

      [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably marked for deletion (number of partitions is unknown).
      Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
      Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
      [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
      Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
      Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager) 
      [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
      Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
      Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
      [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
      [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
      [2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
      org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
              at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
              at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
              at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) 
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) 
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
              at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
              at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
              at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) 
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
              at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
              at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
              at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)

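      On closer inspection, there is currently no way to tell whether an existing topic is pending deletion or is healthy: InternalTopicManager only sees a TopicExistsException and assumes the topic is probably marked for deletion. A possible follow-up is to expose that state as part of the topic description result.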

      The current solution is to explicitly trigger a rebalance once the retries are exhausted, instead of failing the stream thread, so that the group is unblocked; a short-term failure like this is more likely caused by broker-side unavailability.
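
The idea can be sketched in plain Java as follows. This is only an illustration, not the Kafka Streams implementation: the class `TopicSetupSketch`, the `Outcome` enum, and the `tryCreate` and `requestRebalance` callbacks are hypothetical stand-ins for the admin-client call and the rebalance trigger, and the back-off between attempts is omitted for brevity.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

public class TopicSetupSketch {

    /** Result of one bounded attempt to make the internal topics ready. */
    enum Outcome { READY, REBALANCE_REQUESTED }

    /**
     * Tries to make the given topics ready in at most maxAttempts rounds.
     *
     * tryCreate is a hypothetical stand-in for the admin-client call: it receives
     * the topics that are still pending and returns those that are still not ready.
     * requestRebalance is a hypothetical hook that schedules another rebalance.
     */
    static Outcome makeReadyOrRequestRebalance(Set<String> topics,
                                               int maxAttempts,
                                               Function<Set<String>, Set<String>> tryCreate,
                                               Runnable requestRebalance) {
        Set<String> pending = new HashSet<>(topics);
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            // Only retry the topics that failed in the previous round.
            pending = new HashSet<>(tryCreate.apply(pending));
        }
        if (pending.isEmpty()) {
            return Outcome.READY;
        }
        // Out of retries: do not throw a fatal exception; ask for a new rebalance
        // so the group gets another chance once the broker has recovered.
        requestRebalance.run();
        return Outcome.REBALANCE_REQUESTED;
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("SmokeTest-uwin-cnt-changelog");
        // Simulate a broker that keeps rejecting the creation request.
        Outcome outcome = makeReadyOrRequestRebalance(
            topics, 5, stillPending -> stillPending,
            () -> System.out.println("rebalance requested"));
        System.out.println(outcome);
    }
}
```

The caller decides what a requested rebalance means in practice; the point of the sketch is only that exhausting the retries yields a recoverable outcome rather than an exception that kills the stream thread.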

            People

              Assignee: Boyang Chen (bchen225242)
              Reporter: Boyang Chen (bchen225242)
              Votes: 0
              Watchers: 4