From 6f9a4d482baa872bb0db3ca04032d623c9627474 Mon Sep 17 00:00:00 2001 From: Marc Chung Date: Thu, 30 Oct 2014 14:13:39 -0700 Subject: [PATCH] Sets a connection timeout on the underlying BlockingChannel This fix sets a connectTimeoutMs to the same value as readTimeoutMs Sponsored by: Lookout, Inc. --- core/src/main/scala/kafka/network/BlockingChannel.scala | 11 +++++++---- core/src/main/scala/kafka/producer/SyncProducer.scala | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index eb7bb14..6e2a38e 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -42,7 +42,8 @@ class BlockingChannel( val host: String, private var readChannel: ReadableByteChannel = null private var writeChannel: GatheringByteChannel = null private val lock = new Object() - + private val connectTimeoutMs = readTimeoutMs + def connect() = lock synchronized { if(!connected) { try { @@ -55,19 +56,21 @@ class BlockingChannel( val host: String, channel.socket.setSoTimeout(readTimeoutMs) channel.socket.setKeepAlive(true) channel.socket.setTcpNoDelay(true) - channel.connect(new InetSocketAddress(host, port)) + channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) writeChannel = channel readChannel = Channels.newChannel(channel.socket().getInputStream) connected = true // settings may not match what we requested above - val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)." + val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d." debug(msg.format(channel.socket.getSoTimeout, readTimeoutMs, channel.socket.getReceiveBufferSize, readBufferSize, channel.socket.getSendBufferSize, - writeBufferSize)) + writeBufferSize, + connectTimeoutMs)) + } catch { case e: Throwable => disconnect() } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 42c9503..35e9e8c 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { val brokerInfo = "host_%s-port_%s".format(config.host, config.port) val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) - trace("Instantiating Scala Sync Producer") + trace("Instantiating Scala Sync Producer with properties: %s".format(config.props)) private def verifyRequest(request: RequestOrResponse) = { /** -- 2.0.1