diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 7991e42..7f3d82e 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -17,7 +17,7 @@ package kafka.controller import kafka.network.{Receive, BlockingChannel} -import kafka.utils.{Logging, ShutdownableThread} +import kafka.utils.{Utils, Logging, ShutdownableThread} import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -81,8 +81,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) - channel.connect() - val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker.id, messageQueue, channel) + val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel) requestThread.setDaemon(false) brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) } @@ -107,12 +106,13 @@ class ControllerChannelManager (private val controllerContext: ControllerContext class RequestSendThread(val controllerId: Int, val controllerContext: ControllerContext, - val toBrokerId: Int, + val toBroker: Broker, val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], val channel: BlockingChannel) - extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) { + extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) { private val lock = new Object() private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) + connectToBroker(toBroker, channel) override def doWork(): Unit = { val queueItem = queue.take() @@ -123,8 +123,24 @@ class RequestSendThread(val controllerId: Int, try{ lock synchronized { - channel.connect() // establish a socket connection if needed - channel.send(request) + var isSendSuccessful = false + while(isRunning.get() && !isSendSuccessful) { + // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a + // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. + try { + channel.send(request) + isSendSuccessful = true + } catch { + case e => // if the send was not successful, reconnect to broker and resend the message + error("Controller %d epoch %d failed to send %s request to broker %s. Reconnecting to broker.".format(controllerId, controllerContext.epoch, + RequestKeys.nameForKey(request.requestId.get), toBroker.toString()), e) + channel.disconnect() + connectToBroker(toBroker, channel) + isSendSuccessful = false + // backoff before retrying the connection and send + Utils.swallow(Thread.sleep(300)) + } + } receive = channel.receive() var response: RequestOrResponse = null request.requestId.get match { @@ -135,8 +151,8 @@ class RequestSendThread(val controllerId: Int, case RequestKeys.UpdateMetadataKey => response = UpdateMetadataResponse.readFrom(receive.buffer) } - stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d" - .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId)) + stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s" + .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString())) if(callback != null){ callback(response) @@ -144,11 +160,23 @@ class RequestSendThread(val controllerId: Int, } } catch { case e: Throwable => - warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) + warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. channel.disconnect() } } + + private def connectToBroker(broker: Broker, channel: BlockingChannel) { + try { + channel.connect() + info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString())) + } catch { + case e => { + channel.disconnect() + error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e) + } + } + } } class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 419156e..041cfa5 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -120,11 +120,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } - private def reconnect() { - disconnect() - connect() - } - /** * Disconnect from current channel, closing connection. * Side effect: channel field is set to null on successful disconnect