Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9534

Topics could not be deleted when there is a concurrent create topic request loop

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0, 2.5.0
    • Fix Version/s: None
    • Component/s: admin
    • Labels:
      None

      Description

      Steps to reproduce:

      1. start local ZK
      2. start local broker
      3. Run the following script, which keeps trying to create the input and output topics until it succeeds:

       

      package kafka.examples;
      
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.NewTopic;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.errors.TopicExistsException;
      
      import java.util.Arrays;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutionException;
      
      /**
       * Reproduction script for KAFKA-9534.
       *
       * <p>The script creates {@code input-topic} and {@code output-topic}, then runs two
       * competing loops against the same {@link Admin} client:
       * <ul>
       *   <li>a background thread that, once per second, asks the broker to delete both topics;</li>
       *   <li>the main thread, which retries creating both topics, sleeping one second after
       *       every {@link TopicExistsException}, until a create call succeeds.</li>
       * </ul>
       * Once a create call succeeds the main thread signals the background thread through a
       * latch; the background thread stops after its next successful delete call.
       */
      public class Reproduce {

          /**
           * Runs the concurrent create/delete loops described on the class.
           *
           * @param args unused
           * @throws ExecutionException if a create call fails for a reason other than
           *         {@link TopicExistsException}
           * @throws InterruptedException if the main thread is interrupted while waiting
           */
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);

              // try-with-resources: the client is closed on every exit path instead of being leaked.
              try (Admin adminClient = Admin.create(props)) {
                  createTopics(adminClient);
                  CountDownLatch createSucceed = new CountDownLatch(1);
                  Thread deleteTopicThread = new Thread(() -> {
                      List<String> topicsToDelete = Arrays.asList("input-topic", "output-topic");
                      while (true) {
                          try {
                              Thread.sleep(1000);
                              adminClient.deleteTopics(topicsToDelete).all().get();
                              // Only stop after a successful delete that follows a successful create.
                              if (createSucceed.getCount() == 0) {
                                  break;
                              }
                          } catch (ExecutionException e) {
                              // A failed delete is expected while racing with creation; report and retry.
                              System.out.println("Encountered exception during topic deletion: " + e.getCause());
                          } catch (InterruptedException e) {
                              // An interrupt is a request to stop: restore the flag and leave the loop
                              // rather than printing a null cause and spinning on.
                              Thread.currentThread().interrupt();
                              System.out.println("Interrupted during topic deletion, giving up: " + e);
                              return;
                          }
                      }
                      System.out.println("Deleted old topics: " + topicsToDelete);
                  });
                  deleteTopicThread.start();

                  try {
                      while (true) {
                          try {
                              createTopics(adminClient);
                              System.out.println("Created new topic!");
                              break;
                          } catch (ExecutionException | InterruptedException e) {
                              // Anything other than "topic already exists" is fatal for the script.
                              if (!(e.getCause() instanceof TopicExistsException)) {
                                  throw e;
                              }
                              System.out.println("Metadata of the old topics are not cleared yet... " + e.getMessage());

                              Thread.sleep(1000);
                          }
                      }
                      createSucceed.countDown();
                      deleteTopicThread.join();
                  } finally {
                      // On the normal path the thread has already finished and this is a no-op.
                      // On a failure path it stops the non-daemon delete thread, which would
                      // otherwise loop forever and keep the JVM alive.
                      deleteTopicThread.interrupt();
                  }
              }
          }

          /**
           * Creates {@code input-topic} and {@code output-topic}, each with one partition and a
           * replication factor of one, and blocks until the request completes.
           *
           * @param adminClient client used to send the create request
           * @throws InterruptedException if the calling thread is interrupted while waiting
           * @throws ExecutionException if the create request fails, e.g. with a
           *         {@link TopicExistsException} as its cause
           */
          private static void createTopics(Admin adminClient) throws InterruptedException, ExecutionException {
              adminClient.createTopics(Arrays.asList(
                  new NewTopic("input-topic", 1, (short) 1),
                  new NewTopic("output-topic", 1, (short) 1))).all().get();
          }
      }
      
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              bchen225242 Boyang Chen
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated: