Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1235456) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -211,10 +211,18 @@ // they are valid, insert them in the log lock synchronized { - val segment = segments.view.last - segment.messageSet.append(messages) - maybeFlush(numberOfMessages) - maybeRoll(segment) + try { + val segment = segments.view.last + segment.messageSet.append(messages) + maybeFlush(numberOfMessages) + maybeRoll(segment) + } + catch { + case e: IOException => + fatal("Halting due to unrecoverable I/O error while handling producer request", e) + Runtime.getRuntime.halt(1) + case e2 => throw e2 + } } } Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1235456) +++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) @@ -25,7 +25,6 @@ import kafka.common.ErrorMapping import kafka.utils.SystemTime import kafka.utils.Logging -import java.io.IOException /** * Logic to handle the various Kafka requests @@ -74,15 +73,8 @@ catch { case e => error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) - e match { - case _: IOException => - fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) - Runtime.getRuntime.halt(1) - case _ => - } throw e } - None } def handleFetchRequest(request: Receive): Option[Send] = {