KAFKA-8370 (project: Kafka)

Kafka Connect should check for existence of internal topics before attempting to create them



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • None
    • Component/s: KafkaConnect
    • None


      The Connect worker doesn't currently check whether its internal topics exist. Instead, it always issues a CreateTopics request and handles the resulting TopicExistsException. This causes problems when the number of available brokers is smaller than the topics' replication factor, even if the topics already exist and all of their partitions remain available on the remaining brokers.
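
      A minimal, self-contained simulation of the create-and-catch pattern, assuming (as this issue reports) that the broker validates the replication factor before checking whether the topic exists. SimulatedCluster, CreateAndCatch, and both exception classes are hypothetical stand-ins; the exception names only mirror Kafka's TopicExistsException and InvalidReplicationFactorException. This is neither the Kafka Admin API nor the actual Connect TopicAdmin code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical exceptions, named after Kafka's org.apache.kafka.common.errors classes.
class TopicExistsException extends RuntimeException {
    TopicExistsException(String message) { super(message); }
}

class InvalidReplicationFactorException extends RuntimeException {
    InvalidReplicationFactorException(String message) { super(message); }
}

// Hypothetical in-memory cluster. Like the broker described in this issue, it
// validates the replication factor BEFORE checking whether the topic already exists.
class SimulatedCluster {
    final Set<String> topics = new HashSet<>();
    int liveBrokers;

    SimulatedCluster(int liveBrokers) { this.liveBrokers = liveBrokers; }

    void createTopic(String name, short replicationFactor) {
        if (replicationFactor > liveBrokers) {
            throw new InvalidReplicationFactorException("Replication factor: "
                    + replicationFactor + " larger than available brokers: " + liveBrokers + ".");
        }
        if (!topics.add(name)) { // add() returns false if the topic is already present
            throw new TopicExistsException("Topic '" + name + "' already exists.");
        }
    }
}

class CreateAndCatch {
    // Create-and-catch: always send a create request and treat "already exists" as success.
    // Returns true if the topic was newly created, false if it already existed.
    static boolean ensureTopic(SimulatedCluster cluster, String name, short replicationFactor) {
        try {
            cluster.createTopic(name, replicationFactor);
            return true;
        } catch (TopicExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SimulatedCluster cluster = new SimulatedCluster(3);
        ensureTopic(cluster, "connect-offsets", (short) 3); // created while 3 brokers are up
        cluster.liveBrokers = 2;                             // one broker goes away
        try {
            ensureTopic(cluster, "connect-offsets", (short) 3);
        } catch (InvalidReplicationFactorException e) {
            // The topic exists, yet the replication-factor check fails first.
            System.out.println("Worker fails: " + e.getMessage());
        }
    }
}
```

      Running main prints "Worker fails: Replication factor: 3 larger than available brokers: 2.", matching the error in the log: the TopicExistsException handler is never reached because the replication-factor check fails first.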

      One problem with the current approach is that the broker validates the requested replication factor before checking whether the topic exists, so the worker fails with an unexpected exception even when the topic does exist:

      connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
      connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
      connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
      connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
      connect      | 	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
      connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
      connect      | 	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
      connect      | 	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
      connect      | 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
      connect      | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      connect      | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      connect      | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      connect      | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      connect      | 	at java.lang.Thread.run(Thread.java:748)
      connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
      connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
      connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
      connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
      connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
      connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
      connect      | 	... 11 more
      connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
      connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)

      Instead of always issuing a CreateTopics request, the worker's admin client should first check whether each topic exists and attempt to create it only if it does not.
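
      A minimal, self-contained sketch of the proposed check-then-create flow, again against a hypothetical in-memory SimulatedCluster whose createTopic validates the replication factor before checking for an existing topic. SimulatedCluster, listTopics(), CheckThenCreate, and the exception classes are illustrative assumptions, not Kafka APIs; in the real Java client, a lookup such as AdminClient.listTopics() or AdminClient.describeTopics(...) would presumably play the role of the existence check.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical exceptions, named after Kafka's org.apache.kafka.common.errors classes.
class TopicExistsException extends RuntimeException {
    TopicExistsException(String message) { super(message); }
}

class InvalidReplicationFactorException extends RuntimeException {
    InvalidReplicationFactorException(String message) { super(message); }
}

// Hypothetical in-memory cluster; createTopic checks the replication factor first.
class SimulatedCluster {
    private final Set<String> topics = new HashSet<>();
    int liveBrokers;

    SimulatedCluster(int liveBrokers) { this.liveBrokers = liveBrokers; }

    // Listing topics performs no replication-factor validation.
    Set<String> listTopics() { return Collections.unmodifiableSet(topics); }

    void createTopic(String name, short replicationFactor) {
        if (replicationFactor > liveBrokers) {
            throw new InvalidReplicationFactorException("Replication factor: "
                    + replicationFactor + " larger than available brokers: " + liveBrokers + ".");
        }
        if (!topics.add(name)) {
            throw new TopicExistsException("Topic '" + name + "' already exists.");
        }
    }
}

class CheckThenCreate {
    // Check-then-create: only send a create request when the topic is missing.
    // Returns true if the topic was newly created, false if it already existed.
    static boolean ensureTopic(SimulatedCluster cluster, String name, short replicationFactor) {
        if (cluster.listTopics().contains(name)) {
            return false; // exists: no create request, so no replication-factor check
        }
        try {
            cluster.createTopic(name, replicationFactor);
            return true;
        } catch (TopicExistsException e) {
            // Another worker may have created it between the lookup and the create request.
            return false;
        }
    }

    public static void main(String[] args) {
        SimulatedCluster cluster = new SimulatedCluster(3);
        ensureTopic(cluster, "connect-offsets", (short) 3);
        cluster.liveBrokers = 2;
        boolean created = ensureTopic(cluster, "connect-offsets", (short) 3);
        System.out.println("created again? " + created);
    }
}
```

      Running main prints "created again? false": with only two live brokers, the existing topic is found by the lookup and no create request is sent. Because the lookup and the create are two separate calls, the sketch still treats TopicExistsException as success to cover the case where another worker creates the topic in between; a genuinely missing topic still requires enough live brokers.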


              Assignee: Randall Hauch (rhauch)
              Reporter: Randall Hauch (rhauch)