diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3164f78..8514cd8 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -111,6 +111,12 @@ class RequestSendThread(val controllerId: Int, private val lock = new Object() private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) + /** Calls channel.connect() only when channel.isConnected is false; otherwise does nothing. Returns no value. */ private def getOrMakeConnection() { + if(!channel.isConnected) { + channel.connect() + } + } + override def doWork(): Unit = { val queueItem = queue.take() val request = queueItem._1 @@ -120,6 +126,7 @@ class RequestSendThread(val controllerId: Int, try{ lock synchronized { + /* Ensure the channel is connected before sending; the catch handler below disconnects it after a failure. */ getOrMakeConnection() channel.send(request) receive = channel.receive() var response: RequestOrResponse = null @@ -138,8 +145,9 @@ class RequestSendThread(val controllerId: Int, } } catch { case e => - // log it and let it go. Let controller shut it down. - debug("Exception occurs", e) + warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) + // If there is any socket error (e.g., a socket timeout), the channel is no longer usable and needs to be recreated. Disconnect here so that getOrMakeConnection() can reconnect on a later doWork() call (assumes channel.isConnected is false after disconnect() -- TODO confirm). + channel.disconnect() } } }