diff --git core/src/main/scala/kafka/network/RequestChannel.scala core/src/main/scala/kafka/network/RequestChannel.scala index 9b0f7e9..6377ad4 100644 --- core/src/main/scala/kafka/network/RequestChannel.scala +++ core/src/main/scala/kafka/network/RequestChannel.scala @@ -89,23 +89,27 @@ object RequestChannel extends Logging { } } -class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { +class RequestChannel(val numProcessors: Int, val numIoThreads: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil - private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) + private val requestQueues = new Array[BlockingQueue[RequestChannel.Request]](numIoThreads) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) - for(i <- 0 until numProcessors) + for(i <- 0 until numProcessors) { responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]() + } + for(i <- 0 until numIoThreads) { + requestQueues(i) = new ArrayBlockingQueue[RequestChannel.Request](queueSize) + } newGauge( "RequestQueueSize", new Gauge[Int] { - def getValue = requestQueue.size + def getValue = requestQueues.foldLeft(0)((sum, queue) => sum + queue.size) } ) /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ - def sendRequest(request: RequestChannel.Request) { - requestQueue.put(request) + def sendRequest(request: RequestChannel.Request, ioThread: Int) { + requestQueues(ioThread).put(request) } /** Send a response back to the socket server to be sent over the network */ @@ -116,8 +120,8 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = - requestQueue.take() + def receiveRequest(ioThread: Int): RequestChannel.Request = + requestQueues(ioThread).take() /** Get a response for the given processor if there is one */ def receiveResponse(processor: Int): RequestChannel.Response = diff --git core/src/main/scala/kafka/network/SocketServer.scala core/src/main/scala/kafka/network/SocketServer.scala index b056e25..5ea94d1 100644 --- core/src/main/scala/kafka/network/SocketServer.scala +++ core/src/main/scala/kafka/network/SocketServer.scala @@ -35,21 +35,22 @@ import kafka.utils._ class SocketServer(val brokerId: Int, val host: String, val port: Int, - val numProcessorThreads: Int, + val numProcessorThreads: Int, + val numIoThreads: Int, val maxQueuedRequests: Int, val maxRequestSize: Int = Int.MaxValue) extends Logging { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @volatile private var acceptor: Acceptor = null - val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) + val requestChannel = new RequestChannel(numProcessorThreads, numIoThreads, maxQueuedRequests) /** * Start the socket server */ def startup() { for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, time, maxRequestSize, requestChannel) + processors(i) = new Processor(i, numIoThreads, time, maxRequestSize, requestChannel) Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses @@ -205,6 +206,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce * each of which has its own selectors */ private[kafka] class Processor(val id: Int, + val numIoThreads: Int, val time: Time, val maxRequestSize: Int, val requestChannel: RequestChannel) extends AbstractServerThread { @@ -319,11 +321,15 @@ private[kafka] class Processor(val id: Int, close(key) } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) - requestChannel.sendRequest(req) - trace("Received request, sending for processing by handler: " + req) + // send the request to the io thread handling all the requests coming from this client + // this mapping from key to io thread can be cached here. The mapping will be added when the processor accepts a connection + // and will be removed when the processor closes the key + requestChannel.sendRequest(req, key.hashCode % numIoThreads) key.attach(null) + trace("Selection key's interest ops after completing a read are " + (key.readyOps & key.interestOps & SelectionKey.OP_READ)) } else { // more reading to be done + trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } @@ -343,8 +349,10 @@ private[kafka] class Processor(val id: Int, if(responseSend.complete) { response.request.updateRequestMetrics() key.attach(null) + trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { + trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_WRITE) wakeup() } diff --git core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index b209a97..a0e2b44 100644 --- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError){ debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode)) diff --git core/src/main/scala/kafka/producer/ConsoleProducer.scala core/src/main/scala/kafka/producer/ConsoleProducer.scala index 8b77465..eebfda6 100644 --- core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -87,7 +87,12 @@ object ConsoleProducer { .describedAs("reader_class") .ofType(classOf[java.lang.String]) .defaultsTo(classOf[LineMessageReader].getName) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + "This allows custom configuration for a user-defined message reader.") .withRequiredArg .describedAs("prop") @@ -116,6 +121,7 @@ object ConsoleProducer { val keyEncoderClass = options.valueOf(keyEncoderOpt) val valueEncoderClass = options.valueOf(valueEncoderOpt) val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt)) cmdLineProps.put("topic", topic) @@ -133,7 +139,7 @@ object ConsoleProducer { props.put("request.timeout.ms", requestTimeoutMs.toString) props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) - + props.put("send.buffer.bytes", socketBuffer.toString) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) diff --git core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index af077e0..3d22e6d 100644 --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var compressionCodec:String = null var enqueueTimeout:String = null var queueSize:String = null + var requiredNumAcks: Int = Int.MaxValue private var producer: Producer[String, String] = null @@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } - + def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } def getProducerType:String = producerType def setProducerType(producerType:String) { this.producerType = producerType } - + def getCompressionCodec:String = compressionCodec def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } - + def getEnqueueTimeout:String = enqueueTimeout def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } def getQueueSize:String = queueSize def setQueueSize(queueSize:String) { this.queueSize = queueSize } + def getRequiredNumAcks:Int = requiredNumAcks + def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks } + override def activateOptions() { // check for config parameter validity val props = new Properties() @@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { if(compressionCodec != null) props.put("compression.codec", compressionCodec) if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) + if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) LogLog.debug("Logging for topic: " + topic) } - + override def append(event: LoggingEvent) { val message : String = if( this.layout == null) { event.getRenderedMessage diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala index 0469a39..244b2c4 100644 --- core/src/main/scala/kafka/producer/SyncProducer.scala +++ core/src/main/scala/kafka/producer/SyncProducer.scala @@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse): Receive = { + private def doSend(request: RequestOrResponse, requiredAcks: Int = 1): Receive = { lock synchronized { verifyRequest(request) getOrMakeConnection() @@ -70,7 +70,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { var response: Receive = null try { blockingChannel.send(request) - response = blockingChannel.receive() + // read response only if request.required.acks != 0 + if(requiredAcks != 0) + response = blockingChannel.receive() + else + trace("Skipping reading response") } catch { case e: java.io.IOException => // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry @@ -85,7 +89,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Send a message */ - def send(producerRequest: ProducerRequest): ProducerResponse = { + def send(producerRequest: ProducerRequest): Option[ProducerResponse] = { val requestSize = producerRequest.sizeInBytes producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) @@ -95,14 +99,21 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { - response = doSend(producerRequest) + response = doSend(producerRequest, producerRequest.requiredAcks) } } - ProducerResponse.readFrom(response.buffer) + if(config.requestRequiredAcks != 0) + Some(ProducerResponse.readFrom(response.buffer)) + else + None } def send(request: TopicMetadataRequest): TopicMetadataResponse = { - val response = doSend(request) + // force the num acks for topic metadata request to be 1. Any other value for topic metadata request is invalid since + // it is not a typical produce request. 0 is invalid since the broker has to send a response. -1 is invalid since + // only one broker processes a topic metadata request by reading from zookeeper and it has nothing to do with the replication + // protocol + val response = doSend(request, 1) TopicMetadataResponse.readFrom(response.buffer) } diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 374cd6b..9eea8b9 100644 --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -240,23 +240,26 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val syncProducer = producerPool.getProducer(brokerId) debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - val response = syncProducer.send(producerRequest) + val responseOpt = syncProducer.send(producerRequest) debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - if (response.status.size != producerRequest.data.size) - throw new KafkaException("Incomplete response (%s) for producer request (%s)" - .format(response, producerRequest)) - if (logger.isTraceEnabled) { - val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) - successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + responseOpt match { + case Some(response) => + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) + if (logger.isTraceEnabled) { + val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) + successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => + trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + } + failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) + if(failedTopicPartitions.size > 0) + error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" + .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) + failedTopicPartitions + case None => Seq.empty[TopicAndPartition] } - failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq - .map(partitionStatus => partitionStatus._1) - if(failedTopicPartitions.size > 0) - error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" - .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) - failedTopicPartitions } catch { case t: Throwable => warn("Failed to send producer request with correlation id %d to broker %d with data %s" diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index 6df077b..ffa800c 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -127,8 +127,10 @@ class KafkaApis(val requestChannel: RequestChannel, val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) - if (produceRequest.requiredAcks == 0 || - produceRequest.requiredAcks == 1 || + if(produceRequest.requiredAcks == 0) { + // do not send a response if the producer request.required.acks = 0. This mimics the behavior of a 0.7 producer + // and is tuned for very high throughput + } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { diff --git core/src/main/scala/kafka/server/KafkaRequestHandler.scala core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f0c05a5..2d89f94 100644 --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -31,8 +31,8 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha def run() { while(true) { try { - val req = requestChannel.receiveRequest() - if(req eq RequestChannel.AllDone){ + val req = requestChannel.receiveRequest(id) + if(req eq RequestChannel.AllDone) { trace("receives shut down command, shut down".format(brokerId, id)) return } @@ -45,7 +45,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha } } - def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone) + def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone, id) } class KafkaRequestHandlerPool(val brokerId: Int, diff --git core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/server/KafkaServer.scala index 1fe1ca9..69527f5 100644 --- core/src/main/scala/kafka/server/KafkaServer.scala +++ core/src/main/scala/kafka/server/KafkaServer.scala @@ -65,6 +65,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.hostName, config.port, config.numNetworkThreads, + config.numIoThreads, config.queuedMaxRequests, config.socketRequestMaxBytes) diff --git core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 402fced..c3aad7d 100644 --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -86,6 +86,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -115,6 +116,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("compression", "true") + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 6db63ba..7933450 100644 --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -125,7 +125,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request) // create the kafka request handler - val requestChannel = new RequestChannel(2, 5) + val requestChannel = new RequestChannel(2, 2, 5) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) // call the API (to be tested) to get metadata diff --git core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index c25255f..67497dd 100644 --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with var logDirZk: File = null var config: KafkaConfig = null - var serverZk: KafkaServer = null + var server: KafkaServer = null var simpleConsumerZk: SimpleConsumer = null @@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) - serverZk = TestUtils.createServer(config); + server = TestUtils.createServer(config); simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @After override def tearDown() { simpleConsumerZk.close - serverZk.shutdown + server.shutdown Utils.rm(logDirZk) super.tearDown() } @@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") + props.put("log4j.appender.KAFKA.requiredNumAcks", "1") props } } diff --git core/src/test/scala/unit/kafka/network/SocketServerTest.scala core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7395cbc..fa67420 100644 --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -37,6 +37,7 @@ class SocketServerTest extends JUnitSuite { host = null, port = TestUtils.choosePort, numProcessorThreads = 1, + numIoThreads = 1, maxQueuedRequests = 50, maxRequestSize = 50) server.startup() @@ -59,7 +60,7 @@ class SocketServerTest extends JUnitSuite { /* A simple request handler that just echos back the response */ def processRequest(channel: RequestChannel) { - val request = channel.receiveRequest + val request = channel.receiveRequest(0) val id = request.buffer.getShort val send = new BoundedByteBufferSend(request.buffer.slice) channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) diff --git core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index fb0666f..f4611e1 100644 --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) @@ -410,18 +411,18 @@ class AsyncProducerTest extends JUnit3Suite { // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11) - val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException - EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1) - EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2) + EasyMock.expect(mockSyncProducer.send(request2)).andReturn(Some(response1)) + EasyMock.expect(mockSyncProducer.send(request3)).andReturn(Some(response2)) EasyMock.replay(mockSyncProducer) val producerPool = EasyMock.createMock(classOf[ProducerPool]) diff --git core/src/test/scala/unit/kafka/producer/ProducerTest.scala core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 792919b..04acef5 100644 --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") -// props.put("request.required.acks", "-1") + props.put("request.required.acks", "1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic @@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 89ba944..a460e8f 100644 --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -44,10 +44,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "500") props.put("reconnect.interval", "1000") + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -56,7 +58,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -64,7 +67,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val secondEnd = SystemTime.milliseconds Assert.assertTrue((secondEnd-secondStart) < 500) try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)).get Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -80,14 +84,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") + props.put("request.required.acks", "1") val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val ack: Short = 1 val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) - val producer = new SyncProducer(new SyncProducerConfig(props)) - val response = producer.send(emptyRequest) + val response = producer.send(emptyRequest).get Assert.assertTrue(!response.hasError && response.status.size == 0) } @@ -97,13 +101,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) - val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) + val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1)).get Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) @@ -112,7 +117,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) - val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) + val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1)).get Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error) @@ -128,13 +133,14 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("send.buffer.bytes", "102400") props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) // #1 - test that we get an error when partition does not belong to broker in response val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1) - val response = producer.send(request) + val response = producer.send(request).get Assert.assertNotNull(response) Assert.assertEquals(request.correlationId, response.correlationId) @@ -151,7 +157,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500) - val response2 = producer.send(request) + val response2 = producer.send(request).get Assert.assertNotNull(response2) Assert.assertEquals(request.correlationId, response2.correlationId) Assert.assertEquals(3, response2.status.size) @@ -178,10 +184,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("port", server.socketServer.port.toString) props.put("send.buffer.bytes", "102400") props.put("request.timeout.ms", String.valueOf(timeoutMs)) + props.put("request.required.acks", "1") val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - val request = TestUtils.produceRequest("topic1", 0, messages) + val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1) // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled @@ -196,7 +203,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { case e => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds - + println("Timeout = " + (t2-t1)) // make sure we don't wait fewer than timeoutMs for a response Assert.assertTrue((t2-t1) >= timeoutMs) } diff --git core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 1557047..f6e16ae 100644 --- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -87,7 +87,7 @@ class SimpleFetchTest extends JUnit3Suite { // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) // don't provide replica or leader callbacks since they will not be tested here - val requestChannel = new RequestChannel(2, 5) + val requestChannel = new RequestChannel(2, 2, 5) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log @@ -181,7 +181,7 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() EasyMock.replay(replicaManager) - val requestChannel = new RequestChannel(2, 5) + val requestChannel = new RequestChannel(2, 2, 5) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) /**