diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
new file mode 100644
index 0000000..7ecb7cd
--- /dev/null
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetSocketAddress
+import java.nio.channels._
+import kafka.utils.{Utils, nonthreadsafe, Logging}
+
+/**
+ * A blocking channel with timeouts correctly enabled
+ *
+ */
+@nonthreadsafe
+class BlockingChannel(val bufferSize: Int, val timeoutMs: Int) extends Logging {
+
+  @volatile private var connected = false
+  private var channel: SocketChannel = null
+  private var readChannel: ReadableByteChannel = null
+  private var writeChannel: GatheringByteChannel = null
+  private val lock = new Object()
+
+  def connect(host: String, port: Int) = lock synchronized {
+    if(!connected) {
+      channel = SocketChannel.open()
+      channel.socket.setSendBufferSize(bufferSize)
+      channel.configureBlocking(true)
+      channel.socket.setSoTimeout(timeoutMs)
+      channel.socket.setKeepAlive(true)
+      channel.connect(new InetSocketAddress(host, port))
+
+      writeChannel = channel
+      readChannel = Channels.newChannel(channel.socket().getInputStream)
+      connected = true
+    }
+  }
+
+  def disconnect() = lock synchronized {
+    if(connected || channel != null) {
+      // closing the main socket channel *should* close the read and write channels
+      // but let's do it to be sure.
+      swallow(channel.close())
+      swallow(channel.socket.close())
+      swallow(readChannel.close())
+      channel = null; readChannel = null; writeChannel = null
+      connected = false
+    }
+  }
+
+  def isConnected = connected
+
+  def send(request: Request):Int = {
+    if(!connected)
+      throw new ClosedChannelException()
+    else
+      Utils.sendRequest(request, writeChannel)
+  }
+
+  def receive(): Tuple2[Receive, Int] = {
+    if(!connected)
+      throw new ClosedChannelException()
+    else
+      Utils.getResponse(readChannel)
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 457a983..cf461e5 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -50,12 +50,13 @@ trait Receive extends Transmission {
   def readFrom(channel: ReadableByteChannel): Int
 
   def readCompletely(channel: ReadableByteChannel): Int = {
-    var read = 0
+    var totalRead = 0
     while(!complete) {
-      read = readFrom(channel)
+      val read = readFrom(channel)
       trace(read + " bytes read.")
+      totalRead += read
    }
-    read
+    totalRead
  }
 
 }
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index ba4eaae..e42d40a 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -17,14 +17,11 @@
 
 package kafka.producer
 
-import java.net.InetSocketAddress
-import java.nio.channels.SocketChannel
 import kafka.api._
 import kafka.common.MessageSizeTooLargeException
 import kafka.message.MessageSet
-import kafka.network.{BoundedByteBufferSend, Request, Receive}
+import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
 import kafka.utils._
-import kafka.utils.Utils._
 
 /*
  * Send a message set.
@@ -33,11 +30,10 @@ import kafka.utils.Utils._
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
 
   private val MaxConnectBackoffMs = 60000
-  private var channel : SocketChannel = null
   private var sentOnConnection = 0
   private val lock = new Object()
-  @volatile
-  private var shutdown: Boolean = false
+  @volatile private var shutdown: Boolean = false
+  private val blockingChannel = new BlockingChannel(config.bufferSize, config.socketTimeoutMs)
 
   debug("Instantiating Scala Sync Producer")
 
@@ -64,21 +60,19 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     var response: Tuple2[Receive, Int] = null
     try {
-      sendRequest(request, channel)
-      response = getResponse(channel)
+      blockingChannel.send(request)
+      response = blockingChannel.receive()
     } catch {
       case e: java.io.IOException =>
         // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
         disconnect()
-        println("sdfsdfsdf")
         throw e
-      case e => println("other sdfsdfsdfs"); throw e
+      case e => throw e
     }
     // TODO: do we still need this?
     sentOnConnection += 1
     if(sentOnConnection >= config.reconnectInterval) {
-      disconnect()
-      channel = connect()
+      reconnect()
       sentOnConnection = 0
     }
     SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
 
@@ -119,41 +113,38 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       throw new MessageSizeTooLargeException
   }
 
+  private def reconnect() {
+    disconnect()
+    connect()
+  }
+
   /**
    * Disconnect from current channel, closing connection.
    * Side effect: channel field is set to null on successful disconnect
    */
   private def disconnect() {
     try {
-      if(channel != null) {
+      if(blockingChannel.isConnected) {
         info("Disconnecting from " + config.host + ":" + config.port)
-        swallow(channel.close())
-        swallow(channel.socket.close())
-        channel = null
+        blockingChannel.disconnect()
       }
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
     }
   }
 
-  private def connect(): SocketChannel = {
+  private def connect(): BlockingChannel = {
     var connectBackoffMs = 1
     val beginTimeMs = SystemTime.milliseconds
-    while(channel == null && !shutdown) {
+    while(!blockingChannel.isConnected && !shutdown) {
       try {
-        channel = SocketChannel.open()
-        channel.socket.setSendBufferSize(config.bufferSize)
-        channel.configureBlocking(true)
-        channel.socket.setSoTimeout(config.socketTimeoutMs)
-        channel.socket.setKeepAlive(true)
-        channel.connect(new InetSocketAddress(config.host, config.port))
+        blockingChannel.connect(config.host, config.port)
         info("Connected to " + config.host + ":" + config.port + " for producing")
-      }
-      catch {
+      } catch {
         case e: Exception => {
           disconnect()
           val endTimeMs = SystemTime.milliseconds
-          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs) {
+          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs ) {
            error("Producer connection to " + config.host + ":" + config.port + " timing out after " + config.connectTimeoutMs + " ms", e)
            throw e
          }
@@ -163,12 +154,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
        }
      }
    }
-    channel
+    blockingChannel
  }
 
  private def getOrMakeConnection() {
-    if(channel == null) {
-      channel = connect()
+    if(!blockingChannel.isConnected) {
+      connect()
    }
  }
 }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index d976d1b..69e427e 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -686,12 +686,12 @@ object Utils extends Logging {
     }
   }
 
-  def sendRequest(request: Request, channel: SocketChannel) = {
+  def sendRequest(request: Request, channel: GatheringByteChannel) = {
     val send = new BoundedByteBufferSend(request)
     send.writeCompletely(channel)
   }
 
-  def getResponse(channel: SocketChannel): Tuple2[Receive,Int] = {
+  def getResponse(channel: ReadableByteChannel): Tuple2[Receive,Int] = {
     val response = new BoundedByteBufferReceive()
     response.readCompletely(channel)
 
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 92b94ef..67f5a03 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -18,18 +18,19 @@ package kafka.producer
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
-import org.I0Itec.zkclient.ZkClient
-import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import java.util.Properties
-import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-import kafka.utils.{TestZKUtils, Utils, TestUtils}
-import kafka.message.Message
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
+import kafka.common.FailedToSendMessageException
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import kafka.utils.{TestZKUtils, Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.log4j.{Level, Logger}
+import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
+import org.junit.Test
 
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -124,54 +125,53 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close
   }
 
-  // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
-  // and when leader logic is changed.
+  // TODO: Need to rewrite when leader logic is changed.
   @Test
   def testZKSendWithDeadBroker() {
-//    val props = new Properties()
-//    props.put("serializer.class", "kafka.serializer.StringEncoder")
-//    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-//    props.put("socket.timeout.ms", "200")
-//    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-//
-//    // create topic
-//    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-//
-//    val config = new ProducerConfig(props)
-//
-//    val producer = new Producer[String, String](config)
-//    try {
-//      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-//      // all partitions have broker 0 as the leader.
-//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//      Thread.sleep(100)
-//      // kill 2nd broker
-//      server1.shutdown
-//      Thread.sleep(500)
-//
-//      // Since all partitions are unavailable, this request will be dropped
-//      try {
-//        producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//        fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-//      } catch {
-//        case e: kafka.common.FailedToSendMessageException => // success
-//        case e => fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-//      }
-//
-//      // restart server 1
-//      server1.startup()
-//      Thread.sleep(200)
-//
-//      // cross check if brokers got the messages
-//      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-//      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-//      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-//      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-//      assertFalse("Message set should not have more than 1 message", messageSet1.hasNext)
-//    } catch {
-//      case e: Exception => fail("Not expected", e)
-//    }
-//    producer.close
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("socket.timeout.ms", "500")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+
+    val config = new ProducerConfig(props)
+
+    val producer = new Producer[String, String](config)
+    try {
+      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+      // all partitions have broker 0 as the leader.
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      // kill 2nd broker
+      server1.shutdown
+      Thread.sleep(200)
+
+      // Since all partitions are unavailable, this request will be dropped
+      try {
+        producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+        fail("Leader broker for 'new-topic' isn't up, should not be able to send data")
+      } catch {
+        case e: FailedToSendMessageException => // success
+        case e => fail("Leader broker for 'new-topic' isn't up, should not be able to send data")
+      }
+
+      // restart server 1
+      server1.startup()
+      Thread.sleep(200)
+
+      // cross check if brokers got the messages
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertFalse("Message set should not have more than 1 message", messageSet1.hasNext)
+    } catch {
+      case e: Exception => fail("Unexpected exception " + e.getMessage)
+    }
+    producer.close
   }
 
   @Test