Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1366075)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy)
@@ -32,7 +32,7 @@
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -93,6 +93,13 @@
     }
   }
 
+  def testEmptyFetchRequest() {
+    val offsets = Array[OffsetDetail]()
+    val request = new FetchRequest(offsetInfo = offsets)
+    val fetched = consumer.fetch(request)
+    assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+  }
+
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1366075)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy)
@@ -28,6 +28,7 @@
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
+import kafka.api.TopicData
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes = new Array[Byte](2);
@@ -71,6 +72,27 @@
   }
 
   @Test
+  def testEmptyProduceRequest() {
+    val server = servers.head
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+    val correlationId = SyncProducerConfig.DefaultCorrelationId
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val response = producer.send(emptyRequest)
+    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+  }
+
+  @Test
   def testSingleMessageSizeTooLarge() {
     val server = servers.head
     val props = new Properties()
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1366075)
+++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy)
@@ -117,7 +117,9 @@
     val response = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1) {
+    if (produceRequest.requiredAcks == 0 ||
+        produceRequest.requiredAcks == 1 ||
+        produceRequest.data.size <= 0) {
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
 
       for (topicData <- produceRequest.data)
@@ -230,7 +232,9 @@
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
-    if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
+    if(fetchRequest.maxWait <= 0 ||
+       availableBytes >= fetchRequest.minBytes ||
+       fetchRequest.nRequestedPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d"
         .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
Index: core/src/main/scala/kafka/api/FetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchRequest.scala (revision 1366075)
+++ core/src/main/scala/kafka/api/FetchRequest.scala (working copy)
@@ -142,6 +142,8 @@
   }
 
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+
+  def nRequestedPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
 }
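The sketch below is a minimal, self-contained restatement of the early-answer condition the KafkaApis.scala hunk adds to the fetch path, shown in isolation so the intent is easy to check. The object and method names (EmptyFetchRequestSketch, canAnswerImmediately) are illustrative only and do not appear in the patch; the real check is inline in the fetch-handling code and uses the new FetchRequest.nRequestedPartitions helper.

// Illustrative sketch, not part of the patch: mirrors the condition
// maxWait <= 0 || availableBytes >= minBytes || nRequestedPartitions <= 0
// added in KafkaApis so that a fetch request naming no partitions is
// answered immediately instead of waiting out maxWait for minBytes that
// can never arrive.
object EmptyFetchRequestSketch {
  def canAnswerImmediately(maxWait: Int, minBytes: Int,
                           availableBytes: Long, nRequestedPartitions: Int): Boolean = (
    maxWait <= 0 ||                 // caller asked for a non-blocking fetch
    availableBytes >= minBytes ||   // enough data is already buffered
    nRequestedPartitions <= 0       // empty request: nothing to wait for, answer now
  )

  def main(args: Array[String]) {
    // An empty fetch request is answered at once even with a long maxWait.
    println(canAnswerImmediately(maxWait = 1000, minBytes = 4096,
                                 availableBytes = 0L, nRequestedPartitions = 0)) // true
  }
}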