Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy) @@ -17,23 +17,21 @@ package kafka.log4j -import org.apache.log4j.spi.LoggingEvent -import org.apache.log4j.{PropertyConfigurator, Logger} import java.util.Properties import java.io.File +import junit.framework.Assert._ +import kafka.api.FetchRequestBuilder import kafka.consumer.SimpleConsumer -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.TestZKUtils -import kafka.zk.EmbeddedZookeeper -import junit.framework.Assert._ -import kafka.api.FetchRequest -import kafka.serializer.Encoder import kafka.message.Message import kafka.producer.async.MissingConfigException +import kafka.serializer.Encoder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging} +import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness} +import org.apache.log4j.spi.LoggingEvent +import org.apache.log4j.{PropertyConfigurator, Logger} import org.junit.{After, Before, Test} import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils, Logging} class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -172,10 +170,10 @@ Thread.sleep(2500) var offset = 0L - val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024)) - + val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build()) + val fetchedMessage = response.messageSet("test-topic", 0) var count = 0 - for(message <- messages) { + for(message <- fetchedMessage) { count = count + 1 offset += message.offset } @@ -192,14 +190,16 @@ Thread.sleep(500) - val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024)) + val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) + val fetchMessage = response.messageSet("test-topic", 0) var count = 0 - for(message <- messages) { + for(message <- fetchMessage) { count = count + 1 } - val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024)) + val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) + val messagesFromOtherBroker = response2.messageSet("test-topic", 0) for(message <- messagesFromOtherBroker) { count = count + 1 Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -32,6 +32,7 @@ import collection.mutable.ListBuffer import kafka.consumer.{KafkaMessageStream, ConsumerConfig} import scala.collection.Map +import kafka.api.{ProducerRequest, WiredTopic, WiredPartition} /** * Utility functions to help with testing @@ -186,23 +187,21 @@ length += 1 assertEquals(expected.next, actual.next) } - - if (expected.hasNext) - { + + // check if the expected iterator is longer + if (expected.hasNext) { var length1 = length; - while (expected.hasNext) - { + while (expected.hasNext) { expected.next length1 += 1 } assertFalse("Iterators have uneven length-- first has more: "+length1 + " 
> " + length, true); } - - if (actual.hasNext) - { + + // check if the actual iterator was longer + if (actual.hasNext) { var length2 = length; - while (actual.hasNext) - { + while (actual.hasNext) { actual.next length2 += 1 } @@ -332,8 +331,42 @@ buffer } + /** + * Create a wired format request based on simple basic information + */ + def produceWiredRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { + produceWiredRequest(-1,topic,partition,message) + } + + def produceWiredRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](1) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(partition,message) + data(0) = new WiredTopic(topic,partition_data) + val producerRequest = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + producerRequest + } + + def produceJavaAPIWiredRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { + produceJavaAPIWiredRequest(-1,topic,partition,message) + } + + def produceJavaAPIWiredRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](1) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(partition,message) + data(0) = new WiredTopic(topic,partition_data) + val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + producerRequest + } } object TestZKUtils { val zookeeperConnect = "127.0.0.1:2182" -} +} \ No newline at end of file Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (working copy) @@ -22,7 +22,6 @@ import kafka.server.{KafkaConfig, KafkaServer} import junit.framework.Assert._ import java.util.{Random, Properties} -import kafka.api.{FetchRequest, OffsetRequest} import collection.mutable.WrappedArray import kafka.consumer.SimpleConsumer import org.junit.{After, Before, Test} @@ -30,6 +29,7 @@ import org.apache.log4j._ import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite +import kafka.api.{FetchRequestBuilder, OffsetRequest} object LogOffsetTest { val random = new Random() @@ -66,9 +66,8 @@ @Test def testEmptyLogs() { - val messageSet: ByteBufferMessageSet = simpleConsumer.fetch( - new FetchRequest("test", 0, 0, 300 * 1024)) - assertFalse(messageSet.iterator.hasNext) + val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build()) + assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext) val name = "test" val logFile = new File(logDir, name + "-0") @@ -119,9 +118,9 @@ assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) // try to fetch using latest offset - val messageSet: ByteBufferMessageSet = simpleConsumer.fetch( - new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024)) - assertFalse(messageSet.iterator.hasNext) + val fetchResponse = simpleConsumer.fetch( + new 
FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build()) + assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext) } @Test Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy) @@ -18,18 +18,19 @@ package kafka.integration import scala.collection._ +import java.io.File +import java.util.Properties import junit.framework.Assert._ -import kafka.api.{ProducerRequest, FetchRequest} -import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException} +import kafka.common.{ErrorMapping, OffsetOutOfRangeException, InvalidPartitionException} +import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet} +import kafka.producer.{ProducerData, Producer, ProducerConfig} +import kafka.serializer.StringDecoder import kafka.server.{KafkaRequestHandler, KafkaConfig} +import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite -import java.util.Properties -import kafka.producer.{ProducerData, Producer, ProducerConfig} -import kafka.serializer.StringDecoder -import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet} -import java.io.File -import kafka.utils.TestUtils +import java.nio.ByteBuffer +import kafka.api.{FetchRequest, FetchRequestBuilder, ProducerRequest, WiredTopic, WiredPartition} /** * End to end tests of the primitive apis against a local server @@ -39,11 +40,28 @@ val port = TestUtils.choosePort val props = TestUtils.createBrokerConfig(0, port) val config = new KafkaConfig(props) { - override val flushInterval = 1 - } + override val flushInterval = 1 + } val configs = List(config) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) + def testFetchRequestCanProperlySerialize() { + val request = new FetchRequestBuilder() + .correlationId(100) + .clientId("test-client") + .maxWait(10001) + .minBytes(4444) + .addFetch("topic1", 0, 0, 10000) + .addFetch("topic2", 1, 1024, 9999) + .addFetch("topic1", 1, 256, 444) + .build() + val serializedBuffer = ByteBuffer.allocate(request.sizeInBytes) + request.writeTo(serializedBuffer) + serializedBuffer.rewind() + val deserializedRequest = FetchRequest.readFrom(serializedBuffer) + assertEquals(request, deserializedRequest) + } + def testDefaultEncoderProducerAndFetch() { val topic = "test-topic" val props = new Properties() @@ -55,10 +73,18 @@ stringProducer1.send(new ProducerData[String, String](topic, Array("test-message"))) Thread.sleep(200) - var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - assertTrue(fetched.iterator.hasNext) + val request = new FetchRequestBuilder() + .correlationId(100) + .clientId("test-client") + .addFetch(topic, 0, 0, 10000) + .build() + val fetched = consumer.fetch(request) + assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId) - val fetchedMessageAndOffset = fetched.iterator.next + val messageSet = fetched.messageSet(topic, 0) + assertTrue(messageSet.iterator.hasNext) + + val fetchedMessageAndOffset = messageSet.head val stringDecoder = new StringDecoder val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) assertEquals("test-message", fetchedStringMessage) @@ -76,10 +102,11 @@ 
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message"))) Thread.sleep(200) - var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - assertTrue(fetched.iterator.hasNext) + var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val messageSet = fetched.messageSet(topic, 0) + assertTrue(messageSet.iterator.hasNext) - val fetchedMessageAndOffset = fetched.iterator.next + val fetchedMessageAndOffset = messageSet.head val stringDecoder = new StringDecoder val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) assertEquals("test-message", fetchedStringMessage) @@ -87,24 +114,27 @@ def testProduceAndMultiFetch() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(700) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } // temporarily set request handler logger to a higher level @@ -112,34 +142,34 @@ { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, partition).iterator + fail("Expected exception when fetching message with invalid offset") + } catch { case e: OffsetOutOfRangeException => "this is good" } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, -1, 0, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, -1).iterator + fail("Expected exception when fetching message with invalid partition") + } catch { case e: InvalidPartitionException => "this is good" } } @@ -150,24 +180,27 @@ def testProduceAndMultiFetchWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = 
List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(DefaultCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } // temporarily set request handler logger to a higher level @@ -175,34 +208,34 @@ { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, partition).iterator + fail("Expected exception when fetching message with invalid offset") + } catch { case e: OffsetOutOfRangeException => "this is good" } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, _) <- topics) + response.messageSet(topic, -1).iterator + fail("Expected exception when fetching message with invalid partition") + } catch { case e: InvalidPartitionException => "this is good" } } @@ -213,58 +246,80 @@ def testMultiProduce() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](topics.size) + var index = 0 + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set) + 
data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testMultiProduceWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { - val set = new ByteBufferMessageSet(DefaultCompressionCodec, + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](topics.size) + var index = 0 + for( (topic, partition) <- topics) { + val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set) + data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, 0) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testConsumerNotExistTopic() { val newTopic = "new-topic" - val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator - assertTrue(messageSetIter.hasNext == false) + val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) + assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) val logFile = new File(config.logDir, newTopic + "-0") assertTrue(!logFile.exists) } Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (working copy) @@ -20,16 +20,14 @@ import kafka.server.KafkaConfig import java.io.File import 
java.nio.ByteBuffer -import kafka.utils.Utils -import kafka.api.FetchRequest +import kafka.api.FetchRequestBuilder import kafka.common.InvalidMessageSizeException -import kafka.utils.TestUtils import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig} +import kafka.integration.{KafkaServerTestHarness, ProducerConsumerTestHarness} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} +import kafka.utils.{Utils, TestUtils} import org.scalatest.junit.JUnit3Suite -import kafka.integration.ProducerConsumerTestHarness -import kafka.integration.KafkaServerTestHarness import org.apache.log4j.{Logger, Level} -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness { val port = TestUtils.choosePort @@ -65,23 +63,21 @@ Thread.sleep(500) // test SimpleConsumer - val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000)) + val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build()) try { - for (msg <- messageSet) + for (msg <- response.messageSet(topic, partition)) fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => "This is good" } - val messageSet2 = consumer.fetch(new FetchRequest(topic, partition, 0, 10000)) + val response2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build()) try { - for (msg <- messageSet2) + for (msg <- response2.messageSet(topic, partition)) fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => println("This is good") } @@ -95,8 +91,7 @@ for (message <- messageStreams(0)) fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.") fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => "This is good" case e: Exception => "This is not bad too !" 
} Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (working copy) @@ -17,20 +17,20 @@ package kafka.integration -import scala.collection._ +import kafka.api.{FetchRequestBuilder, ProducerRequest, WiredTopic, WiredPartition} import kafka.common.OffsetOutOfRangeException -import kafka.api.{ProducerRequest, FetchRequest} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig} +import kafka.utils.{TestUtils, Utils, Logging} +import kafka.zk.ZooKeeperTestHarness import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} -import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils} +import scala.collection._ /** * End to end tests of the primitive apis against a local server */ -class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness { +class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness with Logging { val port = TestUtils.choosePort val props = TestUtils.createBrokerConfig(0, port) @@ -65,54 +65,60 @@ new Message("hello".getBytes()), new Message("there".getBytes())) producer.send(topic, sent) sent.getBuffer.rewind - var fetched: ByteBufferMessageSet = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent.iterator, fetched.iterator) + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0) + } + TestUtils.checkEquals(sent.iterator, fetchedMessage.iterator) + // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - fetchedWithError.iterator + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + fetchedWithError.messageSet(topic, 0).iterator fail("Expected an OffsetOutOfRangeException exception to be thrown") - } - catch { + } catch { case e: OffsetOutOfRangeException => } } def testProduceAndMultiFetch() { - // send some messages - val topics = List("test1", "test2", "test3"); + // send some messages, with non-ordered topics + val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, offset) <- topicOffsets) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + messages += topic -> set + builder.addFetch(topic, offset, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - 
TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, offset) <- topicOffsets) { + val fetched = response.messageSet(topic, offset) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, offset) <- topicOffsets ) + builder.addFetch(topic, offset, -1, 10000) - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) { + val request = builder.build() + val responses = consumer.fetch(request) + for( (topic, offset) <- topicOffsets ) { try { - resp.iterator + responses.messageSet(topic, offset).iterator fail("Expected an OffsetOutOfRangeException exception to be thrown") } catch { case e: OffsetOutOfRangeException => @@ -125,55 +131,76 @@ // send some messages val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](3) + var index = 0 for(topic <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set) + data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, 0, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for(topic <- topics) { + val fetched = response.messageSet(topic, 0) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testMultiProduceResend() { // send some messages val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](3) + var index = 0 for(topic <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set) + data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, 0, 0, 10000) } - 
producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) // resend the same multisend - producer.multiSend(produceList.toArray) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(750) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) + val request = builder.build() + val response = consumer.fetch(request) + for(topic <- topics) { + val topicMessages = response.messageSet(topic, 0) TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator, messages(topic).map(m => m.message).iterator), - resp.map(m => m.message).iterator) -// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator) + topicMessages.iterator.map(_.message)) + } } } Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (working copy) @@ -17,15 +17,17 @@ package kafka.integration -import kafka.server.KafkaConfig -import org.scalatest.junit.JUnit3Suite -import org.apache.log4j.Logger +import junit.framework.Assert._ import java.util.Properties + +import kafka.api.{FetchRequestBuilder, OffsetRequest} import kafka.consumer.SimpleConsumer -import kafka.api.{OffsetRequest, FetchRequest} -import junit.framework.Assert._ +import kafka.server.KafkaConfig import kafka.utils.TestUtils +import org.apache.log4j.Logger +import org.scalatest.junit.JUnit3Suite + class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness { val topic = "MagicByte0" @@ -62,9 +64,10 @@ var messageCount: Int = 0 while(fetchOffset < lastOffset(0)) { - val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000)) - fetched.foreach(m => fetchOffset = m.offset) - messageCount += fetched.size + val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build()) + val fetchedMessages = fetched.messageSet(topic, 0) + fetchedMessages.foreach(m => fetchOffset = m.offset) + messageCount += fetchedMessages.size } assertEquals(100, messageCount) } Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) @@ -63,7 +63,7 @@ Assert.assertTrue((secondEnd-secondStart) < 500) try { - producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))) + producer.send(TestUtils.produceJavaAPIWiredRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) }catch { case e: Exception => failed=true } Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1242817) +++ 
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -311,11 +311,11 @@ val topic = "topic1" val msgs = TestUtils.getMsgStrings(10) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, - messagesToSet(msgs.take(5)))))) + mockSyncProducer.send(TestUtils.produceJavaAPIWiredRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.take(5)))) EasyMock.expectLastCall - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, - messagesToSet(msgs.takeRight(5)))))) + mockSyncProducer.send(TestUtils.produceJavaAPIWiredRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.takeRight(5)))) EasyMock.expectLastCall mockSyncProducer.close EasyMock.expectLastCall @@ -402,8 +402,5 @@ override def send(topic: String, messages: ByteBufferMessageSet): Unit = { Thread.sleep(1000) } - override def multiSend(produces: Array[ProducerRequest]) { - Thread.sleep(1000) - } } } Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) @@ -17,18 +17,18 @@ package kafka.producer -import org.apache.log4j.{Logger, Level} -import kafka.zk.EmbeddedZookeeper -import org.junit.{After, Before, Test} import junit.framework.Assert._ -import org.scalatest.junit.JUnitSuite -import kafka.utils.{TestUtils, TestZKUtils, Utils} -import kafka.api.FetchRequest +import java.util.Properties +import kafka.api.FetchRequestBuilder +import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.serializer.Encoder -import kafka.consumer.SimpleConsumer -import java.util.Properties import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig} +import kafka.utils.{TestUtils, TestZKUtils, Utils} +import kafka.zk.EmbeddedZookeeper +import org.apache.log4j.{Logger, Level} +import org.junit.{After, Before, Test} +import org.scalatest.junit.JUnitSuite class ProducerTest extends JUnitSuite { private val topic = "test-topic" @@ -106,12 +106,14 @@ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - assertTrue("Message set should have 1 message", messageSet1.hasNext) - assertEquals(new Message("test1".getBytes), messageSet1.next.message) - val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - assertTrue("Message set should have 1 message", messageSet2.hasNext) - assertEquals(new Message("test1".getBytes), messageSet2.next.message) + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0) + assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1.head.message) + val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet2 = response2.messageSet("new-topic", 0) + assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext) + assertEquals(new Message("test1".getBytes), messageSet2.head.message) } catch { case 
e: Exception => fail("Not expected", e) } @@ -142,11 +144,12 @@ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - assertTrue("Message set should have 1 message", messageSet1.hasNext) - assertEquals(new Message("test1".getBytes), messageSet1.next.message) - assertTrue("Message set should have another message", messageSet1.hasNext) - assertEquals(new Message("test1".getBytes), messageSet1.next.message) + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1Iter = response1.messageSet("new-topic", 0).iterator + assertTrue("Message set should have 1 message", messageSet1Iter.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message) + assertTrue("Message set should have another message", messageSet1Iter.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message) } catch { case e: Exception => fail("Not expected") } @@ -174,9 +177,10 @@ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - assertTrue("Message set should have 1 message", messageSet1.hasNext) - assertEquals(new Message("test".getBytes), messageSet1.next.message) + val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0) + assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext) + assertEquals(new Message("test".getBytes), messageSet1.head.message) // shutdown server2 server2.shutdown @@ -197,9 +201,10 @@ Thread.sleep(100) // cross check if brokers got the messages - val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - assertTrue("Message set should have 1 message", messageSet2.hasNext) - assertEquals(new Message("test".getBytes), messageSet2.next.message) + val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet2 = response2.messageSet("new-topic", 0) + assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext) + assertEquals(new Message("test".getBytes), messageSet2.head.message) } catch { case e: Exception => fail("Not expected", e) Index: core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala =================================================================== --- core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (working copy) @@ -77,7 +77,7 @@ // wait a bit to make sure rebalancing logic is triggered - Thread.sleep(1000) + Thread.sleep(1500) // check Partition Owner Registry val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_3 = List( ("200-0", "group1_consumer1-0"), Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (working copy) @@ -17,7 +17,6 @@ package kafka.server import java.io.File -import kafka.api.FetchRequest import kafka.producer.{SyncProducer, 
SyncProducerConfig} import kafka.consumer.SimpleConsumer import java.util.Properties @@ -27,6 +26,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.{TestUtils, Utils} +import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest} class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort @@ -82,11 +82,13 @@ server.startup() // bring the server back again and read the messages - var fetched: ByteBufferMessageSet = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent1.iterator, fetched.iterator) - val newOffset = fetched.validBytes + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0) + } + TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator) + val newOffset = fetchedMessage.validBytes // send some more messages producer.send(topic, sent2) @@ -94,10 +96,12 @@ Thread.sleep(200) - fetched = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000)) - TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator) + fetchedMessage = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0) + } + TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator) server.shutdown() Utils.rm(server.config.logDir) Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (revision 1242817) +++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (working copy) @@ -18,15 +18,16 @@ package kafka.javaapi.integration import scala.collection._ -import kafka.api.FetchRequest +import kafka.api.FetchRequestBuilder import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException} +import kafka.javaapi.ProducerRequest +import kafka.javaapi.message.ByteBufferMessageSet +import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message} import kafka.server.{KafkaRequestHandler, KafkaConfig} +import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.javaapi.ProducerRequest -import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message} -import kafka.utils.TestUtils +import kafka.api.{WiredTopic, WiredPartition} /** * End to end tests of the primitive apis against a local server @@ -43,39 +44,42 @@ // send some messages val topic = "test" -// send an empty messageset first - val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(Seq.empty[Message]: _*)) + // send an empty messageset first + val sent2 = new ByteBufferMessageSet(NoCompressionCodec, getMessageList(Seq.empty[Message]: _*)) producer.send(topic, sent2) + Thread.sleep(200) sent2.getBuffer.rewind - var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - 
TestUtils.checkEquals(sent2.iterator, fetched2.iterator) + val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val fetchedMessage2 = fetched2.messageSet(topic, 0) + TestUtils.checkEquals(sent2.iterator, fetchedMessage2.iterator) // send some messages - val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message("hello".getBytes()), - new Message("there".getBytes()))) + val sent3 = new ByteBufferMessageSet(NoCompressionCodec, + getMessageList( + new Message("hello".getBytes()),new Message("there".getBytes()))) producer.send(topic, sent3) Thread.sleep(200) sent3.getBuffer.rewind - var fetched3: ByteBufferMessageSet = null - while(fetched3 == null || fetched3.validBytes == 0) - fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent3.iterator, fetched3.iterator) + var messageSet: ByteBufferMessageSet = null + while(messageSet == null || messageSet.validBytes == 0) { + val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + messageSet = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent3.iterator, messageSet.iterator) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - fetchedWithError.iterator - fail("expect exception") - } - catch { + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + val messageWithError = fetchedWithError.messageSet(topic, 0) + messageWithError.iterator + fail("Fetch with invalid offset should throw an exception when iterating over response") + } catch { case e: OffsetOutOfRangeException => "this is good" } @@ -87,39 +91,42 @@ // send some messages val topic = "test" -// send an empty messageset first - val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, - messages = getMessageList(Seq.empty[Message]: _*)) + // send an empty messageset first + val sent2 = new ByteBufferMessageSet(DefaultCompressionCodec, getMessageList(Seq.empty[Message]: _*)) producer.send(topic, sent2) + Thread.sleep(200) sent2.getBuffer.rewind - var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent2.iterator, fetched2.iterator) + val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val message2 = fetched2.messageSet(topic, 0) + TestUtils.checkEquals(sent2.iterator, message2.iterator) // send some messages - val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, - messages = getMessageList(new Message("hello".getBytes()), - new Message("there".getBytes()))) + val sent3 = new ByteBufferMessageSet( DefaultCompressionCodec, + getMessageList( + new Message("hello".getBytes()),new Message("there".getBytes()))) producer.send(topic, sent3) Thread.sleep(200) sent3.getBuffer.rewind - var fetched3: ByteBufferMessageSet = null - while(fetched3 == null || fetched3.validBytes == 0) - fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent3.iterator, fetched3.iterator) + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = 
fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent3.iterator, fetchedMessage.iterator) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - fetchedWithError.iterator - fail("expect exception") - } - catch { + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + val messageWithError = fetchedWithError.messageSet(topic, 0) + messageWithError.iterator + fail("Fetch with invalid offset should throw an exception when iterating over response") + } catch { case e: OffsetOutOfRangeException => "this is good" } @@ -129,31 +136,27 @@ def testProduceAndMultiFetch() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { - if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else - fail("fewer responses than expected") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val messageSet = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, messageSet.iterator) } } @@ -162,38 +165,42 @@ { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + try { + val iter = response.messageSet(topic, partition).iterator + while (iter.hasNext) + iter.next + fail("MessageSet for invalid offset should throw exception") + } catch { + case e: OffsetOutOfRangeException => "this is good" + } } - catch { - case e: OffsetOutOfRangeException => "this is good" - } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") + val request = builder.build() + val 
response = consumer.fetch(request) + for( (topic, _) <- topics) { + try { + val iter = response.messageSet(topic, -1).iterator + while (iter.hasNext) + iter.next + fail("MessageSet for invalid partition should throw exception") + } catch { + case e: InvalidPartitionException => "this is good" + } } - catch { - case e: InvalidPartitionException => "this is good" - } } // restore set request handler logger to a higher level @@ -202,31 +209,31 @@ def testProduceAndMultiFetchWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { + fail("fewer responses than expected") } - else - fail("fewer responses than expected") } } @@ -235,38 +242,42 @@ { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + try { + val iter = response.messageSet(topic, partition).iterator + while (iter.hasNext) + iter.next + fail("Expected exception when fetching invalid offset") + } catch { + case e: OffsetOutOfRangeException => "this is good" + } } - catch { - case e: OffsetOutOfRangeException => "this is good" - } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, _) <- topics) { + try { + val iter = response.messageSet(topic, -1).iterator + while (iter.hasNext) + iter.next + fail("Expected exception when fetching invalid partition") + } catch { + case e: InvalidPartitionException => "this is good" + } } 
- catch { - case e: InvalidPartitionException => "this is good" - } } // restore set request handler logger to a higher level @@ -275,129 +286,141 @@ def testProduceAndMultiFetchJava() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches.add(new FetchRequest(topic, 0, 0, 10000)) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { + fail("fewer responses than expected") } - else - fail("fewer responses than expected") } } } def testProduceAndMultiFetchJavaWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches.add(new FetchRequest(topic, 0, 0, 10000)) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches) - val iter = response.iterator - for(topic <- topics) { - if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else - fail("fewer responses than expected") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator + TestUtils.checkEquals(messages(topic).iterator, iter) } } } def testMultiProduce() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](topics.size) + var index = 0 + for( (topic, partition) <- topics) { val set = new 
ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set.underlying) + data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { + fail("fewer responses than expected") } - else - fail("fewer responses than expected") } } def testMultiProduceWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + val builder = new FetchRequestBuilder() + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[WiredTopic](topics.size) + var index = 0 + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(0,set.underlying) + data(index) = new WiredTopic(topic,partition_data) + index += 1 + builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { + fail("fewer responses than expected") } - else - fail("fewer responses than expected") } } @@ -407,9 +430,4 @@ messageList } - private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = { - val fetchReqs = new 
java.util.ArrayList[FetchRequest]() - fetches.foreach(f => fetchReqs.add(f)) - fetchReqs - } } Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducerConfig.scala (revision 1242817) +++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala (working copy) @@ -41,4 +41,13 @@ val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) + + /* the client application sending the producer requests */ + val client_id = Utils.getString(props,"producer.request.client_id","") + + /* the required_acks of the producer requests */ + val required_acks = Utils.getShort(props,"producer.request.required_acks",0) + + /* the ack_timeout of the producer requests */ + val ack_timeout = Utils.getInt(props,"producer.request.ack_timeout",1) } Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1242817) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -74,4 +74,5 @@ val producerRetries = Utils.getInt(props, "producer.num.retries", 3) val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5) + } Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1242817) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -46,33 +46,14 @@ debug("Instantiating Scala Sync Producer") - private def verifySendBuffer(buffer : ByteBuffer) = { +private def verifySendBuffer(buffer : ByteBuffer) = { if (logger.isTraceEnabled) { trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() - if (requestTypeId == RequestKeys.MultiProduce) { - try { - val request = MultiProducerRequest.readFrom(buffer) - for (produce <- request.produces) { - try { - for (messageAndOffset <- produce.messages) - if (!messageAndOffset.message.isValid) - trace("topic " + produce.topic + " is invalid") - } - catch { - case e: Throwable => - trace("error iterating messages ", e) - } - } - } - catch { - case e: Throwable => - trace("error verifying sendbuffer ", e) - } - } + val request = ProducerRequest.readFrom(buffer) + trace(request.toString) } } - /** * Common functionality for the public send methods */ @@ -107,21 +88,35 @@ /** * Send a message */ + def send(producerRequest: kafka.javaapi.ProducerRequest) { + producerRequest.data.foreach(d => { + d.partition_data.foreach(p => { + verifyMessageSize(p.messages) + val setSize = p.messages.sizeInBytes.asInstanceOf[Int] + trace("Got message set with " + setSize + " bytes to send") + }) + }) + send(new BoundedByteBufferSend(producerRequest)) + } + def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { - verifyMessageSize(messages) - val setSize = messages.sizeInBytes.asInstanceOf[Int] - trace("Got message set with " + setSize + " bytes to send") - send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages))) + send(-1,topic,partition,messages) } - - def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages) + + def send(topic: String, messages: ByteBufferMessageSet) { + 
send(-1,topic,ProducerRequest.RandomPartition,messages) + } - def multiSend(produces: Array[ProducerRequest]) { - for (request <- produces) - verifyMessageSize(request.messages) - val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes) - trace("Got multi message sets with " + setSize + " bytes to send") - send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) + def send(correlation_id: Int, topic: String, partition: Int, messages: ByteBufferMessageSet) { + val client_id = config.client_id + val required_acks: Short = config.required_acks + val ack_timeout = config.ack_timeout + var data = new Array[WiredTopic](1) + var partition_data = new Array[WiredPartition](1) + partition_data(0) = new WiredPartition(partition,messages) + data(0) = new WiredTopic(topic,partition_data) + val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + send(producerRequest) } def close() = { Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1242817) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -17,7 +17,7 @@ package kafka.producer.async -import kafka.api.ProducerRequest +import kafka.api.{ProducerRequest, WiredTopic, WiredPartition} import kafka.serializer.Encoder import java.util.Properties import kafka.producer._ @@ -177,9 +177,22 @@ private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) { if(messagesPerTopic.size > 0) { - val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray + val topics = new HashMap[String, ListBuffer[WiredPartition]]() + val requests = messagesPerTopic.map(f => { + val topicName = f._1._1 + val partitionId = f._1._2 + val messagesSet= f._2 + val topic = topics.get(topicName) // checking to see if this topics exists + topic match { + case None => topics += topicName -> new ListBuffer[WiredPartition]() //create a new listbuffer for this topic + case Some(x) => trace("found " + topicName) + } + topics(topicName).append(new WiredPartition(partitionId, messagesSet)) + }) + val topic_data = topics.map(kv => new WiredTopic(kv._1,kv._2.toArray)) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, "", -1, -1, topic_data.toArray) //new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, topic_data.toArray) val syncProducer = producerPool.getProducer(brokerId) - syncProducer.multiSend(requests) + syncProducer.send(producerRequest) trace("kafka producer sent messages for topics %s to broker %s:%d" .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) } Index: core/src/main/scala/kafka/network/SocketServerStats.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServerStats.scala (revision 1242817) +++ core/src/main/scala/kafka/network/SocketServerStats.scala (working copy) @@ -50,7 +50,7 @@ requestTypeId match { case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce => produceTimeStats.recordRequestMetric(durationNs) - case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch => + case r if r == RequestKeys.Fetch => fetchTimeStats.recordRequestMetric(durationNs) case _ => /* not collecting; let go */ } Index: core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
=================================================================== --- core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (revision 1242817) +++ core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (working copy) @@ -28,7 +28,7 @@ @nonthreadsafe private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging { - private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) + private val sizeBuffer = ByteBuffer.allocate(4) private var contentBuffer: ByteBuffer = null def this() = this(Int.MaxValue) @@ -78,12 +78,10 @@ var buffer: ByteBuffer = null try { buffer = ByteBuffer.allocate(size) - } - catch { - case e: OutOfMemoryError => { + } catch { + case e: OutOfMemoryError => error("OOME with size " + size, e) throw e - } case e2 => throw e2 } Index: core/src/main/scala/kafka/network/RequestChannel.scala =================================================================== --- core/src/main/scala/kafka/network/RequestChannel.scala (revision 1242817) +++ core/src/main/scala/kafka/network/RequestChannel.scala (working copy) @@ -21,8 +21,8 @@ object RequestChannel { val AllDone = new Request(1, 2, null, 0) - case class Request(val processor: Int, requestKey: Any, request: Receive, start: Long) - case class Response(val processor: Int, requestKey: Any, response: Send, start: Long, ellapsed: Long) + case class Request(processor: Int, requestKey: Any, request: Receive, start: Long) + case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long) } class RequestChannel(val numProcessors: Int, val queueSize: Int) { Index: core/src/main/scala/kafka/network/Transmission.scala =================================================================== --- core/src/main/scala/kafka/network/Transmission.scala (revision 1242817) +++ core/src/main/scala/kafka/network/Transmission.scala (working copy) @@ -66,14 +66,15 @@ trait Send extends Transmission { def writeTo(channel: GatheringByteChannel): Int - + def writeCompletely(channel: GatheringByteChannel): Int = { - var written = 0 + var totalWritten = 0 while(!complete) { - written = writeTo(channel) + val written = writeTo(channel) trace(written + " bytes written.") + totalWritten += written } - written + totalWritten } } @@ -99,9 +100,9 @@ if (current == Nil) { if (totalWritten != expectedBytesToWrite) error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) - return true + true + } else { + false } - else - return false } } Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1242817) +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy) @@ -20,7 +20,6 @@ import java.net._ import java.nio.channels._ import kafka.api._ -import kafka.message._ import kafka.network._ import kafka.utils._ @@ -72,7 +71,7 @@ * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. 
* @return a set of fetched messages */ - def fetch(request: FetchRequest): ByteBufferMessageSet = { + def fetch(request: FetchRequest): FetchResponse = { lock synchronized { val startTime = SystemTime.nanoseconds getOrMakeConnection() @@ -88,51 +87,19 @@ channel = connect sendRequest(request) response = getResponse - }catch { + } catch { case ioe: java.io.IOException => channel = null; throw ioe; } case e => throw e } - val endTime = SystemTime.nanoseconds - SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit) - new ByteBufferMessageSet(response._1.buffer, request.offset, response._2) - } - } + val fetchResponse = FetchResponse.readFrom(response._1.buffer) + val fetchedSize = fetchResponse.sizeInBytes - /** - * Combine multiple fetch requests in one call. - * - * @param fetches a sequence of fetch requests. - * @return a sequence of fetch responses - */ - def multifetch(fetches: FetchRequest*): MultiFetchResponse = { - lock synchronized { - val startTime = SystemTime.nanoseconds - getOrMakeConnection() - var response: Tuple2[Receive,Int] = null - try { - sendRequest(new MultiFetchRequest(fetches.toArray)) - response = getResponse - } catch { - case e : java.io.IOException => - info("Reconnect in multifetch due to socket error: ", e) - // retry once - try { - channel = connect - sendRequest(new MultiFetchRequest(fetches.toArray)) - response = getResponse - }catch { - case ioe: java.io.IOException => channel = null; throw ioe; - } - case e => throw e - } val endTime = SystemTime.nanoseconds SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit) + SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) - // error code will be set on individual messageset inside MultiFetchResponse - new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset)) + fetchResponse } } @@ -158,7 +125,7 @@ channel = connect sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets)) response = getResponse - }catch { + } catch { case ioe: java.io.IOException => channel = null; throw ioe; } } Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala =================================================================== --- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1242817) +++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy) @@ -17,13 +17,14 @@ package kafka.consumer +import java.io.IOException import java.util.concurrent.CountDownLatch +import kafka.api.{FetchRequestBuilder, OffsetRequest} +import kafka.cluster.{Partition, Broker} import kafka.common.ErrorMapping -import kafka.cluster.{Partition, Broker} -import kafka.api.{OffsetRequest, FetchRequest} +import kafka.message.ByteBufferMessageSet +import kafka.utils._ import org.I0Itec.zkclient.ZkClient -import kafka.utils._ -import java.io.IOException class FetcherRunnable(val name: String, val zkClient : ZkClient, @@ -50,18 +51,26 @@ info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: " + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port) + var reqId = 0 try { while (!stopped) { - val fetches = partitionTopicInfos.map(info => - new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize)) + // TODO: fix up the max wait and min bytes + val builder = new FetchRequestBuilder(). + correlationId(reqId). 
+ clientId(config.consumerId.getOrElse(name)). + maxWait(0). + minBytes(0) + partitionTopicInfos.foreach(pti => + builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize) + ) - trace("fetch request: " + fetches.toString) + val fetchRequest = builder.build() + trace("fetch request: " + fetchRequest) + val response = simpleConsumer.fetch(fetchRequest) - val response = simpleConsumer.multifetch(fetches : _*) - var read = 0L - - for((messages, infopti) <- response.zip(partitionTopicInfos)) { + for(infopti <- partitionTopicInfos) { + val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet] try { var done = false if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { @@ -76,8 +85,7 @@ } if (!done) read += infopti.enqueue(messages, infopti.getFetchOffset) - } - catch { + } catch { case e1: IOException => // something is wrong with the socket, re-throw the exception to stop the fetcher throw e1 @@ -91,6 +99,7 @@ throw e2 } } + reqId = if(reqId == Int.MaxValue) 0 else reqId + 1 trace("fetched bytes: " + read) if(read == 0) { @@ -98,8 +107,7 @@ Thread.sleep(config.fetcherBackoffMs) } } - } - catch { + } catch { case e => if (stopped) info("FecherRunnable " + this + " interrupted") Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1242817) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -50,8 +50,7 @@ /** consumer id: generated automatically if not set. * Set this explicitly for only testing purpose. */ - val consumerId: Option[String] = /** TODO: can be written better in scala 2.8 */ - if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None + val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null)) /** the socket timeout for network requests */ val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout) Index: core/src/main/scala/kafka/consumer/TopicCount.scala =================================================================== --- core/src/main/scala/kafka/consumer/TopicCount.scala (revision 1242817) +++ core/src/main/scala/kafka/consumer/TopicCount.scala (working copy) @@ -32,8 +32,7 @@ case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] case None => throw new RuntimeException("error constructing TopicCount : " + jsonString) } - } - catch { + } catch { case e => error("error parsing consumer json string " + jsonString, e) throw e @@ -46,8 +45,7 @@ private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) { - def getConsumerThreadIdsPerTopic() - : Map[String, Set[String]] = { + def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = { val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() for ((topic, nConsumers) <- topicCountMap) { val consumerSet = new mutable.HashSet[String] Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1242817) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -105,8 +105,7 @@ def this(config: ConsumerConfig) = this(config, true) - def createMessageStreams[T](topicCountMap: Map[String,Int], - decoder: Decoder[T]) + def 
createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T]) : Map[String,List[KafkaMessageStream[T]]] = { consume(topicCountMap, decoder) } @@ -138,8 +137,7 @@ zkClient.close() zkClient = null } - } - catch { + } catch { case e => fatal("error during consumer connector shutdown", e) } @@ -147,8 +145,7 @@ } } - def consume[T](topicCountMap: scala.collection.Map[String,Int], - decoder: Decoder[T]) + def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T]) : Map[String,List[KafkaMessageStream[T]]] = { debug("entering consume ") if (topicCountMap == null) @@ -159,13 +156,13 @@ var consumerUuid : String = null config.consumerId match { - case Some(consumerId) // for testing only - => consumerUuid = consumerId - case None // generate unique consumerId automatically - => val uuid = UUID.randomUUID() - consumerUuid = "%s-%d-%s".format( - InetAddress.getLocalHost.getHostName, System.currentTimeMillis, - uuid.getMostSignificantBits().toHexString.substring(0,8)) + case Some(consumerId) => // for testing only + consumerUuid = consumerId + case None => // generate unique consumerId automatically + val uuid = UUID.randomUUID() + consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName, + System.currentTimeMillis, + uuid.getMostSignificantBits().toHexString.substring(0,8) ) } val consumerIdString = config.groupId + "_" + consumerUuid val topicCount = new TopicCount(consumerIdString, topicCountMap) @@ -243,8 +240,7 @@ try { updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name, newOffset.toString) - } - catch { + } catch { case t: Throwable => // log it and let it go warn("exception during commitOffsets", t) @@ -321,8 +317,7 @@ ConsumerConfig.SocketBufferSize) val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1) producedOffset = offsets(0) - } - catch { + } catch { case e => error("error in earliestOrLatestOffset() ", e) } @@ -419,8 +414,7 @@ val cluster = getCluster(zkClient) try { done = rebalance(cluster) - } - catch { + } catch { case e => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. * For example, a ZK node can disappear between the time we get all children and the time we try to get @@ -433,7 +427,7 @@ info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) { return - }else { + } else { /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should * clear the cache */ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") @@ -529,7 +523,7 @@ oldConsumersPerTopicMap = consumersPerTopicMap updateFetcher(cluster, kafkaMessageStreams) true - }else + } else false } @@ -611,8 +605,7 @@ createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) true - } - catch { + } catch { case e: ZkNodeExistsException => // The node hasn't been deleted by the original owner. So wait a bit and retry. 
info("waiting for the partition ownership to be deleted: " + partition) Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala =================================================================== --- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1242817) +++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy) @@ -19,7 +19,7 @@ import java.net.URI import joptsimple._ -import kafka.api.FetchRequest +import kafka.api.FetchRequestBuilder import kafka.utils._ import kafka.consumer._ @@ -54,6 +54,11 @@ .describedAs("fetchsize") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000000) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withOptionalArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerShell") val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator") .withOptionalArg .describedAs("print offsets") @@ -79,7 +84,8 @@ val topic = options.valueOf(topicOpt) val partition = options.valueOf(partitionOpt).intValue val startingOffset = options.valueOf(offsetOpt).longValue - val fetchsize = options.valueOf(fetchsizeOpt).intValue + val fetchSize = options.valueOf(fetchsizeOpt).intValue + val clientId = options.valueOf(clientIdOpt).toString val printOffsets = if(options.has(printOffsetOpt)) true else false val printMessages = if(options.has(printMessageOpt)) true else false @@ -87,22 +93,27 @@ val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024) val thread = Utils.newThread("kafka-consumer", new Runnable() { def run() { + var reqId = 0 var offset = startingOffset while(true) { - val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize) - val messageSets = consumer.multifetch(fetchRequest) - for (messages <- messageSets) { - debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) - var consumed = 0 - for(messageAndOffset <- messages) { - if(printMessages) - info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - offset = messageAndOffset.offset - if(printOffsets) - info("next offset = " + offset) - consumed += 1 - } + val fetchRequest = new FetchRequestBuilder() + .correlationId(reqId) + .clientId(clientId) + .addFetch(topic, partition, offset, fetchSize) + .build() + val fetchResponse = consumer.fetch(fetchRequest) + val messageSet = fetchResponse.messageSet(topic, partition) + debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) + var consumed = 0 + for(messageAndOffset <- messageSet) { + if(printMessages) + info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + offset = messageAndOffset.offset + if(printOffsets) + info("next offset = " + offset) + consumed += 1 } + reqId += 1 } } }, false); Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1242817) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -194,6 +194,9 @@ def getInt(props: Properties, name: String, default: Int): Int = getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) + def getShort(props: Properties, name: String, default: Short): Short = + getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue)) + /** * Read an integer from the properties instance. 
Throw an exception * if the value is not in the given range (inclusive) @@ -216,6 +219,18 @@ v } + def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = { + val v = + if(props.containsKey(name)) + props.getProperty(name).toShort + else + default + if(v < range._1 || v > range._2) + throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".") + else + v + } + def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { val value = buffer.getInt if(value < range._1 || value > range._2) Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala (revision 1242817) +++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala (working copy) @@ -35,9 +35,9 @@ case Some(send) => { val resp = new RequestChannel.Response(processor = req.processor, requestKey = req.requestKey, - response = send, - start = req.start, - ellapsed = -1) + response = send, + start = req.start, + elapsed = -1) requestChannel.sendResponse(resp) trace("Processor " + Thread.currentThread.getName + " sent response " + resp) } Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1242817) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -17,17 +17,17 @@ package kafka.server -import org.apache.log4j.Logger +import java.io.IOException +import java.lang.IllegalStateException +import kafka.admin.{CreateTopicCommand, AdminUtils} +import kafka.api._ +import kafka.common.ErrorMapping import kafka.log._ +import kafka.message._ import kafka.network._ -import kafka.message._ -import kafka.api._ -import kafka.common.ErrorMapping -import java.io.IOException import kafka.utils.{SystemTime, Logging} -import collection.mutable.ListBuffer -import kafka.admin.{CreateTopicCommand, AdminUtils} -import java.lang.IllegalStateException +import org.apache.log4j.Logger +import scala.collection.mutable.ListBuffer /** * Logic to handle the various Kafka requests @@ -39,13 +39,11 @@ def handle(receive: Receive): Option[Send] = { val apiId = receive.buffer.getShort() apiId match { - case RequestKeys.Produce => handleProducerRequest(receive) - case RequestKeys.Fetch => handleFetchRequest(receive) - case RequestKeys.MultiFetch => handleMultiFetchRequest(receive) - case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) - case RequestKeys.Offsets => handleOffsetRequest(receive) - case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) - case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) + case RequestKeys.Produce => handleProducerRequest(receive) + case RequestKeys.Fetch => handleFetchRequest(receive) + case RequestKeys.Offsets => handleOffsetRequest(receive) + case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) + case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) } } @@ -60,66 +58,72 @@ None } - def handleMultiProducerRequest(receive: Receive): Option[Send] = { - val request = MultiProducerRequest.readFrom(receive.buffer) - if(requestLogger.isTraceEnabled) - requestLogger.trace("Multiproducer request " + request.toString) - request.produces.map(handleProducerRequest(_, "MultiProducerRequest")) - None - } - - private def 
handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = { - val partition = request.getTranslatedPartition(logManager.chooseRandomPartition) - try { - logManager.getOrCreateLog(request.topic, partition).append(request.messages) - trace(request.messages.sizeInBytes + " bytes written to logs.") - } - catch { - case e => - error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) - e match { - case _: IOException => - fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) - System.exit(1) - case _ => + private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): ProducerResponse = { + val requestSize = request.data.size + val errors = new Array[Int](requestSize) + val offsets = new Array[Long](requestSize) + + request.data.foreach(d => { + d.partition_data.foreach(p => { + val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition) + try { + logManager.getOrCreateLog(d.topic, partition).append(p.messages) + trace(p.messages.sizeInBytes + " bytes written to logs.") + p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) } - throw e - } - None + catch { + case e => + //TODO: handle response in ProducerResponse + error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e) + e match { + case _: IOException => + fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) + Runtime.getRuntime.halt(1) + case _ => + } + //throw e + } + }) + //None + }) + new ProducerResponse(request.correlation_id, request.version_id, errors, offsets) } def handleFetchRequest(request: Receive): Option[Send] = { val fetchRequest = FetchRequest.readFrom(request.buffer) if(requestLogger.isTraceEnabled) requestLogger.trace("Fetch request " + fetchRequest.toString) - Some(readMessageSet(fetchRequest)) + + val fetchedData = new ListBuffer[TopicData]() + var error: Int = ErrorMapping.NoError + + for(offsetDetail <- fetchRequest.offsetInfo) { + val info = new ListBuffer[PartitionData]() + val topic = offsetDetail.topic + val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) + for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { + val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match { + case Left(err) => error = err; new PartitionData(partition, err, offset, MessageSet.Empty) + case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages) + } + info.append(partitionInfo) + } + fetchedData.append(new TopicData(topic, info.toArray)) + } + val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, fetchedData.toArray ) + Some(new FetchResponseSend(response, error)) } - - def handleMultiFetchRequest(request: Receive): Option[Send] = { - val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer) - if(requestLogger.isTraceEnabled) - requestLogger.trace("Multifetch request") - multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString)) - var responses = multiFetchRequest.fetches.map(fetch => - readMessageSet(fetch)).toList - - Some(new MultiMessageSetSend(responses)) - } - private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = { - var response: MessageSetSend = null + private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): 
Either[Int, MessageSet] = { + var response: Either[Int, MessageSet] = null try { - trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition) - val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition) - if (log != null) - response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize)) - else - response = new MessageSetSend() - } - catch { + trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + val log = logManager.getLog(topic, partition) + response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty) + } catch { case e => - error("error when processing request " + fetchRequest, e) - response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + error("error when processing request " + (topic, partition, offset, maxSize), e) + response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } response } Index: core/src/main/scala/kafka/api/MultiFetchRequest.scala =================================================================== --- core/src/main/scala/kafka/api/MultiFetchRequest.scala (revision 1242817) +++ core/src/main/scala/kafka/api/MultiFetchRequest.scala (working copy) @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.nio._ -import kafka.network._ - -object MultiFetchRequest { - def readFrom(buffer: ByteBuffer): MultiFetchRequest = { - val count = buffer.getShort - val fetches = new Array[FetchRequest](count) - for(i <- 0 until fetches.length) - fetches(i) = FetchRequest.readFrom(buffer) - new MultiFetchRequest(fetches) - } -} - -class MultiFetchRequest(val fetches: Array[FetchRequest]) extends Request(RequestKeys.MultiFetch) { - def writeTo(buffer: ByteBuffer) { - if(fetches.length > Short.MaxValue) - throw new IllegalArgumentException("Number of requests in MultiFetchRequest exceeds " + Short.MaxValue + ".") - buffer.putShort(fetches.length.toShort) - for(fetch <- fetches) - fetch.writeTo(buffer) - } - - def sizeInBytes: Int = { - var size = 2 - for(fetch <- fetches) - size += fetch.sizeInBytes - size - } - - - override def toString(): String = { - val buffer = new StringBuffer - for(fetch <- fetches) { - buffer.append(fetch.toString) - buffer.append(",") - } - buffer.toString - } -} Index: core/src/main/scala/kafka/api/FetchRequest.scala =================================================================== --- core/src/main/scala/kafka/api/FetchRequest.scala (revision 1242817) +++ core/src/main/scala/kafka/api/FetchRequest.scala (working copy) @@ -20,32 +20,149 @@ import java.nio._ import kafka.network._ import kafka.utils._ +import scala.collection.mutable.{HashMap, Buffer, ListBuffer} +object OffsetDetail { + + def readFrom(buffer: ByteBuffer): OffsetDetail = { + val topic = Utils.readShortString(buffer, "UTF-8") + + val partitionsCount = buffer.getInt + val partitions = new Array[Int](partitionsCount) + for (i <- 0 until partitions.length) + partitions(i) = buffer.getInt + + val offsetsCount = buffer.getInt + val offsets = new Array[Long](offsetsCount) + for (i <- 0 until offsets.length) + offsets(i) = buffer.getLong + + val fetchesCount = buffer.getInt + val fetchSizes = new Array[Int](fetchesCount) + for (i <- 0 until fetchSizes.length) + fetchSizes(i) = buffer.getInt + + new OffsetDetail(topic, partitions, offsets, fetchSizes) + } + +} + +case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) { + + def writeTo(buffer: ByteBuffer) { + Utils.writeShortString(buffer, topic, "UTF-8") + + if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue) + throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".") + + buffer.putInt(partitions.length) + partitions.foreach(buffer.putInt(_)) + + buffer.putInt(offsets.length) + offsets.foreach(buffer.putLong(_)) + + buffer.putInt(fetchSizes.length) + fetchSizes.foreach(buffer.putInt(_)) + } + + def sizeInBytes(): Int = { + 2 + topic.length() + // topic string + partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int) + offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long) + fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size + } +} + object FetchRequest { - + val CurrentVersion = 1.shortValue() + def readFrom(buffer: ByteBuffer): FetchRequest = { - val topic = Utils.readShortString(buffer, "UTF-8") - val partition = buffer.getInt() - val offset = buffer.getLong() - val size = buffer.getInt() - new FetchRequest(topic, partition, offset, size) + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = Utils.readShortString(buffer, "UTF-8") + val replicaId = buffer.getInt + val maxWait = buffer.getInt + val minBytes = 
buffer.getInt + val offsetsCount = buffer.getInt + val offsetInfo = new Array[OffsetDetail](offsetsCount) + for(i <- 0 until offsetInfo.length) + offsetInfo(i) = OffsetDetail.readFrom(buffer) + + new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo) } + } -class FetchRequest(val topic: String, - val partition: Int, - val offset: Long, - val maxSize: Int) extends Request(RequestKeys.Fetch) { - +case class FetchRequest( versionId: Short, + correlationId: Int, + clientId: String, + replicaId: Int, + maxWait: Int, + minBytes: Int, + offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) { + def writeTo(buffer: ByteBuffer) { - Utils.writeShortString(buffer, topic) - buffer.putInt(partition) - buffer.putLong(offset) - buffer.putInt(maxSize) + buffer.putShort(versionId) + buffer.putInt(correlationId) + Utils.writeShortString(buffer, clientId, "UTF-8") + buffer.putInt(replicaId) + buffer.putInt(maxWait) + buffer.putInt(minBytes) + buffer.putInt(offsetInfo.size) + for(topicDetail <- offsetInfo) { + topicDetail.writeTo(buffer) + } } - - def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4 - override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset + - " maxSize:" + maxSize + ")" + def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) } + +class FetchRequestBuilder() { + private var correlationId = -1 + private val versionId = FetchRequest.CurrentVersion + private var clientId = "" + private var replicaId = -1 // sensible default + private var maxWait = -1 // sensible default + private var minBytes = -1 // sensible default + private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]] + + def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = { + val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]())) + topicData._1.append(partition) + topicData._2.append(offset) + topicData._3.append(fetchSize) + this + } + + def correlationId(correlationId: Int): FetchRequestBuilder = { + this.correlationId = correlationId + this + } + + def clientId(clientId: String): FetchRequestBuilder = { + this.clientId = clientId + this + } + + def replicaId(replicaId: Int): FetchRequestBuilder = { + this.replicaId = replicaId + this + } + + def maxWait(maxWait: Int): FetchRequestBuilder = { + this.maxWait = maxWait + this + } + + def minBytes(minBytes: Int): FetchRequestBuilder = { + this.minBytes = minBytes + this + } + + def build() = { + val offsetDetails = requestMap.map{ topicData => + new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray) + } + new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail]) + } +} Index: core/src/main/scala/kafka/api/ProducerResponse.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerResponse.scala (revision 0) +++ core/src/main/scala/kafka/api/ProducerResponse.scala (revision 0) @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio._ +import java.nio.channels._ +import kafka.network._ +import kafka.message._ +import kafka.utils._ +import kafka.common.ErrorMapping + +@nonthreadsafe +class ProducerResponse(val correlation_id: Int, val version_id: Short, val errors: Array[Int], val offsets: Array[Long]) extends Send { + + val sizeInBytes = 4 + 2 + 4 + (4 * errors.size) + 4 + (8 * offsets.size) + + private val buffer = ByteBuffer.allocate(sizeInBytes) + buffer.putInt(correlation_id) + buffer.putShort(version_id) + buffer.putInt(errors.size) + errors.foreach(e => buffer.putInt(e)) + buffer.putInt(offsets.size) + offsets.foreach(o => buffer.putLong(o)) + + var complete: Boolean = false + + def writeTo(channel: GatheringByteChannel): Int = { + expectIncomplete() + var written = 0 + written += channel.write(buffer) + if(!buffer.hasRemaining) + complete = true + written + } +} \ No newline at end of file Index: core/src/main/scala/kafka/api/RequestKeys.scala =================================================================== --- core/src/main/scala/kafka/api/RequestKeys.scala (revision 1242817) +++ core/src/main/scala/kafka/api/RequestKeys.scala (working copy) @@ -20,8 +20,7 @@ object RequestKeys { val Produce: Short = 0 val Fetch: Short = 1 - val MultiFetch: Short = 2 - val MultiProduce: Short = 3 - val Offsets: Short = 4 - val TopicMetadata: Short = 5 + val MultiProduce: Short = 2 + val Offsets: Short = 3 + val TopicMetadata: Short = 4 } Index: core/src/main/scala/kafka/api/TopicData.scala =================================================================== --- core/src/main/scala/kafka/api/TopicData.scala (revision 0) +++ core/src/main/scala/kafka/api/TopicData.scala (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.utils.Utils +import kafka.common.ErrorMapping +import kafka.message.{MessageSet, ByteBufferMessageSet} + +object PartitionData { + def readFrom(buffer: ByteBuffer): PartitionData = { + val partition = buffer.getInt + val error = buffer.getInt + val initialOffset = buffer.getLong + val messageSetSize = buffer.getInt + val messageSetBuffer = buffer.slice() + messageSetBuffer.limit(messageSetSize) + buffer.position(buffer.position + messageSetSize) + new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error)) + } +} + +case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) { + def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages) + + val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() +} + +object TopicData { + + def readFrom(buffer: ByteBuffer): TopicData = { + val topic = Utils.readShortString(buffer, "UTF-8") + val partitionCount = buffer.getInt + val partitions = new Array[PartitionData](partitionCount) + for(i <- 0 until partitions.length) + partitions(i) = PartitionData.readFrom(buffer) + new TopicData(topic, partitions.sortBy(_.partition)) + } + + def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = { + if(data == null || data.size == 0) + return None + + var (low, high) = (0, data.size-1) + while(low <= high) { + val mid = (low + high) / 2 + val found = data(mid) + if(found.partition == partition) + return Some(found) + else if(partition < found.partition) + high = mid - 1 + else + low = mid + 1 + } + None + } +} + +case class TopicData(topic: String, partitionData: Array[PartitionData]) { + val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes) +} \ No newline at end of file Index: core/src/main/scala/kafka/api/FetchResponse.scala =================================================================== --- core/src/main/scala/kafka/api/FetchResponse.scala (revision 0) +++ core/src/main/scala/kafka/api/FetchResponse.scala (revision 0) @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import java.nio.ByteBuffer +import java.nio.channels.GatheringByteChannel +import kafka.common.ErrorMapping +import kafka.message.{MessageSet, ByteBufferMessageSet} +import kafka.network.{MultiSend, Send} +import kafka.utils.Utils + +object FetchResponse { + def readFrom(buffer: ByteBuffer): FetchResponse = { + val versionId = buffer.getShort + val correlationId = buffer.getInt + val dataCount = buffer.getInt + val data = new Array[TopicData](dataCount) + for(i <- 0 until data.length) + data(i) = TopicData.readFrom(buffer) + new FetchResponse(versionId, correlationId, data) + } +} + +case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData]) { + + val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes) + + lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) + + def messageSet(topic: String, partition: Int): ByteBufferMessageSet = { + val messageSet = topicMap.get(topic) match { + case Some(topicData) => + TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty) + case None => + MessageSet.Empty + } + messageSet.asInstanceOf[ByteBufferMessageSet] + } +} + +// SENDS +class PartitionDataSend(val partitionData: PartitionData) extends Send { + private val messageSize = partitionData.messages.sizeInBytes + private var messagesSentSize = 0L + + private val buffer = ByteBuffer.allocate(20) + buffer.putInt(partitionData.partition) + buffer.putInt(partitionData.error) + buffer.putLong(partitionData.initialOffset) + buffer.putInt(partitionData.messages.sizeInBytes.intValue()) + buffer.rewind() + + def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + + def writeTo(channel: GatheringByteChannel): Int = { + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && messagesSentSize < messageSize) { + val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt + messagesSentSize += bytesSent + written += bytesSent + } + written + } +} + +class TopicDataSend(val topicData: TopicData) extends Send { + val size = topicData.sizeInBytes + + var sent = 0 + + private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4) + Utils.writeShortString(buffer, topicData.topic, "UTF-8") + buffer.putInt(topicData.partitionData.length) + buffer.rewind() + + val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) { + val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes) + } + + def complete = sent >= size + + def writeTo(channel: GatheringByteChannel): Int = { + expectIncomplete() + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && !sends.complete) { + written += sends.writeCompletely(channel) + } + sent += written + written + } +} + +class FetchResponseSend(val fetchResponse: FetchResponse, + val errorCode: Int = ErrorMapping.NoError) extends Send { + + private val size = fetchResponse.sizeInBytes + + private var sent = 0 + + private val buffer = ByteBuffer.allocate(16) + buffer.putInt(size + 2) + buffer.putShort(errorCode.shortValue()) + buffer.putShort(fetchResponse.versionId) + buffer.putInt(fetchResponse.correlationId) + buffer.putInt(fetchResponse.data.length) + buffer.rewind() + + val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) { + val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes) + } + + def 
complete = sent >= sendSize + + def writeTo(channel: GatheringByteChannel):Int = { + expectIncomplete() + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && !sends.complete) { + written += sends.writeCompletely(channel) + } + sent += written + written + } + + def sendSize = 4 + 2 + fetchResponse.sizeInBytes + +} Index: core/src/main/scala/kafka/api/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1242817) +++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) @@ -22,47 +22,112 @@ import kafka.network._ import kafka.utils._ +object WiredFormat { + val version_id: Short = 0 +} + +class WiredTopic(val topic: String, val partition_data: Array[WiredPartition]) + +class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) { + def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = { + if (partition == ProducerRequest.RandomPartition) + return randomSelector(topic) + else + return partition + } +} + object ProducerRequest { val RandomPartition = -1 - + def readFrom(buffer: ByteBuffer): ProducerRequest = { - val topic = Utils.readShortString(buffer, "UTF-8") - val partition = buffer.getInt - val messageSetSize = buffer.getInt - val messageSetBuffer = buffer.slice() - messageSetBuffer.limit(messageSetSize) - buffer.position(buffer.position + messageSetSize) - new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer)) + val version_id: Short = buffer.getShort + val correlation_id: Int = buffer.getInt + val client_id: String = Utils.readShortString(buffer, "UTF-8") + val required_acks: Short = buffer.getShort + val ack_timeout: Int = buffer.getInt + //build the topic structure + val topicCount = buffer.getInt + val data = new Array[WiredTopic](topicCount) + for(i <- 0 until topicCount) { + val topic = Utils.readShortString(buffer, "UTF-8") + + val partitionCount = buffer.getInt + //build the partition structure within this topic + val partition_data = new Array[WiredPartition](partitionCount) + for (j <- 0 until partitionCount) { + val partition = buffer.getInt + val messageSetSize = buffer.getInt + val messageSetBuffer = new Array[Byte](messageSetSize) + buffer.get(messageSetBuffer,0,messageSetSize) + partition_data(j) = new WiredPartition(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) + } + data(i) = new WiredTopic(topic,partition_data) + } + new ProducerRequest(correlation_id,client_id,required_acks,ack_timeout,data) } } -class ProducerRequest(val topic: String, - val partition: Int, - val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) { +class ProducerRequest(val correlation_id: Int, + val client_id: String, + val required_acks: Short, + val ack_timeout: Int, + val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { + val version_id: Short = WiredFormat.version_id + def writeTo(buffer: ByteBuffer) { - Utils.writeShortString(buffer, topic) - buffer.putInt(partition) - buffer.putInt(messages.serialized.limit) - buffer.put(messages.serialized) - messages.serialized.rewind + buffer.putShort(version_id) + buffer.putInt(correlation_id) + Utils.writeShortString(buffer, client_id, "UTF-8") + buffer.putShort(required_acks) + buffer.putInt(ack_timeout) + //save the topic structure + buffer.putInt(data.size) //the number of topics + data.foreach(d =>{ + Utils.writeShortString(buffer, d.topic, "UTF-8") //write 
the topic + buffer.putInt(d.partition_data.size) //the number of partitions + d.partition_data.foreach(p => { + buffer.putInt(p.partition) + buffer.putInt(p.messages.serialized.limit) + buffer.put(p.messages.serialized) + p.messages.serialized.rewind + }) + }) } - - def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int] - def getTranslatedPartition(randomSelector: String => Int): Int = { - if (partition == ProducerRequest.RandomPartition) - return randomSelector(topic) - else - return partition + def sizeInBytes(): Int = { + var size = 0 + //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size + size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4; + data.foreach(d =>{ + size += 2 + d.topic.length + 4 + d.partition_data.foreach(p => { + size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] + }) + }) + size } + override def toString: String = { val builder = new StringBuilder() builder.append("ProducerRequest(") - builder.append(topic + ",") - builder.append(partition + ",") - builder.append(messages.sizeInBytes) + builder.append(version_id + ",") + builder.append(correlation_id + ",") + builder.append(client_id + ",") + builder.append(required_acks + ",") + builder.append(ack_timeout) + data.foreach(d =>{ + builder.append(":[" + d.topic) + d.partition_data.foreach(p => { + builder.append(":[") + builder.append(p.partition + ",") + builder.append(p.messages.sizeInBytes) + builder.append("]") + }) + builder.append("]") + }) builder.append(")") builder.toString } @@ -70,14 +135,36 @@ override def equals(other: Any): Boolean = { other match { case that: ProducerRequest => - (that canEqual this) && topic == that.topic && partition == that.partition && - messages.equals(that.messages) + if (that canEqual this) + if (version_id == that.version_id && correlation_id == that.correlation_id && + client_id == that.client_id && required_acks == that.required_acks && ack_timeout == that.ack_timeout) { + for(i <- 0 until data.size) { + if (data(i).topic != that.data(i).topic) + return false + for(j <- 0 until data(i).partition_data.size) + if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition || !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages)) + return false + } + true + } + false case _ => false } } def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode - + override def hashCode: Int = { + def hcp(num: Int): Int = { + 31 + (17 * num) + } + var hash = hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout) + data.foreach(d =>{ + hash += d.topic.hashCode + d.partition_data.foreach(p => { + hash += hcp(p.partition) + p.messages.hashCode + }) + }) + hash + } } Index: core/src/main/scala/kafka/javaapi/Implicits.scala =================================================================== --- core/src/main/scala/kafka/javaapi/Implicits.scala (revision 1242817) +++ core/src/main/scala/kafka/javaapi/Implicits.scala (working copy) @@ -28,9 +28,6 @@ messageSet.getErrorCode) } - implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse = - response.underlying - - implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse = - new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets) + implicit 
def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse = + new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data) } Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1242817) +++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy) @@ -18,6 +18,7 @@ import kafka.producer.SyncProducerConfig import kafka.javaapi.message.ByteBufferMessageSet +import kafka.javaapi.ProducerRequest class SyncProducer(syncProducer: kafka.producer.SyncProducer) { @@ -30,18 +31,11 @@ underlying.send(topic, partition, messages) } - def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, - kafka.api.ProducerRequest.RandomPartition, - messages) + def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, kafka.api.ProducerRequest.RandomPartition, messages) - def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { - import kafka.javaapi.Implicits._ - val produceRequests = new Array[kafka.api.ProducerRequest](produces.length) - for(i <- 0 until produces.length) - produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages) - underlying.multiSend(produceRequests) + def send(producerRequest: ProducerRequest) { + underlying.send(producerRequest) } - def close() { underlying.close } Index: core/src/main/scala/kafka/javaapi/FetchResponse.scala =================================================================== --- core/src/main/scala/kafka/javaapi/FetchResponse.scala (revision 0) +++ core/src/main/scala/kafka/javaapi/FetchResponse.scala (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.javaapi + +import kafka.api.TopicData + + +class FetchResponse( val versionId: Short, + val correlationId: Int, + val data: Array[TopicData] ) { + + private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data) + + def messageSet(topic: String, partition: Int): kafka.javaapi.message.MessageSet = { + import Implicits._ + underlying.messageSet(topic, partition) + } +} Index: core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala =================================================================== --- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (revision 1242817) +++ core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (working copy) @@ -17,10 +17,9 @@ package kafka.javaapi.consumer -import kafka.utils.threadsafe -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.javaapi.MultiFetchResponse import kafka.api.FetchRequest +import kafka.javaapi.FetchResponse +import kafka.utils.threadsafe /** * A consumer of kafka messages @@ -38,24 +37,12 @@ * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages */ - def fetch(request: FetchRequest): ByteBufferMessageSet = { + def fetch(request: FetchRequest): FetchResponse = { import kafka.javaapi.Implicits._ underlying.fetch(request) } /** - * Combine multiple fetch requests in one call. - * - * @param fetches a sequence of fetch requests. - * @return a sequence of fetch responses - */ - def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = { - import scala.collection.JavaConversions._ - import kafka.javaapi.Implicits._ - underlying.multifetch(asBuffer(fetches): _*) - } - - /** * Get a list of valid offsets (up to maxSize) before the given time. * The result is a list of offsets, in descending order. 
* Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1242817) +++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) @@ -17,36 +17,31 @@ package kafka.javaapi import kafka.network.Request -import kafka.api.RequestKeys +import kafka.api.{RequestKeys, WiredTopic} import java.nio.ByteBuffer -class ProducerRequest(val topic: String, - val partition: Int, - val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) { +class ProducerRequest(val correlation_id: Int, + val client_id: String, + val required_acks: Short, + val ack_timeout: Int, + val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { + import Implicits._ - private val underlying = new kafka.api.ProducerRequest(topic, partition, messages) + private val underlying = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + def api(): kafka.api.ProducerRequest = underlying + def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } def sizeInBytes(): Int = underlying.sizeInBytes - def getTranslatedPartition(randomSelector: String => Int): Int = - underlying.getTranslatedPartition(randomSelector) - override def toString: String = underlying.toString - override def equals(other: Any): Boolean = { - other match { - case that: ProducerRequest => - (that canEqual this) && topic == that.topic && partition == that.partition && - messages.equals(that.messages) - case _ => false - } - } + override def equals(other: Any): Boolean = underlying.equals(other) def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode + override def hashCode: Int = underlying.hashCode -} +} \ No newline at end of file Index: perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala =================================================================== --- perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (revision 1242817) +++ perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (working copy) @@ -18,13 +18,12 @@ package kafka.perf import java.net.URI -import joptsimple._ +import java.text.SimpleDateFormat +import kafka.api.{FetchRequestBuilder, OffsetRequest} +import kafka.consumer.SimpleConsumer import kafka.utils._ -import kafka.server._ -import kafka.consumer.SimpleConsumer import org.apache.log4j.Logger -import kafka.api.{OffsetRequest, FetchRequest} -import java.text.SimpleDateFormat +import kafka.message.ByteBufferMessageSet /** * Performance test for the simple consumer @@ -56,12 +55,20 @@ var lastReportTime: Long = startMs var lastBytesRead = 0L var lastMessagesRead = 0L + var reqId = 0 while(!done) { - val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize)) + // TODO: add in the maxWait and minBytes for performance + val request = new FetchRequestBuilder() + .correlationId(reqId) + .clientId(config.clientId) + .addFetch(config.topic, config.partition, offset, config.fetchSize) + .build() + val fetchResponse = consumer.fetch(request) + var messagesRead = 0 var bytesRead = 0 - - for(message <- messages) { + val messageSet = fetchResponse.messageSet(config.topic, config.partition) + for (message <- messageSet) { messagesRead += 1 bytesRead += message.message.payloadSize } @@ -69,7 +76,8 @@ if(messagesRead == 0 || totalMessagesRead 
> config.numMessages) done = true else - offset += messages.validBytes + // we only did one fetch so we find the offset for the first (head) messageset + offset += messageSet.validBytes totalBytesRead += bytesRead totalMessagesRead += messagesRead @@ -89,6 +97,7 @@ lastMessagesRead = totalMessagesRead consumedInterval = 0 } + reqId += 1 } val reportTime = System.currentTimeMillis val elapsed = (reportTime - startMs) / 1000.0 @@ -119,6 +128,11 @@ .describedAs("bytes") .ofType(classOf[java.lang.Integer]) .defaultsTo(1024*1024) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withOptionalArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerPerformanceClient") val options = parser.parse(args : _*) @@ -139,5 +153,6 @@ val showDetailedStats = options.has(showDetailedStatsOpt) val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) + val clientId = options.valueOf(clientIdOpt).toString } } Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (revision 1242817) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (working copy) @@ -16,27 +16,26 @@ */ package kafka.etl; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.zip.CRC32; import kafka.api.FetchRequest; -import kafka.javaapi.MultiFetchResponse; +import kafka.api.FetchRequestBuilder; import kafka.api.OffsetRequest; import kafka.common.ErrorMapping; +import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; +import kafka.javaapi.message.MessageSet; import kafka.message.MessageAndOffset; -import kafka.message.MessageSet; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.MultipleOutputs; + +import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; @SuppressWarnings({ "deprecation"}) public class KafkaETLContext { @@ -59,7 +58,8 @@ protected long _offset = Long.MAX_VALUE; /*current offset*/ protected long _count; /*current count*/ - protected MultiFetchResponse _response = null; /*fetch response*/ + protected int requestId = 0; /* the id of the next fetch request */ + protected FetchResponse _response = null; /*fetch response*/ protected Iterator _messageIt = null; /*message iterator*/ protected Iterator _respIterator = null; protected int _retry = 0; @@ -149,15 +149,19 @@ public boolean fetchMore () throws IOException { if (!hasMore()) return false; - FetchRequest fetchRequest = - new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize); - List array = new ArrayList(); - array.add(fetchRequest); + FetchRequest fetchRequest = new FetchRequestBuilder() + .correlationId(requestId) + .clientId(_request.clientId()) + .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize) + .build(); long tempTime = System.currentTimeMillis(); - _response = _consumer.multifetch(array); - if(_response != null) - _respIterator = _response.iterator(); + _response = 
_consumer.fetch(fetchRequest); + if(_response != null) { + _respIterator = new ArrayList(){{ + add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition())); + }}.iterator(); + } _requestTime += (System.currentTimeMillis() - tempTime); return true; Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java (revision 1242817) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java (working copy) @@ -29,6 +29,7 @@ URI _uri; int _partition; long _offset = DEFAULT_OFFSET; + String _clientId = "KafkaHadoopETL"; public KafkaETLRequest() { @@ -83,11 +84,11 @@ _offset = offset; } - public String getTopic() { return _topic;} - public URI getURI () { return _uri;} - public int getPartition() { return _partition;} - - public long getOffset() { return _offset;} + public String getTopic() { return _topic; } + public URI getURI () { return _uri; } + public int getPartition() { return _partition; } + public long getOffset() { return _offset; } + public String clientId() { return _clientId; } public boolean isValidOffset() { return _offset >= 0; Index: examples/src/main/java/kafka/examples/KafkaProperties.java =================================================================== --- examples/src/main/java/kafka/examples/KafkaProperties.java (revision 1242817) +++ examples/src/main/java/kafka/examples/KafkaProperties.java (working copy) @@ -28,4 +28,5 @@ final static int reconnectInterval = 10000; final static String topic2 = "topic2"; final static String topic3 = "topic3"; + final static String clientId = "SimpleConsumerDemoClient"; } Index: examples/src/main/java/kafka/examples/SimpleConsumerDemo.java =================================================================== --- examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (revision 1242817) +++ examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (working copy) @@ -16,71 +16,76 @@ */ package kafka.examples; -import java.util.ArrayList; -import java.util.List; - -import kafka.javaapi.MultiFetchResponse; +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.javaapi.message.MessageSet; import kafka.message.MessageAndOffset; -import scala.collection.Iterator; -import kafka.api.FetchRequest; -import kafka.message.Message; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -public class SimpleConsumerDemo -{ - private static void printMessages(ByteBufferMessageSet messageSet) - { +public class SimpleConsumerDemo { + + private static void printMessages(ByteBufferMessageSet messageSet) { for (MessageAndOffset messageAndOffset : messageSet) { System.out.println(ExampleUtils.getMessage(messageAndOffset.message())); } } - private static void generateData() - { + private static void generateData() { Producer producer2 = new Producer(KafkaProperties.topic2); producer2.start(); Producer producer3 = new Producer(KafkaProperties.topic3); producer3.start(); - try - { + try { Thread.sleep(1000); - } - catch (InterruptedException e) - { + } catch (InterruptedException e) { e.printStackTrace(); } } - public static void main(String[] args) - { - + public static void main(String[] args) { generateData(); + SimpleConsumer simpleConsumer = new 
SimpleConsumer(KafkaProperties.kafkaServerURL, KafkaProperties.kafkaServerPort, KafkaProperties.connectionTimeOut, KafkaProperties.kafkaProducerBufferSize); System.out.println("Testing single fetch"); - FetchRequest req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100); - ByteBufferMessageSet messageSet = simpleConsumer.fetch(req); - printMessages(messageSet); + FetchRequest req = new FetchRequestBuilder() + .correlationId(0) + .clientId(KafkaProperties.clientId) + .addFetch(KafkaProperties.topic2, 0, 0L, 100) + .build(); + FetchResponse fetchResponse = simpleConsumer.fetch(req); + printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0)); System.out.println("Testing single multi-fetch"); - req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100); - List<FetchRequest> list = new ArrayList<FetchRequest>(); - list.add(req); - req = new FetchRequest(KafkaProperties.topic3, 0, 0L, 100); - list.add(req); - MultiFetchResponse response = simpleConsumer.multifetch(list); + Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{ + put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }}); + put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }}); + }}; + req = new FetchRequestBuilder() + .correlationId(0) + .clientId(KafkaProperties.clientId) + .addFetch(KafkaProperties.topic2, 0, 0L, 100) + .addFetch(KafkaProperties.topic3, 0, 0L, 100) + .build(); + fetchResponse = simpleConsumer.fetch(req); int fetchReq = 0; - for (ByteBufferMessageSet resMessageSet : response ) - { - System.out.println("Response from fetch request no: " + ++fetchReq); - printMessages(resMessageSet); + for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) { + String topic = entry.getKey(); + for ( Integer partition : entry.getValue()) { + System.out.println("Response from fetch request no: " + ++fetchReq); + printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, partition)); + } } } - }
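
A minimal sketch of how a caller might assemble the new wire-format producer request out of the WiredTopic/WiredPartition classes introduced in kafka/api/ProducerRequest.scala above. The topic name, client id, ack settings and message payload are illustrative values only, the ByteBufferMessageSet/Message constructors follow the existing 0.7-era API, and the single-argument SyncProducer.send(request) call assumes the core producer change that accompanies this patch (not shown in these hunks):

    import kafka.api.{ProducerRequest, WiredTopic, WiredPartition}
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    // Wrap one uncompressed message set destined for partition 0 of "test-topic" (illustrative values).
    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
    val partitions = Array(new WiredPartition(0, messages))
    val topics = Array(new WiredTopic("test-topic", partitions))

    // Arguments: correlation_id, client_id, required_acks, ack_timeout (ms), data
    val request = new ProducerRequest(1, "demo-client", 0.toShort, 500, topics)
    // syncProducer.send(request)   // assumes the matching core SyncProducer.send(ProducerRequest)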
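
Likewise, the consumer-side pattern that replaces the old fetch-returning-ByteBufferMessageSet and multifetch calls throughout this patch: build a FetchRequest with FetchRequestBuilder, then pull each (topic, partition) message set out of the FetchResponse. Host, port, timeout and buffer sizes below are placeholder values; the builder calls mirror those used in the updated tests, perf tool and examples:

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // Placeholder connection settings (host, port, socket timeout, buffer size).
    val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024)
    val request = new FetchRequestBuilder()
      .correlationId(0)
      .clientId("demo-client")
      .addFetch("test-topic", 0, 0L, 1024 * 1024)
      .build()

    val response = consumer.fetch(request)
    // One message set per (topic, partition) pair that was added to the request.
    for (messageAndOffset <- response.messageSet("test-topic", 0))
      println(messageAndOffset.message)
    consumer.close()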