From 9cf804581aebbeb0fd3bd86b7f41a9bfae1a9169 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Thu, 6 Nov 2014 09:34:08 -0800 Subject: [PATCH] try/catch should include channel.receive() --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ecbfa0f..cabacad 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int, // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. try { channel.send(request) + receive = channel.receive() isSendSuccessful = true } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message - error(("Controller %d epoch %d failed to send request %s to broker %s. " + + warn(("Controller %d epoch %d failed to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, toBroker.toString()), e) channel.disconnect() @@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int, Utils.swallow(Thread.sleep(300)) } } - receive = channel.receive() var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => @@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int, } } catch { case e: Throwable => - warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) + error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. channel.disconnect() } -- 1.8.5.2 (Apple Git-48)