diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 839fd27..a993e8c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -63,18 +63,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness private val topic1 = "topic-1" private val topic2 = "topic-2" - // TODO: move this function to TestUtils after we have server dependant on clients - private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long, - blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = { - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) - producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) - producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) - return new KafkaProducer(producerProps) - } - override def setUp() { super.setUp() server1 = TestUtils.createServer(config1) @@ -85,10 +73,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") - producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0 - producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1 - producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1 - producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list + producer1 = 
TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize); + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize) + // producer with incorrect broker list + producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) } override def tearDown() { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 2230333..af11a49 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -90,9 +90,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { */ @Test def testSendOffset() { - val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - var producer = new KafkaProducer(props) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) val callback = new CheckErrorCallback @@ -148,9 +146,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { */ @Test def testClose() { - val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - var producer = new KafkaProducer(props) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) try { // create topic @@ -186,10 +182,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { */ @Test def testSendToPartition() { - val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props.put(ProducerConfig.ACKS_CONFIG, "-1") - var producer = new KafkaProducer(props) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) try { // create topic @@ -244,9 +237,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { */ @Test def testAutoCreateTopic() { - val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - var producer = new KafkaProducer(props) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + retries = 5) try { // Send a message to auto-create the topic diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 49c7790..00bfba4 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -42,6 +42,7 @@ import kafka.producer.ProducerConfig import junit.framework.AssertionFailedError import junit.framework.Assert._ +import org.apache.kafka.clients.producer.KafkaProducer /** * Utility functions to help with testing @@ -355,6 +356,28 @@ object TestUtils extends Logging { } /** + * Create a (new) producer with a few pre-configured properties. 
+ */ + def createNewProducer(brokerList: String, + acks: Int = -1, + metadataFetchTimeout: Long = 3000L, + blockOnBufferFull: Boolean = true, + bufferSize: Long = 1024L * 1024L, + retries: Int = 0) : KafkaProducer = { + import org.apache.kafka.clients.producer.ProducerConfig + + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) + producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) + producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) + producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) + producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000") + new KafkaProducer(producerProps) + } + + /** * Create a default producer config properties map with the given metadata broker list */ def getProducerConfig(brokerList: String): Properties = {