  1. Kafka
  2. KAFKA-14616

Topic recreation with offline broker causes permanent URPs



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.1
    • Fix Version/s: 3.7.0
    • Component/s: kraft
    • Labels: None


      We are facing an odd situation when we delete and recreate a topic while a broker is offline in KRaft mode.
      Here’s what we saw, step by step:

      1. Created topic foo.test with 10 partitions and 4 replicas — Topic foo.test was created with topic ID MfuZbwdmSMaiSa0g6__TPg
      2. Took broker 4 offline — which held replicas for partitions  0, 3, 4, 5, 7, 8, 9
      3. Deleted topic foo.test — The deletion process was successful, despite the fact that broker 4 still held replicas for partitions 0, 3, 4, 5, 7, 8, 9 on local disk.
      4. Recreated topic foo.test with 10 partitions and 4 replicas. — Topic foo.test was created with topic ID RzalpqQ9Q7ub2M2afHxY4Q and partitions 0, 1, 2, 7, 8, 9 got assigned to broker 4 (which was still offline). Notice here that partitions 0, 7, 8, 9 are common between the assignment of the deleted topic (topic_id: MfuZbwdmSMaiSa0g6__TPg) and the recreated topic (topic_id: RzalpqQ9Q7ub2M2afHxY4Q).
      5. Brought broker 4 back online.
      6. Broker started to create new partition replicas for the recreated topic foo.test (topic_id: RzalpqQ9Q7ub2M2afHxY4Q)
      7. The broker hit the following error: Tried to assign topic ID RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already contained topic ID MfuZbwdmSMaiSa0g6__TPg. As a result of this error, the broker renamed the log dirs for partitions 0, 3, 4, 5, 7, 8, 9 to <topic>-<partition>.<uuid>-delete.
      8. Ran ls <log.dir_path>

      9. Waited until the deletion of the old topic was done and ran ls <log.dir_path> again. We expected to see log dirs for partitions 0, 1, 2, 7, 8, 9; however, the result was:


      10. Ran kafka-topics.sh --command-config cmd.properties --bootstrap-server <bootstrap> --describe --topic foo.test

      Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 ReplicationFactor: 4 Configs: min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=1000000000
      Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
      Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
      Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
      Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
      Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
      Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
      Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
      Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
      Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
      Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6
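      The under-replicated partitions can be read off the describe output above mechanically. The following is a minimal sketch, not part of the original report: it assumes the output has been captured as text, and the helper name under_replicated is hypothetical. It lists, per partition, the replicas that are missing from the ISR.

```python
import re

# Partition lines copied from the describe output in this report.
DESCRIBE_OUTPUT = """\
Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6
"""

# Matches one partition line; \s+ tolerates tabs or spaces between fields.
LINE = re.compile(
    r"Partition: (\d+)\s+Leader: (\d+)\s+Replicas: ([\d,]+)\s+Isr: ([\d,]+)"
)


def under_replicated(text):
    """Return {partition: sorted replicas that are not in the ISR}."""
    result = {}
    for match in LINE.finditer(text):
        partition = int(match.group(1))
        replicas = {int(b) for b in match.group(3).split(",")}
        isr = {int(b) for b in match.group(4).split(",")}
        missing = replicas - isr
        if missing:
            result[partition] = sorted(missing)
    return result


urps = under_replicated(DESCRIBE_OUTPUT)
print(urps)  # {0: [4], 7: [4], 8: [4], 9: [4]}
```

      For the output above, broker 4 is the only replica missing from any ISR, and only for partitions 0, 7, 8 and 9.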

      Here’s a sample of broker logs


      {"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted log for partition foo.test-9 in /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
      {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted time index /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.timeindex.deleted.","logger":"kafka.log.LogSegment"}
      {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted offset index /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.index.deleted.","logger":"kafka.log.LogSegment"}
      {"timestamp":"2023-01-11T15:19:53,615Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted log /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.log.deleted.","logger":"kafka.log.LogSegment"}
      {"timestamp":"2023-01-11T15:19:53,614Z","level":"INFO","thread":"kafka-scheduler-8","message":"[LocalLog partition=foo.test-9, dir=/kafka/d1/data] Deleting segment files LogSegment(baseOffset=0, size=0, lastModifiedTime=1673439574661, largestRecordTimestamp=None)","logger":"kafka.log.LocalLog$"}
      {"timestamp":"2023-01-11T15:19:53,612Z","level":"INFO","thread":"kafka-scheduler-8","message":"[LocalLog partition=foo.test-9, dir=/kafka/d1/data] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=0, lastModifiedTime=1673439574661, largestRecordTimestamp=None)","logger":"kafka.log.LocalLog"}
      {"timestamp":"2023-01-11T15:18:53,611Z","level":"INFO","thread":"EventHandler","message":"Log for partition foo.test-9 is renamed to /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete and is scheduled for deletion","logger":"kafka.log.LogManager"}
      {"timestamp":"2023-01-11T15:18:53,596Z","level":"INFO","thread":"EventHandler","message":"Found stray log dir Log(dir=/kafka/d1/data/foo.test-4, topicId=MfuZbwdmSMaiSa0g6__TPg, topic=foo.test, partition=4, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0): the topicId MfuZbwdmSMaiSa0g6__TPg does not exist in the metadata image","logger":"kafka.server.metadata.BrokerMetadataPublisher$"}
      {"timestamp":"2023-01-11T15:18:51,578Z","level":"ERROR","thread":"EventHandler","message":"[Broker id=4] Unable to start fetching foo.test-9 with topic ID RzalpqQ9Q7ub2M2afHxY4Q due to InconsistentTopicIdException","logger":"state.change.logger","throwable":{"class":"org.apache.kafka.common.errors.InconsistentTopicIdException","msg":"Tried to assign topic ID RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already contained topic ID MfuZbwdmSMaiSa0g6__TPg"}}
      {"timestamp":"2023-01-11T15:18:51,577Z","level":"INFO","thread":"EventHandler","message":"[Broker id=4] Creating new partition foo.test-9 with topic id RzalpqQ9Q7ub2M2afHxY4Q.","logger":"state.change.logger"}
      {"timestamp":"2023-01-11T15:18:51,465Z","level":"INFO","thread":"log-recovery-/kafka/d1/data-0","message":"Completed load of Log(dir=/kafka/d1/data/foo.test-9, topicId=MfuZbwdmSMaiSa0g6__TPg, topic=foo.test, partition=9, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0) with 1 segments in 4ms (41/46 completed in /kafka/d1/data)","logger":"kafka.log.LogManager"}
      {"timestamp":"2023-01-11T15:18:51,464Z","level":"INFO","thread":"log-recovery-/kafka/d1/data-0","message":"[LogLoader partition=foo.test-9, dir=/kafka/d1/data] Loading producer state till offset 0 with message format version 2","logger":"kafka.log.UnifiedLog$"}
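      The sample above is listed newest first. To follow the sequence of events, it can help to sort the entries by timestamp. The snippet below is a small sketch of that, assuming one JSON object per line; it uses four of the entries above reduced to their timestamp, level and message fields, and the helper names are hypothetical.

```python
import json
from datetime import datetime

# Four entries from the sample above, reduced to three fields each.
RAW_LINES = [
    '{"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","message":"Deleted log for partition foo.test-9 in /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete."}',
    '{"timestamp":"2023-01-11T15:18:53,611Z","level":"INFO","message":"Log for partition foo.test-9 is renamed to /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete and is scheduled for deletion"}',
    '{"timestamp":"2023-01-11T15:18:51,578Z","level":"ERROR","message":"[Broker id=4] Unable to start fetching foo.test-9 with topic ID RzalpqQ9Q7ub2M2afHxY4Q due to InconsistentTopicIdException"}',
    '{"timestamp":"2023-01-11T15:18:51,577Z","level":"INFO","message":"[Broker id=4] Creating new partition foo.test-9 with topic id RzalpqQ9Q7ub2M2afHxY4Q."}',
]


def parse_timestamp(value):
    """Parse timestamps of the form 2023-01-11T15:19:53,620Z."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S,%fZ")


def chronological(lines):
    """Decode JSON log lines and return them oldest first."""
    entries = [json.loads(line) for line in lines]
    return sorted(entries, key=lambda e: parse_timestamp(e["timestamp"]))


ordered = chronological(RAW_LINES)
for entry in ordered:
    print(entry["timestamp"], entry["level"], entry["message"])
```

      Read oldest first, the order is: the broker tries to create foo.test-9 under the new topic ID, fails with InconsistentTopicIdException, renames the old log dir for deletion, and deletes it about a minute later.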

      So what we noticed here is that KRaft didn’t recreate the log dir for partitions 0, 7, 8, 9 for the recreated topic after it deleted them for the deleted topic.
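      The partition numbers line up with a simple model of what the logs show. The sketch below is a toy model of the observed sequence only, not Kafka's actual code: it assumes a log dir is keyed by partition number alone (as in foo.test-<partition>), that creating a replica fails when a dir with the old topic ID already holds that name, and that failed creations are not retried after the stray dirs are removed. The function name simulate_restart is hypothetical.

```python
# Partition numbers taken from steps 2 and 4 of this report.
OLD_ON_BROKER_4 = {0, 3, 4, 5, 7, 8, 9}  # deleted topic, still on disk
NEW_ON_BROKER_4 = {0, 1, 2, 7, 8, 9}     # recreated topic, assigned while offline


def simulate_restart(old_partitions, new_partitions):
    """Toy model: return (partitions on disk afterwards, failed creations)."""
    # On startup, every dir on disk belongs to the old topic ID.
    on_disk = {p: "old" for p in old_partitions}
    failed = set()

    # The broker tries to create replicas for the new topic ID.
    for p in sorted(new_partitions):
        if p in on_disk:
            # Name collision: the dir already carries a different topic ID.
            failed.add(p)
        else:
            on_disk[p] = "new"

    # Stray cleanup: dirs whose topic ID is unknown to the metadata are removed.
    on_disk = {p: owner for p, owner in on_disk.items() if owner != "old"}

    # Assumption: nothing retries the failed creations afterwards.
    return set(on_disk), failed


remaining, failed = simulate_restart(OLD_ON_BROKER_4, NEW_ON_BROKER_4)
print(sorted(remaining))  # [1, 2]
print(sorted(failed))     # [0, 7, 8, 9]
```

      Under these assumptions, the failed set is exactly the overlap between the two assignments, which matches the partitions 0, 7, 8, 9 that never got a log dir on broker 4.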

      There are a few behaviours here that differ from ZK mode (to the best of our knowledge):

      1. In ZK mode, deleting a topic blocks recreating it. In our case, in ZK mode the deletion of foo.test would wait for broker 4 to recover and complete the deletion, and only then could a user recreate the topic. KRaft, by contrast, is fire and forget. (KIP-516 proposed changing the log.dir layout from the <topic>-<partition> format to `topicIdprefix/<topicId><partition>`, but this has not been done yet.)
      2. In ZK mode, creating a new topic doesn't assign partitions to non-active brokers. In the scenario we faced, the behavior of KRaft seems to be different. KRaft assigned partitions to broker 4 while broker 4 was down.


        Assignee: Colin McCabe (cmccabe)
        Reporter: Omnia Ibrahim (omnia_h_ibrahim)