Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6302

Topic can not be recreated after it is deleted

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.0.0
    • 1.0.1, 1.1.0
    • admin, clients
    • None

    Description

      I use an embedded kafka for unit test. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.

      // needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 'org.apache.kafka:kafka-clients:1.0.0'
      package kic.kafka.embedded
      
      import java.util.Properties
      
      import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
      import org.scalatest._
      
      import scala.collection.JavaConverters._
      
      class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
        val props = new Properties()
        val testTopic = "test-topic"
      
        "The admin client" should "be able to create, delete and re-create topics" in {
          props.setProperty("bootstrap.servers", "localhost:10001")
          props.setProperty("delete.enable.topic", "true")
          props.setProperty("group.id", "test-client")
          props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
          props.setProperty("clinet.id", "test-client")
          props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
          props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      
          EmbeddedKafaJavaWrapper.start(10001, 10002, props)
      
          try {
            implicit val admin = AdminClient.create(props)
      
            // create topic and confirm it exists
            createTopic(testTopic)
            val topics = listTopics()
            info(s"topics: $topics")
            topics should contain(testTopic)
      
            // now we should be able to send something to this topic
            // TODO create producer and send something
      
            // delete topic
            deleteTopic(testTopic)
            listTopics() shouldNot contain(testTopic)
      
            // recreate topic
            createTopic(testTopic)
            // listTopics() should contain(testTopic)
      
            // and finally consume from the topic and expect to get 0 entries
            // TODO create consumer and poll once
          } finally {
            EmbeddedKafaJavaWrapper.stop()
          }
      
        }
      
        def listTopics()(implicit admin: AdminClient) =
          admin.listTopics().names().get()
      
        def createTopic(topic: String)(implicit admin: AdminClient) =
          admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
      
        def deleteTopic(topic: String)(implicit admin: AdminClient) =
          admin.deleteTopics(Seq("test-topic").asJava).all().get()
      
      }
      

      Btw, what happens to connected producers/consumers when I delete a topic?

      Attachments

        Activity

          People

            mjsax Matthias J. Sax
            kic kic
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: