diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9b0f7e9..df5081b 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -40,6 +40,8 @@ object RequestChannel extends Logging { byteBuffer } + def getFakeProducerResponse(request: Request) = new Response(request.processor, request, null) + case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { @volatile var dequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @@ -116,7 +118,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = + def receiveRequest(): RequestChannel.Request = requestQueue.take() /** Get a response for the given processor if there is one */ diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b056e25..f597efa 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -35,7 +35,7 @@ import kafka.utils._ class SocketServer(val brokerId: Int, val host: String, val port: Int, - val numProcessorThreads: Int, + val numProcessorThreads: Int, val maxQueuedRequests: Int, val maxRequestSize: Int = Int.MaxValue) extends Logging { this.logIdent = "[Socket Server on Broker " + brokerId + "], " @@ -205,7 +205,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce * each of which has its own selectors */ private[kafka] class Processor(val id: Int, - val time: Time, + val time: Time, val maxRequestSize: Int, val requestChannel: RequestChannel) extends AbstractServerThread { @@ -261,8 +261,17 @@ private[kafka] class Processor(val id: Int, trace("Socket server received 
response to send, registering for write: " + curr) val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + if(curr.responseSend == null) { + // this means the response is fake and it is for a produce request that had request.required.acks set to 0. + // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests + // that are sitting in the socket buffer + key.interestOps(SelectionKey.OP_READ) + key.attach(null) + } + else { + key.interestOps(SelectionKey.OP_WRITE) + key.attach(curr) + } } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") @@ -320,10 +329,12 @@ private[kafka] class Processor(val id: Int, } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) - trace("Received request, sending for processing by handler: " + req) key.attach(null) + // explicitly reset interest ops to not include READ or WRITE + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ) & (~SelectionKey.OP_WRITE)) } else { // more reading to be done + trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } @@ -343,8 +354,10 @@ private[kafka] class Processor(val id: Int, if(responseSend.complete) { response.request.updateRequestMetrics() key.attach(null) + trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { + trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_WRITE) wakeup() } diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala 
b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index b209a97..a0e2b44 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError){ debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode)) diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 8b77465..eebfda6 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -87,7 +87,12 @@ object ConsoleProducer { .describedAs("reader_class") .ofType(classOf[java.lang.String]) .defaultsTo(classOf[LineMessageReader].getName) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP RECV buffer.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + "This allows custom configuration for a user-defined message reader.") .withRequiredArg .describedAs("prop") @@ -116,6 +121,7 @@ object ConsoleProducer { val keyEncoderClass = options.valueOf(keyEncoderOpt) val valueEncoderClass = options.valueOf(valueEncoderOpt) val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt)) cmdLineProps.put("topic", topic) @@ -133,7 +139,7 @@ object ConsoleProducer { props.put("request.timeout.ms", requestTimeoutMs.toString) props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) - + props.put("send.buffer.bytes", socketBuffer.toString) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index af077e0..3d22e6d 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var compressionCodec:String = null var enqueueTimeout:String = null var queueSize:String = null + var requiredNumAcks: Int = Int.MaxValue private var producer: Producer[String, String] = null @@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } - + def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } def getProducerType:String = producerType def setProducerType(producerType:String) { this.producerType = producerType } - + def getCompressionCodec:String = compressionCodec def 
setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } - + def getEnqueueTimeout:String = enqueueTimeout def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } def getQueueSize:String = queueSize def setQueueSize(queueSize:String) { this.queueSize = queueSize } + def getRequiredNumAcks:Int = requiredNumAcks + def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks } + override def activateOptions() { // check for config parameter validity val props = new Properties() @@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { if(compressionCodec != null) props.put("compression.codec", compressionCodec) if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) + if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) LogLog.debug("Logging for topic: " + topic) } - + override def append(event: LoggingEvent) { val message : String = if( this.layout == null) { event.getRenderedMessage diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0469a39..244b2c4 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse): Receive = { + private def doSend(request: RequestOrResponse, requiredAcks: Int = 1): Receive = { lock synchronized { verifyRequest(request) getOrMakeConnection() @@ -70,7 +70,11 @@ class SyncProducer(val 
config: SyncProducerConfig) extends Logging { var response: Receive = null try { blockingChannel.send(request) - response = blockingChannel.receive() + // read response only if request.required.acks != 0 + if(requiredAcks != 0) + response = blockingChannel.receive() + else + trace("Skipping reading response") } catch { case e: java.io.IOException => // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry @@ -85,7 +89,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Send a message */ - def send(producerRequest: ProducerRequest): ProducerResponse = { + def send(producerRequest: ProducerRequest): Option[ProducerResponse] = { val requestSize = producerRequest.sizeInBytes producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) @@ -95,14 +99,21 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { - response = doSend(producerRequest) + response = doSend(producerRequest, producerRequest.requiredAcks) } } - ProducerResponse.readFrom(response.buffer) + if(producerRequest.requiredAcks != 0) + Some(ProducerResponse.readFrom(response.buffer)) + else + None } def send(request: TopicMetadataRequest): TopicMetadataResponse = { - val response = doSend(request) + // force the num acks for topic metadata request to be 1. Any other value for topic metadata request is invalid since + // it is not a typical produce request. 0 is invalid since the broker has to send a response. 
-1 is invalid since + // only one broker processes a topic metadata request by reading from zookeeper and it has nothing to do with the replication + // protocol + val response = doSend(request, 1) TopicMetadataResponse.readFrom(response.buffer) } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 374cd6b..9eea8b9 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -240,23 +240,26 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val syncProducer = producerPool.getProducer(brokerId) debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - val response = syncProducer.send(producerRequest) + val responseOpt = syncProducer.send(producerRequest) debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - if (response.status.size != producerRequest.data.size) - throw new KafkaException("Incomplete response (%s) for producer request (%s)" - .format(response, producerRequest)) - if (logger.isTraceEnabled) { - val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) - successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + responseOpt match { + case Some(response) => + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) + if (logger.isTraceEnabled) { + val successfullySentData 
= response.status.filter(_._2.error == ErrorMapping.NoError) + successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => + trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + } + failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) + if(failedTopicPartitions.size > 0) + error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" + .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) + failedTopicPartitions + case None => Seq.empty[TopicAndPartition] } - failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq - .map(partitionStatus => partitionStatus._1) - if(failedTopicPartitions.size > 0) - error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" - .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) - failedTopicPartitions } catch { case t: Throwable => warn("Failed to send producer request with correlation id %d to broker %d with data %s" diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6df077b..5d11c3c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -127,8 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel, val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) - if (produceRequest.requiredAcks == 0 || - produceRequest.requiredAcks == 1 || + if(produceRequest.requiredAcks == 0) { + // send a fake producer response if producer request.required.acks = 0. 
This mimics the behavior of a 0.7 producer + // and is tuned for very high throughput + requestChannel.sendResponse(RequestChannel.getFakeProducerResponse(request)) + } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 402fced..3a2e4e0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -86,6 +86,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -115,6 +116,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("compression", "true") + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -272,7 +274,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } producer.send(produceList: _*) - // wait a bit for produced message to be available val request = builder.build() val response = consumer.fetch(request) for( (topic, partition) <- topics) { diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index c25255f..67497dd 100644 --- 
a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with var logDirZk: File = null var config: KafkaConfig = null - var serverZk: KafkaServer = null + var server: KafkaServer = null var simpleConsumerZk: SimpleConsumer = null @@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) - serverZk = TestUtils.createServer(config); + server = TestUtils.createServer(config); simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @After override def tearDown() { simpleConsumerZk.close - serverZk.shutdown + server.shutdown Utils.rm(logDirZk) super.tearDown() } @@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") + props.put("log4j.appender.KAFKA.requiredNumAcks", "1") props } } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index fb0666f..f4611e1 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) @@ -410,18 +411,18 
@@ class AsyncProducerTest extends JUnit3Suite { // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11) - val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException - EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1) - EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2) + EasyMock.expect(mockSyncProducer.send(request2)).andReturn(Some(response1)) + EasyMock.expect(mockSyncProducer.send(request3)).andReturn(Some(response2)) EasyMock.replay(mockSyncProducer) val producerPool = EasyMock.createMock(classOf[ProducerPool]) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 792919b..04acef5 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") -// props.put("request.required.acks", "-1") + props.put("request.required.acks", "1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic @@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 89ba944..a460e8f 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -44,10 +44,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "500") props.put("reconnect.interval", "1000") + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = 
producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -56,7 +58,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -64,7 +67,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val secondEnd = SystemTime.milliseconds Assert.assertTrue((secondEnd-secondStart) < 500) try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. 
" + e.getMessage) @@ -80,14 +84,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") + props.put("request.required.acks", "1") val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val ack: Short = 1 val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) - val producer = new SyncProducer(new SyncProducerConfig(props)) - val response = producer.send(emptyRequest) + val response = producer.send(emptyRequest).get Assert.assertTrue(!response.hasError && response.status.size == 0) } @@ -97,13 +101,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) - val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) + val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1)).get Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) @@ -112,7 +117,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - 
MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) - val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) + val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1)).get Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error) @@ -128,13 +133,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) // #1 - test that we get an error when partition does not belong to broker in response val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1) - val response = producer.send(request) + val response = producer.send(request).get Assert.assertNotNull(response) Assert.assertEquals(request.correlationId, response.correlationId) @@ -151,7 +157,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500) - val response2 = producer.send(request) + val response2 = producer.send(request).get Assert.assertNotNull(response2) Assert.assertEquals(request.correlationId, response2.correlationId) Assert.assertEquals(3, response2.status.size) @@ -178,10 +184,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("port", server.socketServer.port.toString) props.put("send.buffer.bytes", "102400") props.put("request.timeout.ms", 
String.valueOf(timeoutMs)) + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - val request = TestUtils.produceRequest("topic1", 0, messages) + val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1) // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled @@ -196,7 +203,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { case e => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds - + // elapsed time (t2-t1) is asserted below; avoid a stray println in test output // make sure we don't wait fewer than timeoutMs for a response Assert.assertTrue((t2-t1) >= timeoutMs) }