Description
In our system, we programmatically create a topic when data is first received from a data source, before producing any messages to that topic. Our Kafka producers run in a highly concurrent, distributed environment (Storm), so when a data feed is first enabled, multiple Storm bolt processes may compete to create the same topic. Occasionally we see an InvalidTopicException reporting that a topic collides with itself. This is caused by a race condition: the initial zkUtils.zkClient.exists(topic) check returns false, another process then creates the topic, and the subsequent zkUtils.getAllTopics() call therefore includes it.
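To make the check-then-act window concrete, here is a minimal, single-threaded Scala script that replays one unlucky interleaving of two bolts. It is a simplified model, not Kafka code. `FakeTopicRegistry`, `hasCollisionOld` and `replayRace` are hypothetical names. The registry stands in for the ZooKeeper calls named above, and the collision rule is assumed to be the pre-fix Topic#hasCollision, which has no self check.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the ZooKeeper topic registry (not Kafka's ZkUtils).
final class FakeTopicRegistry {
  private val topics = mutable.Set.empty[String]
  def exists(topic: String): Boolean = topics.contains(topic)
  def getAllTopics: Seq[String] = topics.toSeq
  def add(topic: String): Unit = topics += topic
}

// Assumed pre-fix collision rule: '.' and '_' are treated as equivalent, with no self check.
def hasCollisionOld(topicA: String, topicB: String): Boolean =
  topicA.replace('.', '_') == topicB.replace('.', '_')

// Replays one interleaving of bolts A and B step by step, deterministically.
def replayRace(topic: String): Either[String, Unit] = {
  val registry = new FakeTopicRegistry
  // Step 1 (bolt A): the existence check passes because the topic is not there yet.
  val existedAtCheck = registry.exists(topic)
  // Step 2 (bolt B): creates the same topic between bolt A's two calls.
  registry.add(topic)
  // Step 3 (bolt A): lists all topics and checks for collisions, now seeing B's topic.
  if (existedAtCheck) Left(s"""Topic "$topic" already exists""")
  else {
    val collisions = registry.getAllTopics.filter(t => hasCollisionOld(topic, t))
    if (collisions.nonEmpty)
      Left(s"""Topic "$topic" collides with existing topics: ${collisions.mkString(", ")}""")
    else { registry.add(topic); Right(()) }
  }
}

println(replayRace("DATA.example"))
// prints: Left(Topic "DATA.example" collides with existing topics: DATA.example)
```

The message mirrors the one in the stack trace. The topic "collides" only with its own name because the existence check and the collision check read the registry at different moments.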
Example stack trace:
Caused by: kafka.common.InvalidTopicException: Topic "DATA.7b3fbbd4-4ecd-4521-aed4-0ccca371e573" collides with existing topics: DATA.7b3fbbd4-4ecd-4521-aed4-0ccca371e573
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:258) ~[stormjar.jar:?]
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237) ~[stormjar.jar:?]
    at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) ~[stormjar.jar:?]
To fix this, I believe we only need to update the Topic#hasCollision method so that a topic cannot collide with itself (KAFKA-2337 clearly did not intend for that condition ever to be possible), like this:
def hasCollision(topicA: String, topicB: String): Boolean = {
  topicA != topicB && topicA.replace('.', '_') == topicB.replace('.', '_')
}
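A small Scala script can compare the two rules. `hasCollisionOld` is assumed to be the current body of Topic#hasCollision, that is, the proposal above without the `topicA != topicB` guard. Both function names are hypothetical, and the topic names are made up for illustration.

```scala
// Assumed pre-fix rule: no check for identical names, so a topic "collides" with itself.
def hasCollisionOld(topicA: String, topicB: String): Boolean =
  topicA.replace('.', '_') == topicB.replace('.', '_')

// Proposed rule: identical names are not a collision; '.'/'_' look-alikes still are.
def hasCollisionFixed(topicA: String, topicB: String): Boolean =
  topicA != topicB && topicA.replace('.', '_') == topicB.replace('.', '_')

val cases = Seq(
  ("DATA.x", "DATA.x"), // same topic: the self-collision seen in the stack trace
  ("DATA.x", "DATA_x"), // genuine '.' vs '_' collision that KAFKA-2337 targets
  ("DATA.x", "DATA.y")  // unrelated topics
)

for ((a, b) <- cases)
  println(s"$a vs $b: old=${hasCollisionOld(a, b)} fixed=${hasCollisionFixed(a, b)}")
// prints:
// DATA.x vs DATA.x: old=true fixed=false
// DATA.x vs DATA_x: old=true fixed=true
// DATA.x vs DATA.y: old=false fixed=false
```

Only the first case changes. The '.'/'_' protection that KAFKA-2337 introduced is preserved.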
This bug affects version 0.9.0.0 and higher.