Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1401522)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -29,7 +29,8 @@
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
 
   def run() { 
-    while(true) { 
+    while(true) {
+      try {
       val req = requestChannel.receiveRequest()
       if(req eq RequestChannel.AllDone){
         trace("receives shut down command, shut down".format(brokerId, id))
@@ -38,6 +39,9 @@
       req.dequeueTimeMs = SystemTime.milliseconds
       debug("handles request " + req)
       apis.handle(req)
+      } catch {
+        case e: Throwable => error("exception when handling request", e)
+      }
     }
   }
 
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1401522)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -52,16 +52,73 @@
    * Top-level method that handles all requests and multiplexes to the right api
    */
   def handle(request: RequestChannel.Request) {
-    request.requestId match {
-      case RequestKeys.ProduceKey => handleProducerRequest(request)
-      case RequestKeys.FetchKey => handleFetchRequest(request)
-      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
-      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
-      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
-      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
-      case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
-    }
-    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
+    try{
+      request.requestId match {
+        case RequestKeys.ProduceKey => handleProducerRequest(request)
+        case RequestKeys.FetchKey => handleFetchRequest(request)
+        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
+        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
+        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
+        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
+      }
+    } catch {
+      case e: Throwable =>
+        request.requestId match {
+          case RequestKeys.ProduceKey =>
+            val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
+            val producerResponseStatus = apiRequest.data.map {
+              case (topicAndPartition, data) =>
+                (topicAndPartition, ProducerResponseStatus(ErrorMapping.UnknownCode, -1l))
+            }
+            val errorResponse = ProducerResponse(apiRequest.versionId, apiRequest.correlationId, producerResponseStatus)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.FetchKey =>
+            val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
+            val fetchResponsePartitionData = apiRequest.requestInfo.map {
+              case (topicAndPartition, data) =>
+                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.UnknownCode, 0, -1, null))
+            }
+            val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData)
+            requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.OffsetsKey =>
+            val apiRequest = request.requestObj.asInstanceOf[OffsetRequest]
+            val partitionOffsetResponseMap = apiRequest.requestInfo.map {
+              case (topicAndPartition, partitionOffsetRequest) =>
+                (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.UnknownCode, null))
+            }
+            val errorResponse = OffsetResponse(apiRequest.versionId, partitionOffsetResponseMap)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.MetadataKey =>
+            val apiRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
+            val topicMeatadata = apiRequest.topics.map {
+              topic => TopicMetadata(topic, Nil, ErrorMapping.UnknownCode)
+            }
+            val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.LeaderAndIsrKey =>
+            val apiRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+            val responseMap = apiRequest.partitionStateInfos.map {
+              case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.UnknownCode)
+            }
+            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap, ErrorMapping.UnknownCode)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.StopReplicaKey =>
+            val apiRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+            val responseMap = apiRequest.partitions.map {
+              case topicAndPartition => (topicAndPartition, ErrorMapping.UnknownCode)
+            }.toMap
+            error("error when handling request %s".format(apiRequest), e)
+            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap, ErrorMapping.UnknownCode)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+        }
+    } finally
+      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
