diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 40a25a2..96fa0bd 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -343,12 +343,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("compression.codec", compression.codec.toString)
-    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
-    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
     val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
     producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
     debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
@@ -363,11 +365,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    numParts: Int): List[String]= {
     var messages: List[String] = Nil
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
       producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
@@ -378,14 +383,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[String]= {
-    var messages: List[String] = Nil
-    for(conf <- configs)
-      messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts)
-    messages
-  }
-
-  def getMessages(nMessagesPerThread: Int, 
+  def getMessages(nMessagesPerThread: Int,
                   topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
     var messages: List[String] = Nil
     for((topic, messageStreams) <- topicMessageStreams) {
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 97e3b14..9f04bd3 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import junit.framework.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
@@ -29,7 +29,9 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.AdminUtils
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{StaticPartitioner, TestUtils, Utils}
+import kafka.serializer.StringEncoder
+import java.util.Properties
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -67,11 +69,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
 
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
-    val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
+    producer.send(new KeyedMessage[String, String](topic, "test-message"))
 
     val replica = servers.head.replicaManager.getReplica(topic, 0).get
     assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
@@ -93,11 +92,16 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
-    val props = producer.config.props.props
+    val props = new Properties()
     props.put("compression.codec", "gzip")
-    val config = new ProducerConfig(props)
-    val stringProducer1 = new Producer[String, String](config)
+    val stringProducer1 = TestUtils.createProducer[String, String](
+      TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
+
     stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
     val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
@@ -172,10 +176,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   }
 
   def testProduceAndMultiFetch() {
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
-    val noCompressionProducer = new Producer[String, String](config)
-    produceAndMultiFetch(noCompressionProducer)
+    produceAndMultiFetch(producer)
   }
 
   private def multiProduce(producer: Producer[String, String]) {
@@ -201,10 +202,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   }
 
   def testMultiProduce() {
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
-    val noCompressionProducer = new Producer[String, String](config)
-    multiProduce(noCompressionProducer)
+    multiProduce(producer)
   }
 
   def testConsumerEmptyTopic() {
@@ -218,9 +216,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
-    val props = producer.config.props.props
+    val props = new Properties()
     props.put("request.required.acks", "0")
-    val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
+    val pipelinedProducer: Producer[String, String] =
+      TestUtils.createProducer[String, String](
+        TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        partitioner = classOf[StaticPartitioner].getName,
+        producerProps = props)
 
     // send some messages
     val messages = new mutable.HashMap[String, Seq[String]]
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6c3feac..906600c 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -84,15 +84,11 @@ class AsyncProducerTest extends JUnit3Suite {
 
   @Test
   def testProduceAfterClosed() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("producer.type", "async")
-    props.put("batch.num.messages", "1")
-
-    val config = new ProducerConfig(props)
     val produceData = getProduceData(10)
-    val producer = new Producer[String, String](config)
+    val producer = createProducer[String, String](
+      getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName)
+
     producer.close
 
     try {
@@ -303,10 +299,14 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testIncompatibleEncoder() {
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    val config = new ProducerConfig(props)
+    // no need to retry since the send will always fail
+    props.put("message.send.max.retries", "0")
+    val producer= createProducer[String, String](
+      brokerList = getBrokerListStrFromConfigs(configs),
+      encoder = classOf[DefaultEncoder].getName,
+      keyEncoder = classOf[DefaultEncoder].getName,
+      producerProps = props)
 
-    val producer=new Producer[String, String](config)
     try {
       producer.send(getProduceData(1): _*)
       fail("Should fail with ClassCastException due to incompatible Encoder")
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index c3da69d..b61c0b8 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -34,7 +34,7 @@ import org.junit.Assert.assertTrue
 import org.junit.Assert.assertFalse
 import org.junit.Assert.assertEquals
 import kafka.common.{ErrorMapping, FailedToSendMessageException}
-
+import kafka.serializer.StringEncoder
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
@@ -94,26 +94,30 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val topic = "new-topic"
     TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
-    val props1 = new util.Properties()
-    props1.put("metadata.broker.list", "localhost:80,localhost:81")
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig1 = new ProducerConfig(props1)
-    val producer1 = new Producer[String, String](producerConfig1)
+    val props = new Properties()
+    // no need to retry since the send will always fail
+    props.put("message.send.max.retries", "0")
+    val producer1 = TestUtils.createProducer[String, String](
+      brokerList = "localhost:80,localhost:81",
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      producerProps = props)
+
     try{
       producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: FailedToSendMessageException =>
+      case e: FailedToSendMessageException => // this is expected
       case oe: Throwable => fail("fails with exception", oe)
     } finally {
      producer1.close()
    }
 
-    val props2 = new util.Properties()
-    props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)))
-    props2.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig2= new ProducerConfig(props2)
-    val producer2 = new Producer[String, String](producerConfig2)
+    val producer2 = TestUtils.createProducer[String, String](
+      brokerList = "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName)
+
     try{
       producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
@@ -122,11 +126,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       producer2.close()
     }
 
-    val props3 = new util.Properties()
-    props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    props3.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig3 = new ProducerConfig(props3)
-    val producer3 = new Producer[String, String](producerConfig3)
+    val producer3 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName)
+
     try{
       producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
@@ -139,26 +143,19 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   @Test
   def testSendToNewTopic() {
     val props1 = new util.Properties()
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props1.put("request.required.acks", "2")
-    props1.put("request.timeout.ms", "1000")
-
-    val props2 = new util.Properties()
-    props2.putAll(props1)
-    props2.put("request.required.acks", "3")
-    props2.put("request.timeout.ms", "1000")
-
-    val producerConfig1 = new ProducerConfig(props1)
-    val producerConfig2 = new ProducerConfig(props2)
 
     val topic = "new-topic"
     // create topic with 1 partition and await leadership
     TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
-    val producer1 = new Producer[String, String](producerConfig1)
-    val producer2 = new Producer[String, String](producerConfig2)
+    val producer1 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props1)
+
     // Available partition ids should be 0.
     producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
     producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
@@ -179,6 +176,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     assertEquals(new Message(bytes = "test2".getBytes, key = "test".getBytes), messageSet(1).message)
     producer1.close()
 
+    val props2 = new util.Properties()
+    props2.put("request.required.acks", "3")
+    // no need to retry since the send will always fail
+    props2.put("message.send.max.retries", "0")
+
+    val producer2 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props2)
+
     try {
       producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
       fail("Should have timed out for 3 acks.")
@@ -197,19 +206,23 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   @Test
   def testSendWithDeadBroker() {
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "1")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    // No need to retry since the topic will be created beforehand and normal send will succeed on the first try.
+    // Reducing the retries will save the time on the subsequent failure test.
+    props.put("message.send.max.retries", "0")
 
     val topic = "new-topic"
     // create topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
       servers = servers)
 
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
+
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
@@ -253,14 +266,16 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "0")
    props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
 
     val topic = "new-topic"
     // create topics in ZK
@@ -296,20 +311,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
     val t2 = SystemTime.milliseconds
 
-    // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
-    // we do this because the DefaultEventHandler retries a number of times
-    assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
+    // make sure we don't wait fewer than timeoutMs
+    assertTrue((t2-t1) >= timeoutMs)
   }
 
   @Test
   def testSendNullMessage() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName)
+
     try {
       // create topic
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 384c74e..49c7790 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -342,13 +342,12 @@ object TestUtils extends Logging {
                            keyEncoder: String = classOf[DefaultEncoder].getName,
                            partitioner: String = classOf[DefaultPartitioner].getName,
                            producerProps: Properties = null): Producer[K, V] = {
-    val props: Properties =
-      if (producerProps == null) {
-        getProducerConfig(brokerList)
-      } else {
-        producerProps.put("metadata.broker.list", brokerList)
-        producerProps
-      }
+    val props: Properties = getProducerConfig(brokerList)
+
+    //override any explicitly specified properties
+    if (producerProps != null)
+      props.putAll(producerProps)
+
     props.put("serializer.class", encoder)
     props.put("key.serializer.class", keyEncoder)
     props.put("partitioner.class", partitioner)
@@ -361,9 +360,9 @@ object TestUtils extends Logging {
   def getProducerConfig(brokerList: String): Properties = {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    props.put("message.send.max.retries", "3")
+    props.put("message.send.max.retries", "5")
     props.put("retry.backoff.ms", "1000")
-    props.put("request.timeout.ms", "500")
+    props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "-1")
     props.put("send.buffer.bytes", "65536")
     props.put("connect.timeout.ms", "100000")
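
Note (not part of the patch): every call site above now funnels through the TestUtils.createProducer
helper whose merging behavior is changed in the last hunk: getProducerConfig supplies the defaults
(including metadata.broker.list) and any entries in producerProps are layered on top. A minimal
usage sketch, assuming a KafkaServerTestHarness-style test that provides `configs`; the topic name
and the acks override are illustrative:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer}
    import kafka.serializer.StringEncoder
    import kafka.utils.{StaticPartitioner, TestUtils}

    val overrides = new Properties()
    // a per-test override, merged on top of the getProducerConfig defaults
    overrides.put("request.required.acks", "1")

    val producer: Producer[String, String] = TestUtils.createProducer[String, String](
      brokerList = TestUtils.getBrokerListStrFromConfigs(configs),  // configs comes from the harness
      encoder = classOf[StringEncoder].getName,
      keyEncoder = classOf[StringEncoder].getName,
      partitioner = classOf[StaticPartitioner].getName,
      producerProps = overrides)

    try {
      producer.send(new KeyedMessage[String, String]("sketch-topic", "key", "value"))
    } finally {
      producer.close()
    }

Because the defaults are applied first, a test that passes its own Properties can no longer
accidentally drop the broker list, and per-test knobs such as message.send.max.retries stay
one-line overrides.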