diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 7125ec9..95303e0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,22 +17,25 @@ package kafka.integration
 
-import junit.framework.Assert._
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
+
+import org.junit.Test
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
   val NumMessages = 10
   val LargeOffset = 10000
   val SmallOffset = -1
@@ -51,69 +54,39 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     super.tearDown
   }
 
-  // fake test so that this test can pass
-  def testResetToEarliestWhenOffsetTooHigh() =
-    assertTrue(true)
-
-  /* Temporarily disable those tests due to failures.
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64)
-
+  @Test
+  def testResetToEarliestWhenOffsetTooHigh() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
-
+
+  @Test
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
-  */
 
   /* Produce the given number of messages, create a consumer with the given offset policy,
    * then reset the offset to the given value and consume until we get no new messages.
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.createTopic(zkClient, topic, 1, 1, servers)
 
-    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder())
+    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+      TestUtils.getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
 
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
-    var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
     consumerProps.put("auto.offset.reset", resetTo)
    consumerProps.put("consumer.timeout.ms", "2000")
     consumerProps.put("fetch.wait.max.ms", "0")
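
The re-enabled tests above no longer wait on leader election by hand: the manual
waitUntilLeaderIsElectedOrChanged / waitUntilMetadataIsPropagated pair is replaced by a
single TestUtils.createTopic call which, per the calls it replaces here, does not return
until the partition is usable. A minimal sketch of the resulting pattern, assuming the
KafkaServerTestHarness members (zkClient, servers, configs) that the diff relies on:

    // Sketch only: zkClient, servers and configs come from the test harness.
    import kafka.producer.{KeyedMessage, Producer}
    import kafka.serializer.StringEncoder
    import kafka.utils.TestUtils

    // createTopic blocks until the partition has a leader, so producing right after is safe.
    TestUtils.createTopic(zkClient, "test_topic", 1, 1, servers)

    // The value encoder defaults to DefaultEncoder (Array[Byte]); only the key encoder is overridden.
    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
      TestUtils.getBrokerListStrFromConfigs(configs),
      keyEncoder = classOf[StringEncoder].getName)
    producer.send(new KeyedMessage[String, Array[Byte]]("test_topic", "test_topic", "test".getBytes))
    producer.close()
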
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 4075068..25845ab 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -82,9 +82,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-        new DefaultEncoder(),
-        new StringEncoder())
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+        TestUtils.getBrokerListStrFromConfigs(configs),
+        keyEncoder = classOf[StringEncoder].getName)
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
       producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 731ee59..108c2e7 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,8 +19,10 @@ package kafka.integration
 
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
-import kafka.producer.{ProducerConfig, Producer}
-import kafka.utils.TestUtils
+import kafka.producer.Producer
+import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.serializer.StringEncoder
+
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
   val port: Int
   val host = "localhost"
@@ -29,8 +31,10 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
 
   override def setUp() {
     super.setUp
-    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
-    producer = new Producer(new ProducerConfig(props))
+    producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName)
     consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
   }
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d1d969e..f44568c 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,10 +251,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   private def produceMessage(topic: String, message: String) = {
-    val props = new Properties()
-    props.put("request.required.acks", String.valueOf(-1))
-    val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder(), props)
+    val producer: Producer[String, Array[Byte]] = createProducer(
+      getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
     producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
     producer.close()
   }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 16e7164..20e8efe 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,22 +17,24 @@
 package kafka.javaapi.consumer
 
-import junit.framework.Assert._
-import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions
-import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
+import kafka.integration.KafkaServerTestHarness
 import kafka.producer.KeyedMessage
 import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
-import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
+
+import scala.collection.JavaConversions
+
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import junit.framework.Assert._
+
+
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
   val zookeeperConnect = zkConnect
@@ -52,14 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
-    var actualMessages: List[Message] = Nil
+
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
-
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -79,7 +80,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
     val producer: kafka.producer.Producer[Int, String] =
-      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName)
     val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a0ef6f..f95528e 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+import kafka.serializer.StringEncoder
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -51,28 +52,36 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
-  producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-  producerProps.put("request.required.acks", "-1")
-
-  override def tearDown() {
-    for(server <- servers) {
-      server.shutdown()
-      Utils.rm(server.config.logDirs(0))
-    }
-    super.tearDown()
-  }
+  producerProps.put("key.serializer.class", classOf[IntEncoder].getName)
+  producerProps.put("serializer.class", classOf[StringEncoder].getName)
+
+  override def setUp() {
+    super.setUp()
 
-  def testHWCheckpointNoFailuresSingleLogSegment {
     // start both servers
     server1 = TestUtils.createServer(configProps1)
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
     // create topic with 1 partition, 2 replicas, one on each broker
     createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
+    // create the producer
+    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
+  }
+
+  override def tearDown() {
+    producer.close()
+    for(server <- servers) {
+      server.shutdown()
+      Utils.rm(server.config.logDirs(0))
+    }
+    super.tearDown()
+  }
+
+  def testHWCheckpointNoFailuresSingleLogSegment {
     val numMessages = 2L
     sendMessages(numMessages.toInt)
 
@@ -82,7 +91,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000))
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
-    producer.close()
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -90,15 +98,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   def testHWCheckpointWithFailuresSingleLogSegment {
-    // start both servers
-    server1 = TestUtils.createServer(configProps1)
-    server2 = TestUtils.createServer(configProps2)
-    servers ++= List(server1, server2)
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
 
     assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
@@ -139,33 +139,18 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
-    // start both servers
-    server1 = TestUtils.createServer(configs.head)
-    server2 = TestUtils.createServer(configs.last)
-    servers ++= List(server1, server2)
-
-    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
     sendMessages(20)
-    var hw = 20L
+    val hw = 20L
     // give some time for follower 1 to record leader HW of 600
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -173,19 +158,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
-    // start both servers
-    server1 = TestUtils.createServer(configs.head)
-    server2 = TestUtils.createServer(configs.last)
-    servers ++= List(server1, server2)
-
-    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)),
-                             servers = servers)(0)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
 
     sendMessages(2)
     var hw = 2L
 
@@ -220,7 +193,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }
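
With the broker, topic and producer bootstrapping hoisted into setUp() and the close moved
to tearDown(), each LogRecoveryTest case reduces to its messaging and checkpoint
assertions. A hypothetical condensed rendering of the first test against that shared
fixture (member and helper names as in the diff):

    // Hypothetical condensed test body; the fixture from setUp() supplies
    // server2, servers, hwFile1, topic and the producer behind sendMessages.
    def testHWCheckpointNoFailuresSingleLogSegment {
      val numMessages = 2L
      sendMessages(numMessages.toInt)
      // wait for the follower's high watermark to catch up, then force a checkpoint
      assertTrue("Failed to update high watermark for follower after 10000 ms",
        TestUtils.waitUntilTrue(() =>
          server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000))
      servers.foreach(_.replicaManager.checkpointHighWatermarks())
      assertEquals(numMessages, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
    }
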
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 481a400..eb0ade3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -54,9 +54,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
-                                                            new StringEncoder(),
-                                                            new StringEncoder())
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+                                                            encoder = classOf[StringEncoder].getName,
+                                                            keyEncoder = classOf[StringEncoder].getName)
     val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
     producer.send(messages:_*)
     producer.close()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index addd11a..f4e1e81 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -24,10 +24,10 @@ import kafka.message.ByteBufferMessageSet
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
-import kafka.utils.IntEncoder
+import kafka.utils.{StaticPartitioner, IntEncoder, TestUtils, Utils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.serializer.StringEncoder
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -43,9 +43,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
 
     // create topic
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
@@ -69,7 +69,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     // wait for the broker to receive the update metadata request after startup
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
-    producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     var fetchedMessage: ByteBufferMessageSet = null
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 130b6be..d8054b4 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -337,27 +337,26 @@ object TestUtils extends Logging {
    * Create a producer for the given host and port
    */
   def createProducer[K, V](brokerList: String,
-                           encoder: Encoder[V] = new DefaultEncoder(),
-                           keyEncoder: Encoder[K] = new DefaultEncoder(),
-                           props: Properties = new Properties()): Producer[K, V] = {
-    props.put("metadata.broker.list", brokerList)
-    props.put("send.buffer.bytes", "65536")
-    props.put("connect.timeout.ms", "100000")
-    props.put("reconnect.interval", "10000")
-    props.put("serializer.class", encoder.getClass.getCanonicalName)
-    props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
+                           encoder: String = classOf[DefaultEncoder].getName,
+                           keyEncoder: String = classOf[DefaultEncoder].getName,
+                           partitioner: String = classOf[DefaultPartitioner].getName): Producer[K, V] = {
+    val props = getProducerConfig(brokerList)
+    props.put("serializer.class", encoder)
+    props.put("key.serializer.class", keyEncoder)
+    props.put("partitioner.class", partitioner)
     new Producer[K, V](new ProducerConfig(props))
   }
 
-  def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
+  def getProducerConfig(brokerList: String): Properties = {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    props.put("partitioner.class", partitioner)
     props.put("message.send.max.retries", "3")
     props.put("retry.backoff.ms", "1000")
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "-1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("send.buffer.bytes", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
     props
   }
 
@@ -368,7 +367,7 @@ object TestUtils extends Logging {
     props.put("port", port.toString)
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }
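
The net effect of the TestUtils changes above: createProducer now takes the encoder, key
encoder and partitioner as class names (wired into ProducerConfig properties) and layers
them on top of getProducerConfig, which keeps only the shared defaults such as
request.required.acks=-1, retries and socket buffer sizes. A rough usage sketch; the
broker list is a placeholder and the encoder choices are illustrative:

    // Illustrative only: "localhost:9092" and the encoder picks are placeholders.
    import kafka.producer.Producer
    import kafka.serializer.StringEncoder
    import kafka.utils.{IntEncoder, TestUtils}

    // All-defaults producer: DefaultEncoder for key and value, DefaultPartitioner.
    val bytesProducer: Producer[Array[Byte], Array[Byte]] =
      TestUtils.createProducer[Array[Byte], Array[Byte]]("localhost:9092")

    // Typed producer: encoders are passed as class names, not instances.
    val typedProducer: Producer[Int, String] =
      TestUtils.createProducer[Int, String]("localhost:9092",
        encoder = classOf[StringEncoder].getName,
        keyEncoder = classOf[IntEncoder].getName)

    // getProducerConfig no longer accepts a partitioner; pass it to createProducer instead.
    val props = TestUtils.getProducerConfig("localhost:9092")
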