diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1437496..bc3e8f5 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,6 +32,10 @@ import org.apache.log4j.Logger
 object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
+  val SendResponse: Short = 0
+  val NoResponse: Short = 1
+  val CloseSocket: Short = 9
+
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
@@ -84,9 +88,12 @@ object RequestChannel extends Logging {
     }
   }
   
-  case class Response(processor: Int, request: Request, responseSend: Send) {
+  case class Response(processor: Int, request: Request, responseSend: Send, responseCode: Short) {
     request.responseCompleteTimeMs = SystemTime.milliseconds
 
+    def this(processor: Int, request: Request, responseSend: Send) =
+      this(processor, request, responseSend, if (responseSend == null) RequestChannel.NoResponse else RequestChannel.SendResponse)
+
     def this(request: Request, send: Send) =
       this(request.processor, request, send)
   }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5bd143..ba1f43f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -274,25 +274,34 @@ private[kafka] class Processor(val id: Int,
     while(curr != null) {
       val key = curr.request.requestKey.asInstanceOf[SelectionKey]
       try {
-        if(curr.responseSend == null) {
-          // a null response send object indicates that there is no response to send to the client.
-          // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
-          // that are sitting in the server's socket buffer
-          trace("Socket server received empty response to send, registering for read: " + curr)
-          key.interestOps(SelectionKey.OP_READ)
-          key.attach(null)
-          curr.request.updateRequestMetrics
-        } else {
-          trace("Socket server received response to send, registering for write: " + curr)
-          key.interestOps(SelectionKey.OP_WRITE)
-          key.attach(curr)
+        curr.responseCode match {
+          case RequestChannel.NoResponse => {
+            // a null response send object indicates that there is no response to send to the client.
+            // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
+            // that are sitting in the server's socket buffer
+            curr.request.updateRequestMetrics
+            trace("Socket server received empty response to send, registering for read: " + curr)
+            key.interestOps(SelectionKey.OP_READ)
+            key.attach(null)
+          }
+          case RequestChannel.SendResponse => {
+            trace("Socket server received response to send, registering for write: " + curr)
+            key.interestOps(SelectionKey.OP_WRITE)
+            key.attach(curr)
+          }
+          case RequestChannel.CloseSocket => {
+            curr.request.updateRequestMetrics
+            trace("Closing socket connection actively according to the response code.")
+            close(key)
+          }
+          case responseCode => throw new KafkaException("No mapping found for response code " + responseCode)
         }
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
           close(key)
         }
-      }finally {
+      } finally {
         curr = requestChannel.receiveResponse(id)
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b17964e..c096565 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
     new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
 //  private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
   private val partitionMetadataLock = new Object
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
@@ -171,8 +171,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if(produceRequest.requiredAcks == 0) {
       // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
-      // and is tuned for very high throughput
-      requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+      // and is tuned for very high throughput; however, if there is any exception in handling the request, since
+      // no response is expected by the producer the handler will cancel the request key to indicate the socket server
+      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
+      if (numPartitionsInError != 0) {
+        warn(("Cancelling the request key to notify socket server close the connection due to error handling produce request " +
+          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
+          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.mkString("[",",","]")))
+        requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null, RequestChannel.CloseSocket))
+      } else {
+        requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+      }
     } else if (produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
         allPartitionHaveReplicationFactorOne ||
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index b511d90..3bd0ce9 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -236,7 +236,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Should fail since no leader exists for the partition.")
     } catch {
-      case e => // success
+      case e : org.scalatest.TestFailedException => throw e // catch and re-throw the failure message
+      case e2 => // otherwise success
     }
 
     // restart server 1
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5ee31d..b3e89c3 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -114,6 +114,33 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   }
 
   @Test
+  def testMessageSizeTooLargeWithAckZero() {
+    val server = servers.head
+
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    props.put("request.required.acks", "0")
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
+
+    // This message will be dropped silently since message size too large.
+    producer.send(TestUtils.produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+
+    // Send another message whose size is large enough to exceed the buffer size so
+    // the socket buffer will be flushed immediately;
+    // this send should fail since the socket has been closed
+    try {
+      producer.send(TestUtils.produceRequest("test", 0,
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+    } catch {
+      case e : java.io.IOException => // success
+      case e2 => throw e2
+    }
+  }
+
+  @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
     val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
