diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 39b0185..108a958 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -17,91 +17,89 @@ package kafka.consumer -import java.net._ -import java.nio.channels._ import kafka.api._ import kafka.network._ import kafka.utils._ -import kafka.utils.Utils._ /** * A consumer of kafka messages */ @threadsafe -class SimpleConsumer(val host: String, - val port: Int, - val soTimeout: Int, - val bufferSize: Int) extends Logging { - private var channel : SocketChannel = null +class SimpleConsumer( val host: String, + val port: Int, + val soTimeout: Int, + val bufferSize: Int ) extends Logging { + private val lock = new Object() + private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout) - private def connect(): SocketChannel = { + private def connect(): BlockingChannel = { close - val address = new InetSocketAddress(host, port) - - val channel = SocketChannel.open - debug("Connected to " + address + " for fetching.") - channel.configureBlocking(true) - channel.socket.setReceiveBufferSize(bufferSize) - channel.socket.setSoTimeout(soTimeout) - channel.socket.setKeepAlive(true) - channel.connect(address) - trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize) - trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout) - - channel + blockingChannel.connect() + blockingChannel } - private def close(channel: SocketChannel) = { - debug("Disconnecting from " + channel.socket.getRemoteSocketAddress()) - swallow(channel.close()) - swallow(channel.socket.close()) + private def disconnect() = { + if(blockingChannel.isConnected) { + debug("Disconnecting from " + host + ":" + port) + blockingChannel.disconnect() + } + } + + private def reconnect() { + disconnect() + connect() } def close() { lock synchronized { - if (channel != null) - close(channel) - channel = null + disconnect() } } - - /** - * Fetch a set of messages from a topic. - * - * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. - * @return a set of fetched messages - */ - def fetch(request: FetchRequest): FetchResponse = { + + private def sendRequest(request: Request): Tuple2[Receive, Int] = { lock synchronized { - val startTime = SystemTime.nanoseconds getOrMakeConnection() var response: Tuple2[Receive,Int] = null try { - sendRequest(request, channel) - response = getResponse(channel) + blockingChannel.send(request) + response = blockingChannel.receive() } catch { case e : java.io.IOException => - info("Reconnect in fetch request due to socket error: ", e) + info("Reconnect in due to socket error: ", e) // retry once try { - channel = connect - sendRequest(request, channel) - response = getResponse(channel) + reconnect() + blockingChannel.send(request) + response = blockingChannel.receive() } catch { - case ioe: java.io.IOException => channel = null; throw ioe; + case ioe: java.io.IOException => + disconnect() + throw ioe } case e => throw e } - val fetchResponse = FetchResponse.readFrom(response._1.buffer) - val fetchedSize = fetchResponse.sizeInBytes + response + } + } - val endTime = SystemTime.nanoseconds - SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) + /** + * Fetch a set of messages from a topic. + * + * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. + * @return a set of fetched messages + */ + def fetch(request: FetchRequest): FetchResponse = { + val startTime = SystemTime.nanoseconds + val response = sendRequest(request) + val fetchResponse = FetchResponse.readFrom(response._1.buffer) + val fetchedSize = fetchResponse.sizeInBytes - fetchResponse - } + val endTime = SystemTime.nanoseconds + SimpleConsumerStats.recordFetchRequest(endTime - startTime) + SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) + + fetchResponse } /** @@ -112,31 +110,14 @@ class SimpleConsumer(val host: String, * @return an array of offsets */ def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = { - lock synchronized { - getOrMakeConnection() - var response: Tuple2[Receive,Int] = null - try { - sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel) - response = getResponse(channel) - } catch { - case e : java.io.IOException => - info("Reconnect in get offetset request due to socket error: ", e) - // retry once - try { - channel = connect - sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel) - response = getResponse(channel) - } catch { - case ioe: java.io.IOException => channel = null; throw ioe; - } - } - OffsetRequest.deserializeOffsetArray(response._1.buffer) - } + val request = new OffsetRequest(topic, partition, time, maxNumOffsets) + val response = sendRequest(request) + OffsetRequest.deserializeOffsetArray(response._1.buffer) } private def getOrMakeConnection() { - if(channel == null) { - channel = connect() + if(!blockingChannel.isConnected) { + connect() } } } diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala new file mode 100644 index 0000000..addb625 --- /dev/null +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.net.InetSocketAddress +import java.nio.channels._ +import kafka.utils.{nonthreadsafe, Logging} + +/** + * A simple blocking channel with timeouts correctly enabled. + * + */ +@nonthreadsafe +class BlockingChannel( val host: String, + val port: Int, + val readBufferSize: Int, + val writeBufferSize: Int, + val readTimeoutMs: Int ) extends Logging { + + private var connected = false + private var channel: SocketChannel = null + private var readChannel: ReadableByteChannel = null + private var writeChannel: GatheringByteChannel = null + private val lock = new Object() + + def connect() = lock synchronized { + if(!connected) { + channel = SocketChannel.open() + if(readBufferSize > 0) + channel.socket.setReceiveBufferSize(readBufferSize) + if(writeBufferSize > 0) + channel.socket.setSendBufferSize(writeBufferSize) + channel.configureBlocking(true) + channel.socket.setSoTimeout(readTimeoutMs) + channel.socket.setKeepAlive(true) + channel.connect(new InetSocketAddress(host, port)) + + writeChannel = channel + readChannel = Channels.newChannel(channel.socket().getInputStream) + connected = true + } + } + + def disconnect() = lock synchronized { + if(connected || channel != null) { + // closing the main socket channel *should* close the read channel + // but let's do it to be sure. + swallow(channel.close()) + swallow(channel.socket.close()) + swallow(readChannel.close()) + channel = null; readChannel = null; writeChannel = null + connected = false + } + } + + def isConnected = connected + + def send(request: Request):Int = { + if(!connected) + throw new ClosedChannelException() + + val send = new BoundedByteBufferSend(request) + send.writeCompletely(writeChannel) + } + + def receive(): Tuple2[Receive, Int] = { + if(!connected) + throw new ClosedChannelException() + + val response = new BoundedByteBufferReceive() + response.readCompletely(readChannel) + + // this has the side effect of setting the initial position of buffer correctly + val errorCode: Int = response.buffer.getShort + (response, errorCode) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala index 457a983..cf461e5 100644 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ b/core/src/main/scala/kafka/network/Transmission.scala @@ -50,12 +50,13 @@ trait Receive extends Transmission { def readFrom(channel: ReadableByteChannel): Int def readCompletely(channel: ReadableByteChannel): Int = { - var read = 0 + var totalRead = 0 while(!complete) { - read = readFrom(channel) + val read = readFrom(channel) trace(read + " bytes read.") + totalRead += read } - read + totalRead } } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index ba4eaae..a68fc63 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -17,14 +17,11 @@ package kafka.producer -import java.net.InetSocketAddress -import java.nio.channels.SocketChannel import kafka.api._ import kafka.common.MessageSizeTooLargeException import kafka.message.MessageSet -import kafka.network.{BoundedByteBufferSend, Request, Receive} +import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive} import kafka.utils._ -import kafka.utils.Utils._ /* * Send a message set. @@ -33,11 +30,10 @@ import kafka.utils.Utils._ class SyncProducer(val config: SyncProducerConfig) extends Logging { private val MaxConnectBackoffMs = 60000 - private var channel : SocketChannel = null private var sentOnConnection = 0 private val lock = new Object() - @volatile - private var shutdown: Boolean = false + @volatile private var shutdown: Boolean = false + private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs) debug("Instantiating Scala Sync Producer") @@ -64,21 +60,19 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { var response: Tuple2[Receive, Int] = null try { - sendRequest(request, channel) - response = getResponse(channel) + blockingChannel.send(request) + response = blockingChannel.receive() } catch { case e: java.io.IOException => // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry disconnect() - println("sdfsdfsdf") throw e - case e => println("other sdfsdfsdfs"); throw e + case e => throw e } // TODO: do we still need this? sentOnConnection += 1 if(sentOnConnection >= config.reconnectInterval) { - disconnect() - channel = connect() + reconnect() sentOnConnection = 0 } SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime) @@ -119,41 +113,38 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { throw new MessageSizeTooLargeException } + private def reconnect() { + disconnect() + connect() + } + /** * Disconnect from current channel, closing connection. * Side effect: channel field is set to null on successful disconnect */ private def disconnect() { try { - if(channel != null) { + if(blockingChannel.isConnected) { info("Disconnecting from " + config.host + ":" + config.port) - swallow(channel.close()) - swallow(channel.socket.close()) - channel = null + blockingChannel.disconnect() } } catch { case e: Exception => error("Error on disconnect: ", e) } } - private def connect(): SocketChannel = { + private def connect(): BlockingChannel = { var connectBackoffMs = 1 val beginTimeMs = SystemTime.milliseconds - while(channel == null && !shutdown) { + while(!blockingChannel.isConnected && !shutdown) { try { - channel = SocketChannel.open() - channel.socket.setSendBufferSize(config.bufferSize) - channel.configureBlocking(true) - channel.socket.setSoTimeout(config.socketTimeoutMs) - channel.socket.setKeepAlive(true) - channel.connect(new InetSocketAddress(config.host, config.port)) + blockingChannel.connect() info("Connected to " + config.host + ":" + config.port + " for producing") - } - catch { + } catch { case e: Exception => { disconnect() val endTimeMs = SystemTime.milliseconds - if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs) { + if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs ) { error("Producer connection to " + config.host + ":" + config.port + " timing out after " + config.connectTimeoutMs + " ms", e) throw e } @@ -163,12 +154,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } } - channel + blockingChannel } private def getOrMakeConnection() { - if(channel == null) { - channel = connect() + if(!blockingChannel.isConnected) { + connect() } } } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 0a7e973..d9879cf 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -24,8 +24,8 @@ import kafka.producer._ import kafka.serializer.Encoder import scala.collection.Map import scala.collection.mutable.{ListBuffer, HashMap} -import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException} import kafka.utils.{Utils, Logging} +import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException} class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing private val partitioner: Partitioner[K], // use the other constructor @@ -42,7 +42,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, def handle(events: Seq[ProducerData[K,V]]) { lock synchronized { - val serializedData = serialize(events) + val serializedData = serialize(events) var outstandingProduceRequests = serializedData var remainingRetries = config.producerRetries Stream.continually(dispatchSerializedData(outstandingProduceRequests)) @@ -51,9 +51,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig, outstandingProduceRequests = currentOutstandingRequests // back off and update the topic metadata cache before attempting another send operation Thread.sleep(config.producerRetryBackoffMs) - brokerPartitionInfo.updateInfo() + Utils.swallowError(brokerPartitionInfo.updateInfo()) remainingRetries -= 1 } + if(remainingRetries < 0 && outstandingProduceRequests.size > 0) { + throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null) + } } } @@ -70,7 +73,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if((brokerid < 0) || (!send(brokerid, messageSetPerBroker))) failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten) } - }catch { + } catch { case t: Throwable => error("Failed to send messages") } failedProduceRequests diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index a39a01e..c0b6d6e 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -701,20 +701,6 @@ object Utils extends Logging { case _ => // swallow } } - - def sendRequest(request: Request, channel: SocketChannel) = { - val send = new BoundedByteBufferSend(request) - send.writeCompletely(channel) - } - - def getResponse(channel: SocketChannel): Tuple2[Receive,Int] = { - val response = new BoundedByteBufferReceive() - response.readCompletely(channel) - - // this has the side effect of setting the initial position of buffer correctly - val errorCode: Int = response.buffer.getShort - (response, errorCode) - } } class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) { diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 13e900a..72aa445 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -18,18 +18,19 @@ package kafka.producer import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.consumer.SimpleConsumer -import org.I0Itec.zkclient.ZkClient -import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} import java.util.Properties -import org.apache.log4j.{Level, Logger} -import org.junit.Test -import kafka.utils.{TestZKUtils, Utils, TestUtils} -import kafka.message.Message import kafka.admin.CreateTopicCommand import kafka.api.FetchRequestBuilder +import kafka.common.FailedToSendMessageException +import kafka.consumer.SimpleConsumer +import kafka.message.Message +import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} +import kafka.zk.ZooKeeperTestHarness +import org.apache.log4j.{Level, Logger} +import org.I0Itec.zkclient.ZkClient import org.junit.Assert._ +import org.junit.Test +import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils} class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { private val brokerId1 = 0 @@ -77,7 +78,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { // restore set request handler logger to a higher level requestHandlerLogger.setLevel(Level.ERROR) server1.shutdown + server1.awaitShutdown() server2.shutdown + server2.awaitShutdown() Utils.rm(server1.config.logDir) Utils.rm(server2.config.logDir) Thread.sleep(500) @@ -120,107 +123,63 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { } } catch { case e: Exception => fail("Not expected", e) + } finally { + producer.close } - producer.close } -// @Test -// def testZKSendWithDeadBroker() { -// val props = new Properties() -// props.put("serializer.class", "kafka.serializer.StringEncoder") -// props.put("partitioner.class", "kafka.utils.StaticPartitioner") -// props.put("zk.connect", TestZKUtils.zookeeperConnect) -// -// // create topic -// CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0") -// -// val config = new ProducerConfig(props) -// -// val producer = new Producer[String, String](config) -// val message = new Message("test1".getBytes) -// try { -//// // kill 2nd broker -//// server1.shutdown -//// Thread.sleep(100) -// -// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and -// // all partitions have broker 0 as the leader. -// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) -// Thread.sleep(100) -// -// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) -// Thread.sleep(3000) -// -// // restart server 1 -//// server1.startup() -//// Thread.sleep(100) -// -// // cross check if brokers got the messages -// val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) -// val messageSet = response.messageSet("new-topic", 0).iterator -// var numMessagesReceived = 0 -// while(messageSet.hasNext) { -// val messageAndOffset = messageSet.next() -// assertEquals(message, messageSet.next.message) -// println("Received message at offset %d".format(messageAndOffset.offset)) -// numMessagesReceived += 1 -// } -// assertEquals("Message set should have 2 messages", 2, numMessagesReceived) -// } catch { -// case e: Exception => fail("Not expected", e) -// } -// producer.close -// } - - // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions - // and when leader logic is changed. -// @Test -// def testZKSendWithDeadBroker2() { -// val props = new Properties() -// props.put("serializer.class", "kafka.serializer.StringEncoder") -// props.put("partitioner.class", "kafka.utils.StaticPartitioner") -// props.put("socket.timeout.ms", "200") -// props.put("zk.connect", TestZKUtils.zookeeperConnect) -// -// // create topic -// CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") -// -// val config = new ProducerConfig(props) -// -// val producer = new Producer[String, String](config) -// try { -// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and -// // all partitions have broker 0 as the leader. -// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) -// Thread.sleep(100) -// // kill 2nd broker -// server1.shutdown -// Thread.sleep(500) -// -// // Since all partitions are unavailable, this request will be dropped -// try { -// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) -// fail("Leader broker for \"new-topic\" isn't up, should not be able to send data") -// } catch { -// case e: kafka.common.FailedToSendMessageException => // success -// case e => fail("Leader broker for \"new-topic\" isn't up, should not be able to send data") -// } -// -// // restart server 1 -// server1.startup() -// Thread.sleep(200) -// -// // cross check if brokers got the messages -// val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) -// val messageSet1 = response1.messageSet("new-topic", 0).iterator -// assertTrue("Message set should have 1 message", messageSet1.hasNext) -// assertEquals(new Message("test1".getBytes), messageSet1.next.message) -// assertFalse("Message set should not have more than 1 message", messageSet1.hasNext) -// } catch { -// case e: Exception => fail("Not expected", e) -// } -// producer.close -// } + @Test + def testZKSendWithDeadBroker() { + val props = new Properties() + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("partitioner.class", "kafka.utils.StaticPartitioner") + props.put("socket.timeout.ms", "2000") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + + // create topic + CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") + + val config = new ProducerConfig(props) + val producer = new Producer[String, String](config) + try { + // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only + // on broker 0 + producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) + Thread.sleep(100) + } catch { + case e => fail("Unexpected exception: " + e) + } + + // kill the broker + server1.shutdown + server1.awaitShutdown() + Thread.sleep(100) + + try { + // These sends should fail since there are no available brokers + producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) + Thread.sleep(100) + fail("Should fail since no leader exists for the partition.") + } catch { + case e => // success + } + + // restart server 1 + server1.startup() + Thread.sleep(500) + + try { + // cross check if broker 1 got the messages + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0).iterator + assertTrue("Message set should have 1 message", messageSet1.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1.next.message) + assertFalse("Message set should have another message", messageSet1.hasNext) + } catch { + case e: Exception => fail("Not expected", e) + } + producer.close + } @Test def testZKSendToExistingTopicWithNoBrokers() { @@ -250,6 +209,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { // shutdown server2 server2.shutdown + server2.awaitShutdown() Thread.sleep(100) // delete the new-topic logs Utils.rm(server2.config.logDir) @@ -279,5 +239,56 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close } } + + @Test + def testAsyncSendCanCorrectlyFailWithTimeout() { + val timeoutMs = 500 + val props = new Properties() + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("partitioner.class", "kafka.utils.StaticPartitioner") + props.put("socket.timeout.ms", String.valueOf(timeoutMs)) + props.put("zk.connect", TestZKUtils.zookeeperConnect) + + val config = new ProducerConfig(props) + val producer = new Producer[String, String](config) + + // create topics in ZK + CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") + + // do a simple test to make sure plumbing is okay + try { + // this message should be assigned to partition 0 whose leader is on broker 0 + producer.send(new ProducerData[String, String]("new-topic", "test", Array("test"))) + Thread.sleep(100) + // cross check if brokers got the messages + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0).iterator + assertTrue("Message set should have 1 message", messageSet1.hasNext) + assertEquals(new Message("test".getBytes), messageSet1.next.message) + } catch { + case e => case e: Exception => producer.close; fail("Not expected", e) + } + + // stop IO threads and request handling, but leave networking operational + // any requests should be accepted and queue up, but not handled + server1.requestHandlerPool.shutdown() + + val t1 = SystemTime.milliseconds + try { + // this message should be assigned to partition 0 whose leader is on broker 0, but + // broker 0 will not response within timeoutMs millis. + producer.send(new ProducerData[String, String]("new-topic", "test", Array("test"))) + } catch { + case e: FailedToSendMessageException => /* success */ + case e: Exception => fail("Not expected", e) + } finally { + producer.close + } + val t2 = SystemTime.milliseconds + + // make sure we don't wait fewer than numRetries*timeoutMs milliseconds + // we do this because the DefaultEventHandler retries a number of times + assertTrue((t2-t1) >= timeoutMs*config.producerRetries) + } } diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 083bcaf..fca0ede 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -27,6 +27,7 @@ import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import org.junit.Test import org.scalatest.junit.JUnit3Suite +import java.net.SocketTimeoutException class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private var messageBytes = new Array[Byte](2); @@ -90,7 +91,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { } @Test - def testProduceBlocksWhenRequired() { + def testProduceCorrectlyReceivesResponse() { // TODO: this will need to change with kafka-44 val server = servers.head val props = new Properties() @@ -134,4 +135,37 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1)) Assert.assertEquals(-1, response2.offsets(1)) } + + @Test + def testProducerCanTimeout() { + val timeoutMs = 500 + + val server = servers.head + val props = new Properties() + props.put("host", "localhost") + props.put("port", server.socketServer.port.toString) + props.put("buffer.size", "102400") + props.put("socket.timeout.ms", String.valueOf(timeoutMs)) + val producer = new SyncProducer(new SyncProducerConfig(props)) + + val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) + val request = TestUtils.produceRequest("topic1", 0, messages) + + // stop IO threads and request handling, but leave networking operational + // any requests should be accepted and queue up, but not handled + server.requestHandlerPool.shutdown() + + val t1 = SystemTime.milliseconds + try { + val response2 = producer.send(request) + Assert.fail("Should have received timeout exception since request handling is stopped.") + } catch { + case e: SocketTimeoutException => /* success */ + case e => Assert.fail("Unexpected exception when expecting timeout: " + e) + } + val t2 = SystemTime.milliseconds + + // make sure we don't wait fewer than timeoutMs for a response + Assert.assertTrue((t2-t1) >= timeoutMs) + } }