diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index e38d2fa..16b1448 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -74,7 +74,14 @@ class Producer[K,V](val config: ProducerConfig,
         throw new ProducerClosedException
       recordStats(messages)
       sync match {
-        case true => eventHandler.handle(messages)
+        case true =>
+          try eventHandler.handle(messages)
+          catch {
+            case e: InterruptedException =>
+              // restore the interrupt flag before surfacing the wrapped exception
+              Thread.currentThread.interrupt()
+              throw new ProducerInterruptedException(e)
+          }
         case false => asyncSend(messages)
       }
     }
@@ -89,6 +96,7 @@ class Producer[K,V](val config: ProducerConfig,
 
   private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
+      var interruptedException: InterruptedException = null
       val added = config.queueEnqueueTimeoutMs match {
         case 0  =>
           queue.offer(message)
@@ -103,13 +111,18 @@ class Producer[K,V](val config: ProducerConfig,
             }
           }
           catch {
-            case e: InterruptedException =>
+            case e: InterruptedException =>
+              Thread.currentThread.interrupt() // restore the interrupt flag for the caller
+              interruptedException = e
+              false
           }
       }
       if(!added) {
         producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
         producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
+        if (interruptedException != null) {
+          throw new ProducerInterruptedException(interruptedException)
+        }
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
         trace("Added to send queue an event: " + message.toString)
diff --git a/core/src/main/scala/kafka/producer/ProducerInterruptedException.scala b/core/src/main/scala/kafka/producer/ProducerInterruptedException.scala
new file mode 100644
index 0000000..6f0eaf0
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ProducerInterruptedException.scala
@@ -0,0 +1,5 @@
+package kafka.producer
+
+/** Thrown when a producer thread is interrupted while sending; wraps the original [[InterruptedException]]. */
+class ProducerInterruptedException(cause: InterruptedException)
+  extends RuntimeException("producer thread interrupted", cause)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1db6ac3..faf229b 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -17,10 +17,17 @@
 
 package kafka.producer
 
+import java.util
 import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
 import junit.framework.Assert._
+import junit.framework.Assert.assertEquals
+import junit.framework.Assert.assertTrue
+import junit.framework.Assert.assertFalse
+import junit.framework.Assert.fail
 import org.easymock.EasyMock
+import org.junit.Assert._
 import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
@@ -417,6 +424,69 @@ class AsyncProducerTest extends JUnit3Suite {
     EasyMock.verify(producerPool)
   }
 
+
+  @Test
+  def testInterruptOnBackgroundThread(): Unit = {
+    val props1 = new util.Properties()
+    props1.put("producer.type", "async")
+    props1.put("queue.buffering.max.ms", "60000")
+    props1.put("queue.enqueue.timeout.ms", "-1")
+
+    val topic = "interrupt-topic"
+    val producer1: Producer[String, String] = createProducer[String, String](
+      brokerList = getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      producerProps = props1,
+      eventHandler = createStubEventHandler())
+
+    val threwIE = new AtomicBoolean()
+    val unexpected = new AtomicReference[Throwable]()
+    try {
+      val latch1 = new CountDownLatch(1)
+      val t = new Thread(new Runnable() {
+        override def run(): Unit = {
+          try {
+            try {
+              producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
+            }
+            finally {
+              latch1.countDown()
+            }
+
+            // spin briefly waiting for test thread to trigger an interrupt
+            while (!Thread.currentThread.isInterrupted) {
+              Thread.`yield`()
+            }
+
+            try {
+              producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
+            }
+            catch {
+              case e: ProducerInterruptedException => threwIE.set(true)
+            }
+          }
+          catch {
+            case e: Throwable => unexpected.set(e)
+          }
+        }
+      })
+      t.start()
+      latch1.await()
+      t.interrupt()
+      t.join(5000)
+      assertFalse("Expected thread to terminate", t.isAlive)
+    }
+    finally {
+      producer1.close()
+    }
+
+    assertTrue("Expected ProducerInterruptedException to be thrown on second send(...) call", threwIE.get)
+    if (unexpected.get() != null) {
+      fail("Unexpected exception on background thread: " + unexpected.get())
+    }
+  }
+
   @Test
   def testJavaProducer() {
     val topic = "topic1"
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 94d0028..d5542ff 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,7 @@ import java.nio.channels._
 import java.util.Random
 import java.util.Properties
 
+import kafka.producer.async.EventHandler
 import org.apache.kafka.common.utils.Utils._
 
 import collection.mutable.ListBuffer
@@ -362,7 +363,8 @@ object TestUtils extends Logging {
                            encoder: String = classOf[DefaultEncoder].getName,
                            keyEncoder: String = classOf[DefaultEncoder].getName,
                            partitioner: String = classOf[DefaultPartitioner].getName,
-                           producerProps: Properties = null): Producer[K, V] = {
+                           producerProps: Properties = null,
+                           eventHandler: EventHandler[K, V] = null): Producer[K, V] = {
     val props: Properties = getProducerConfig(brokerList)
 
     //override any explicitly specified properties
@@ -372,7 +374,18 @@ object TestUtils extends Logging {
     props.put("serializer.class", encoder)
     props.put("key.serializer.class", keyEncoder)
     props.put("partitioner.class", partitioner)
-    new Producer[K, V](new ProducerConfig(props))
+    if (eventHandler == null)
+      new Producer[K, V](new ProducerConfig(props))
+    else
+      new Producer[K, V](new ProducerConfig(props), eventHandler)
+  }
+
+  def createStubEventHandler[K, V](): EventHandler[K, V] = {
+    new EventHandler[K, V] {
+      override def handle(events: Seq[KeyedMessage[K, V]]): Unit = {}
+
+      override def close: Unit = {}
+    }
   }
 
   /**
