Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1294302) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -33,10 +33,6 @@ /* use the other constructor*/ extends Logging { private val hasShutdown = new AtomicBoolean(false) - if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList)) - throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") - if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList)) - warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") private val random = new java.util.Random // check if zookeeper based auto partition discovery is enabled private val zkEnabled = Utils.propertyExists(config.zkConnect) Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1294302) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -33,9 +33,12 @@ throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set") /** If both broker.list and zk.connect options are specified, throw an exception */ - if(brokerList != null && zkConnect != null) + if(propertyExists(brokerList) && propertyExists(zkConnect)) throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") + if(!propertyExists(config.zkConnect) && !propertyExists(config.brokerList)) + throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") + /** the partitioner class for partitioning events amongst sub-topics */ val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")