Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1294923) +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -27,6 +27,7 @@ import kafka.producer.async._ import kafka.serializer.Encoder import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} +import kafka.utils.TestZKUtils class AsyncProducerTest extends JUnitSuite { @@ -54,6 +55,7 @@ props.put("port", "9092") props.put("queue.size", "10") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) @@ -92,6 +94,7 @@ props.put("port", "9092") props.put("queue.size", "10") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) @@ -130,6 +133,7 @@ props.put("queue.size", "10") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("batch.size", "5") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) @@ -168,6 +172,7 @@ props.put("queue.size", "10") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("queue.time", "200") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) @@ -200,6 +205,7 @@ asyncProducerProps.put("queue.size", "10") asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer") asyncProducerProps.put("queue.time", "100") + asyncProducerProps.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(asyncProducerProps) val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) @@ -226,6 +232,7 @@ props.put("queue.size", "50") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("batch.size", "10") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) @@ -266,6 +273,7 @@ props.put("queue.size", "50") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("batch.size", "20") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new AsyncProducerConfig(props) Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1294923) +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) @@ -238,6 +238,7 @@ val props = new Properties() props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) @@ -270,6 +271,7 @@ props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) @@ -295,6 +297,7 @@ val props = new Properties() props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) try { @@ -326,6 +329,7 @@ props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) try { Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (revision 1294923) +++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (working copy) @@ -232,6 +232,7 @@ val props = new Properties() props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) @@ -264,6 +265,7 @@ props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) @@ -289,6 +291,7 @@ val props = new Properties() props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) try { @@ -320,6 +323,7 @@ props.put("partitioner.class", "kafka.producer.NegativePartitioner") props.put("serializer.class", "kafka.producer.StringSerializer") props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) try { Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1294923) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -33,10 +33,6 @@ /* use the other constructor*/ extends Logging { private val hasShutdown = new AtomicBoolean(false) - if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList)) - throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") - if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList)) - warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") private val random = new java.util.Random // check if zookeeper based auto partition discovery is enabled private val zkEnabled = Utils.propertyExists(config.zkConnect) Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1294923) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -29,13 +29,16 @@ * to pass in static broker and per-broker partition information. Format- * * brokerid1:host1:port1, brokerid2:host2:port2*/ val brokerList = Utils.getString(props, "broker.list", null) - if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null) + if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null) throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set") /** If both broker.list and zk.connect options are specified, throw an exception */ - if(brokerList != null && zkConnect != null) + if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect)) throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") + if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList)) + throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") + /** the partitioner class for partitioning events amongst sub-topics */ val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")