Description
Currently, when creating topics via ZooKeeper, there is a small but definite delay between creating the nodes in ZK and having the topics created in the brokers. The KafkaProducer maintains a metadata cache about topics, which is updated only after the broker metadata is updated. If an application adds partitions to a topic and immediately tries to produce records to a new partition, a KafkaException is thrown with a message similar to the following:
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with record: 12 is not in the range [0...1).
In this case, since the application knows that it added the partitions, it might be worthwhile to consider whether a more specific exception can be thrown instead of KafkaException. For example:
public class PartitionNotFoundException extends KafkaException {...}
This would allow the application to interpret such an error and act accordingly.
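As a rough illustration of "act accordingly", the sketch below retries a send only when the proposed exception is thrown. It is not existing Kafka code: PartitionNotFoundException is the class proposed in this issue, the nested KafkaException is a local stand-in for org.apache.kafka.common.KafkaException so that the example compiles without the Kafka client, and sendWithRetry, maxAttempts and backoffMs are hypothetical names chosen for the example.

```java
import java.util.concurrent.Callable;

public class PartitionRetrySketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption:
    // used here only so the sketch compiles without the Kafka client).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // The exception proposed in this issue; it does not exist in Kafka today.
    static class PartitionNotFoundException extends KafkaException {
        PartitionNotFoundException(String message) { super(message); }
    }

    // Hypothetical helper: runs the send action and retries only while the
    // partition is still unknown. Any other exception propagates immediately.
    static <T> T sendWithRetry(Callable<T> send, int maxAttempts, long backoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        PartitionNotFoundException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.call();
            } catch (PartitionNotFoundException e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Give the producer's metadata cache time to refresh.
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated producer: partition 12 becomes visible on the third attempt.
        int[] calls = {0};
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new PartitionNotFoundException(
                    "Invalid partition given with record: 12 is not in the range [0...1).");
            }
            return "sent to partition 12";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a generic KafkaException the application cannot make this distinction without parsing the exception message, which is the motivation for the more specific type.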
EDIT: Correct "create topics" to "adds partitions to a topic".