diff --git a/build.gradle b/build.gradle
index 858d297..60dd431 100644
--- a/build.gradle
+++ b/build.gradle
@@ -147,6 +147,7 @@ project(':core') {
     testCompile 'junit:junit:4.1'
     testCompile 'org.easymock:easymock:3.0'
     testCompile 'org.objenesis:objenesis:1.2'
+    testCompile project(':clients')
     if (scalaVersion.startsWith('2.8')) {
       testCompile 'org.scalatest:scalatest:1.2'
     } else if (scalaVersion.startsWith('2.10')) {
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index d7d03ea..df572f2 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=DEBUG, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
new file mode 100644
index 0000000..ab8c93f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.AdminUtils
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.message.Message
+
+import org.apache.kafka.clients.producer._
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
+import org.apache.log4j.Logger
+
+
+class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
+  private val brokerId1 = 0
+  private val brokerId2 = 1
+  private val ports = TestUtils.choosePorts(2)
+  private val (port1, port2) = (ports(0), ports(1))
+  private var server1: KafkaServer = null
+  private var server2: KafkaServer = null
+  private var servers = List.empty[KafkaServer]
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props1.put("num.partitions", "4")
+  props2.put("num.partitions", "4")
+  private val config1 = new KafkaConfig(props1)
+  private val config2 = new KafkaConfig(props2)
+
+  private val topic1 = "topic-1"
+  private val numRecords = 100
+
+  override def setUp() {
+    super.setUp()
+    // set up 2 brokers with 4 partitions each
+    server1 = TestUtils.createServer(config1)
+    server2 = TestUtils.createServer(config2)
+    servers = List(server1, server2)
+
+    // TODO: we need to migrate to new consumers when 0.9 is final
+    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+  }
+
+  override def tearDown() {
+    server1.shutdown
+    server2.shutdown
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    super.tearDown()
+  }
+
+  class PrintOffsetCallback extends Callback {
+    def onCompletion(metadata: RecordMetadata, exception: Exception) {
+      if (exception != null)
+        fail("Send callback returns the following exception", exception)
+      try {
+        System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset)
+      } catch {
+        case e: Throwable => fail("Should succeed sending the message", e)
+      }
+    }
+  }
+
+  /**
+   * testSendOffset checks the basic send API behavior
+   *
+   * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
+   * 2. The last message of the non-blocking send should return the correct offset metadata.
+   */
+  @Test
+  def testSendOffset() {
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = new KafkaProducer(props)
+
+    val callback = new PrintOffsetCallback
+
+    try {
+      // create topic
+      AdminUtils.createTopic(zkClient, topic1, 1, 2)
+      // wait until the update metadata request for new topic reaches all servers
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0, 500)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 500)
+
+      // send a normal record
+      val record0 = new ProducerRecord(topic1, new Integer(0), "key".getBytes, "value".getBytes)
+      val response0 = producer.send(record0, callback)
+      assertEquals("Should have offset 0", 0L, response0.get.offset)
+
+      // sending a record with null value should be ok
+      val record1 = new ProducerRecord(topic1, new Integer(0), "key".getBytes, null)
+      val response1 = producer.send(record1, callback)
+      assertEquals("Should have offset 1", 1L, response1.get.offset)
+
+      // sending a record with null key should be ok
+      val record2 = new ProducerRecord(topic1, new Integer(0), null, "value".getBytes)
+      val response2 = producer.send(record2, callback)
+      assertEquals("Should have offset 2", 2L, response2.get.offset)
+
+      // sending a record with null partition id should be ok
+      val record3 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+      val response3 = producer.send(record3, callback)
+      assertEquals("Should have offset 3", 3L, response3.get.offset)
+
+      // sending a record with null topic should fail
+      try {
+        val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
+        producer.send(record4, callback).get
+        fail("Should not allow sending a record without topic")
+      } catch {
+        case iae: IllegalArgumentException => // this is ok
+        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+      }
+
+      // send a list of records without blocking
+      for (i <- 1 to numRecords)
+        producer.send(record0)
+
+      // check that all messages have been acked via offset
+      val response5 = producer.send(record0, callback)
+      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+
+  /**
+   * testClose checks the closing behavior
+   *
+   * 1. After close() returns, all messages should be sent with correct returned offset metadata.
+   */
+  @Test
+  def testClose() {
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = new KafkaProducer(props)
+
+    try {
+      // create topic
+      AdminUtils.createTopic(zkClient, topic1, 1, 2)
+      // wait until the update metadata request for new topic reaches all servers
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0, 500)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 500)
+
+      // send a list of records without blocking
+      val record0 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+      for (i <- 1 to 10 * numRecords)
+        producer.send(record0)
+      val response0 = producer.send(record0)
+
+      // close the producer
+      producer.close()
+      producer = null
+
+      // check that all messages have been acked via offset;
+      // this also checks that messages with the same key go to the same partition
+      assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
+      assertEquals("Should have offset " + (10 * numRecords), (10 * numRecords).toLong, response0.get.offset)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+
+  /**
+   * testSendToPartition checks the partitioning behavior
+   *
+   * 1. The default partitioner should have the correct round-robin behavior in assigning partitions.
+   * 2. The specified partition-id should be respected.
+   */
+  @Test
+  def testSendToPartition() {
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1")
+    var producer = new KafkaProducer(props)
+
+    try {
+      // create topic
+      AdminUtils.createTopic(zkClient, topic1, 2, 2)
+      // wait until the update metadata request for new topic reaches all servers
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0, 500)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 500)
+
+      // find the leader of partition 0
+      val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic1, 0)
+      assertTrue("Leader for topic " + topic1 + " partition 0 should exist", leaderOpt.isDefined)
+      val leader = leaderOpt.get
+
+      // case 1: use default partitioner, send 2*numRecords+2 messages with no partition-id/keys,
+      // they should be assigned to the two partitions evenly as (1,3,5,7..) and (2,4,6,8..)
+      for (i <- 1 to 2 * numRecords) {
+        val record = new ProducerRecord(topic1, null, null, ("value" + i).getBytes)
+        producer.send(record)
+      }
+
+      // make sure both partitions have acked back
+      val record0 = new ProducerRecord(topic1, null, null, ("value" + (2 * numRecords + 1)).getBytes)
+      val response0 = producer.send(record0)
+      assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
+      val record1 = new ProducerRecord(topic1, null, null, ("value" + (2 * numRecords + 2)).getBytes)
+      val response1 = producer.send(record1)
+      assertEquals("Should have offset " + numRecords, numRecords.toLong, response1.get.offset)
+
+      // fetch all messages from partition 0 and check that it has numRecords+1 messages
+      val fetchResponse1 = if (leader == server1.config.brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, 0, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, 0, 0, Int.MaxValue).build())
+      }
+      val messageSet1 = fetchResponse1.messageSet(topic1, 0).iterator.toBuffer
+      assertEquals("Should have fetched " + (numRecords + 1) + " messages", numRecords + 1, messageSet1.size)
+
+      // if the first message gets 1, make sure the rest are (3,5,7..);
+      // if the first message gets 2, make sure the rest are (4,6,8..)
+      val startWithOne = messageSet1(0).message.equals(new Message(bytes = "value1".getBytes))
+      for (i <- 1 to numRecords) {
+        if (startWithOne) {
+          assertEquals(new Message(bytes = ("value" + (i * 2 + 1)).getBytes), messageSet1(i).message)
+        } else {
+          assertEquals(new Message(bytes = ("value" + (i * 2 + 2)).getBytes), messageSet1(i).message)
+        }
+      }
+
+      // case 2: check that the specified partition id is respected by sending numRecords with partition-id 0
+      // and make sure all of them end up in partition 0
+      for (i <- 1 to numRecords - 1) {
+        val record = new ProducerRecord(topic1, new Integer(0), null, ("value" + i).getBytes)
+        producer.send(record)
+      }
+      val record2 = new ProducerRecord(topic1, new Integer(0), null, ("value" + numRecords).getBytes)
+      val response2 = producer.send(record2)
+      assertEquals("Should have offset " + 2 * numRecords, (2 * numRecords).toLong, response2.get.offset)
+
+      // start fetching from offset numRecords+1
+      val fetchResponse2 = if (leader == server1.config.brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, 0, numRecords + 1, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, 0, numRecords + 1, Int.MaxValue).build())
+      }
+      val messageSet2 = fetchResponse2.messageSet(topic1, 0).iterator.toBuffer
+
+      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet2.size)
+
+      for (i <- 0 to numRecords - 1) {
+        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet2(i).message)
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+
+  @Test
+  def testCreateTopic() {
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = new KafkaProducer(props)
+
+    try {
+      // send a message to auto-create the topic
+      val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+      val response = producer.send(record)
+      assertEquals("Should have offset 0", 0L, response.get.offset)
+
+      // double check that the topic is created with leader elected
+      assertTrue("Topic should already be created with leader",
+        TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 0).isDefined)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/project/Build.scala b/project/Build.scala
index ddcfc41..d3231c9 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -140,13 +140,13 @@ object KafkaBuild extends Build {
 
   lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++ runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*)
-  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*)
+  lazy val clients = Project(id = "kafka-clients", base = file("clients")).settings(commonSettings :_*)
+  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn (clients)
   lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
   lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
   lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*)
   lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
   lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
-  lazy val clients = Project(id = "kafka-clients", base = file("clients"))
 }
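
For readers new to the org.apache.kafka.clients.producer API exercised above, a minimal standalone sketch of the send path it tests looks roughly like this; the broker address and topic name are illustrative placeholders, not values from this patch:

    import java.util.Properties
    import org.apache.kafka.clients.producer._

    object ProducerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092") // placeholder broker
        val producer = new KafkaProducer(props)
        try {
          // key, value and partition-id may each be null; a null topic is rejected (see testSendOffset)
          val record = new ProducerRecord("topic-1", null, "key".getBytes, "value".getBytes)
          // send() is non-blocking and returns a future; get blocks until the broker acks
          val metadata = producer.send(record).get
          println("acked: partition " + metadata.partition + ", offset " + metadata.offset)
        } finally {
          producer.close()
        }
      }
    }

The test suite itself never hard-codes an address: it derives the broker list from the embedded servers via TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)).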