diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index c78da64..f06e28c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import java.util.List; @@ -61,11 +62,20 @@ public class PartitionerTest { } @Test + public void testRoundRobinIsStable() { + int startPart = partitioner.partition(new ProducerRecord("test", value), cluster); + for (int i = 1; i <= 100; i++) { + int partition = partitioner.partition(new ProducerRecord("test", value), cluster); + assertEquals("Should yield a different partition each call with round-robin partitioner", + partition, (startPart + i) % 2); + } + } + + @Test public void testRoundRobinWithDownNode() { for (int i = 0; i < partitions.size(); i++) { int part = partitioner.partition(new ProducerRecord("test", value), cluster); assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2); - } } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index f8ba361..34baa8c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -195,8 +195,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { /** * testSendToPartition checks the partitioning behavior * - * 1. The default partitioner should have the correct round-robin behavior in assigning partitions - * 2. The specified partition-id should be respected + * 1. The specified partition-id should be respected */ @Test def testSendToPartition() { @@ -210,68 +209,38 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) // make sure leaders exist - val leader0 = leaders.get(0) val leader1 = leaders.get(1) - assertTrue("Leader for topic new-topic partition 0 should exist", leader0.isDefined) - assertTrue("Leader for topic new-topic partition 1 should exist", leader1.isDefined) - - // case 1: use default partitioner, send 2*numRecords+2 messages with no partition-id/keys, - // they should be assigned to two partitions evenly as (1,3,5,7..) and (2,4,6,8..) - for (i <- 1 to 2 * numRecords) { - val record = new ProducerRecord(topic, null, null, ("value" + i).getBytes) - producer.send(record) + assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) + + val partition = 1 + val responses = + for (i <- 0 until numRecords) + yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes)) + val futures = responses.toList + futures.map(_.wait) + for (future <- futures) + assertTrue("Request should have completed", future.isDone) + + // make sure all of them end up in the same partition with increasing offset values + for ((future, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset, future.get.offset) + assertEquals(topic, future.get.topic) + assertEquals(1, future.get.partition) } - // make sure both partitions have acked back - val record0 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 1)).getBytes) - val response0 = producer.send(record0); - assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset) - val record1 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 2)).getBytes) - val response1 = producer.send(record1); - assertEquals("Should have offset " + numRecords, numRecords.toLong, response1.get.offset) - - // get messages from partition 0, and check they has numRecords+1 messages - val fetchResponse0 = if(leader0.get == server1.config.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } - val messageSet0 = fetchResponse0.messageSet(topic, 0).iterator.toBuffer - assertEquals("Should have fetched " + (numRecords + 1) + " messages", numRecords + 1, messageSet0.size) - - // if the first message gets 1, make sure the rest are (3,5,7..); - // if the first message gets 2, make sure the rest are (4,6,8..) - val startWithOne = messageSet0(0).message.equals(new Message(bytes = "value1".getBytes)) - for (i <- 1 to numRecords) { - if(startWithOne) { - assertEquals(new Message(bytes = ("value" + (i * 2 + 1)).getBytes), messageSet0(i).message) - } else { - assertEquals(new Message(bytes = ("value" + (i * 2 + 2)).getBytes), messageSet0(i).message) - } - } - - // case 2: check the specified partition id is respected by sending numRecords with partition-id 1 - // and make sure all of them end up in partition 1 - for (i <- 1 to numRecords - 1) { - val record = new ProducerRecord(topic, new Integer(1), null, ("value" + i).getBytes) - producer.send(record) - } - val record2 = new ProducerRecord(topic, new Integer(1), null, ("value" + numRecords).getBytes) - val response2 = producer.send(record2); - assertEquals("Should have offset " + 2 * numRecords, (2 * numRecords).toLong, response2.get.offset) - - // start fetching from offset numRecords+1 + // make sure the fetched messages also respect the partitioning and ordering val fetchResponse1 = if(leader1.get == server1.config.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build()) + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) }else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build()) + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) } val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer - assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) + // TODO: also check topic and partition after they are added in the return messageSet for (i <- 0 to numRecords - 1) { assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message) + assertEquals(i, messageSet1(i).offset) } } finally { if (producer != null) {