diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
new file mode 100644
index 0000000..3756596
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils, Logging}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.AdminUtils
+
+import kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordSend, Callback}
+
+import kafka.common.StringSerialization
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+import java.util.Properties
+import java.lang.IllegalArgumentException
+
+class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+  private val brokerId1 = 0
+  private val brokerId2 = 1
+  private val ports = TestUtils.choosePorts(2)
+  private val (port1, port2) = (ports(0), ports(1))
+  private var server1: KafkaServer = null
+  private var server2: KafkaServer = null
+  private var servers = List.empty[KafkaServer]
+
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props1.put("num.partitions", "4")
+  props2.put("num.partitions", "4")
+  private val config1 = new KafkaConfig(props1)
+  private val config2 = new KafkaConfig(props2)
+
+  private val numRecords = 10
+
+  override def setUp() {
+    super.setUp()
+    // set up 2 brokers with 4 partitions each
+    server1 = TestUtils.createServer(config1)
+    server2 = TestUtils.createServer(config2)
+    servers = List(server1, server2)
+  }
+
+  override def tearDown() {
+    server1.shutdown()
+    server2.shutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    super.tearDown()
+  }
+
+  class PrintOffsetCallback extends Callback {
+    override def onCompletion(send: RecordSend) {
+      try {
+        System.out.println("The offset of the message we just sent is: " + send.offset)
+      } catch {
+        case e: Throwable => fail("Should succeed sending the message", e)
+      }
+    }
+  }
+
+  @Test
+  def testSend() {
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerialization].getName)
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerialization].getName)
+
+    val producer = new KafkaProducer(props)
+
+    try {
+      // create the topic with 1 partition and a replication factor of 2
+      val topic = "new-topic"
+      AdminUtils.createTopic(zkClient, topic, 1, 2)
+      // wait until the update metadata request for the new topic reaches all servers
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+
+      val callback = new PrintOffsetCallback
+
+      // send a normal record
+      val record0 = new ProducerRecord(topic, "key", "partKey", "value")
+      val response0 = producer.send(record0, callback)
+      assertEquals("Should have offset 0", 0L, response0.offset)
+
+      // sending a record with a null value should be ok
+      val record1 = new ProducerRecord(topic, "key", "partKey", null)
+      val response1 = producer.send(record1, callback)
+      assertEquals("Should have offset 1", 1L, response1.offset)
+
+      // sending a record with a null key should be ok
+      val record2 = new ProducerRecord(topic, null, "partKey", "value")
+      val response2 = producer.send(record2, callback)
+      assertEquals("Should have offset 2", 2L, response2.offset)
+
+      // sending a record with a null partition key should be ok
+      val record3 = new ProducerRecord(topic, "key", null, "value")
+      val response3 = producer.send(record3, callback)
+      assertEquals("Should have offset 3", 3L, response3.offset)
+
+      // sending a record with a null topic should fail
+      try {
+        val record4 = new ProducerRecord(null, "key", "partKey", "value")
+        val response4 = producer.send(record4, callback)
+        response4.await
+        fail("Sending a record with a null topic should throw an IllegalArgumentException")
+      } catch {
+        case iae: IllegalArgumentException => // this is ok
+        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+      }
+
+      // non-blocking send of a batch of records
+      for (i <- 1 to numRecords)
+        producer.send(record0, callback)
+
+      // check that all sends above have been acked by verifying the offset of one final send
+      val response5 = producer.send(record0, callback)
+      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.offset)
+
+    } finally {
+      producer.close()
+    }
+  }
+}
\ No newline at end of file
diff --git a/project/Build.scala b/project/Build.scala
index ddcfc41..029cfa4 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -138,15 +138,15 @@ object KafkaBuild extends Build {
     }
   }
 
-  lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++
+  lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(clients, core, examples, contrib, perf).settings((commonSettings ++
                    runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*)
 
-  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*)
+  lazy val clients = Project(id = "kafka-clients", base = file("clients")).settings(commonSettings: _*)
+  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn (clients)
   lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
   lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
   lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*)
   lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
   lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
 
-  lazy val clients = Project(id = "kafka-clients", base = file("clients"))
 }
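
For reviewers who want to try the new client outside of the test harness, here is a minimal usage sketch of the producer API surface this patch exercises (KafkaProducer, ProducerRecord, RecordSend, Callback). It uses only names that appear in the diff above; the object name, topic name, and broker list value are illustrative placeholders, not anything defined by this patch.

import java.util.Properties

import kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordSend}
import kafka.common.StringSerialization

object ProducerQuickStart {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092")  // placeholder broker list
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerialization].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerialization].getName)

    val producer = new KafkaProducer(props)
    try {
      // send() is asynchronous: it returns a RecordSend handle immediately and
      // invokes the callback once the broker has acknowledged the record
      val send: RecordSend = producer.send(
        new ProducerRecord("my-topic", "key", "partKey", "value"),
        new Callback {
          override def onCompletion(completed: RecordSend) {
            println("acked at offset " + completed.offset)
          }
        })
      send.await  // block until the ack arrives, as the test does for the null-topic case
    } finally {
      producer.close()
    }
  }
}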