diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1437496..7e06223 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -84,7 +84,7 @@ object RequestChannel extends Logging {
     }
   }
   
-  case class Response(processor: Int, request: Request, responseSend: Send) {
+  case class Response(processor: Int, request: Request, responseSend: Send, closeSocket: Boolean = false) {
     request.responseCompleteTimeMs = SystemTime.milliseconds
 
     def this(request: Request, send: Send) =
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5bd143..e5035e2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -278,10 +278,16 @@ private[kafka] class Processor(val id: Int,
           // a null response send object indicates that there is no response to send to the client.
           // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
           // that are sitting in the server's socket buffer
-          trace("Socket server received empty response to send, registering for read: " + curr)
-          key.interestOps(SelectionKey.OP_READ)
-          key.attach(null)
-          curr.request.updateRequestMetrics
+          if (curr.closeSocket) {
+            info("Closing socket for " + channelFor(key).socket.getInetAddress + " following the close-socket signal")
+            curr.request.updateRequestMetrics
+            close(key)
+          } else {
+            trace("Socket server received empty response to send, registering for read: " + curr)
+            key.interestOps(SelectionKey.OP_READ)
+            key.attach(null)
+            curr.request.updateRequestMetrics
+          }
         } else {
           trace("Socket server received response to send, registering for write: " + curr)
           key.interestOps(SelectionKey.OP_WRITE)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 208e3ef..20beaaf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -171,8 +171,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if(produceRequest.requiredAcks == 0) {
       // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
-      // and is tuned for very high throughput
-      requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+      // and is tuned for very high throughput; however, if there was any error in handling the request, the response
+      // will signal the socket server to close the socket
+      if (numPartitionsInError != 0) {
+        debug("Sending the close socket signal due to error handling produce request [%s] with Ack=0".format(produceRequest.toString))
+        requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null, true))
+      } else {
+        requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+      }
     } else if (produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
         allPartitionHaveReplicationFactorOne ||
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index b511d90..6648d52 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -205,6 +205,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "1")
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     val topic = "new-topic"
@@ -219,6 +220,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
+
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
@@ -257,6 +259,42 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   }
 
   @Test
+  def testSendWithAckZeroDeadBroker() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("request.timeout.ms", "2000")
+    // acks=0 so that a handling error makes the broker close the socket instead of sending a response
+    props.put("message.send.max.retries", "1")
+    props.put("request.required.acks", "0")
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+
+    val topic = "new-topic"
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0,0,0,0")
+    // waiting for 1 partition is enough
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3, 500)
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    try {
+      val veryLargeMessage = String.format("%1$-"+config1.logIndexSizeMaxBytes+"s", "test ").replace(' ', 'x')
+      // This send should fail since the message is too large for the broker to accept
+      producer.send(new KeyedMessage[String, String](topic, "test", veryLargeMessage))
+      fail("Should fail since message size too large.")
+    } catch {
+      case e => // success
+    }
+
+    producer.close
+  }
+
+  @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
