Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(revision 1162234)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(working copy)
@@ -136,20 +136,28 @@
         } catch {
           case e =>
             if (skipMessageOnError)
-              logger.error("error processing message, skipping and resume consumption: " + e)
+              logger.error("Error processing message, skipping this message: ", e)
             else
               throw e
         }
+        if(System.out.checkError) { 
+          // This means no one is listening to our output stream any more, time to shutdown
+          System.err.println("Unable to write to standard out, closing consumer.")
+	  System.out.flush()
+          formatter.close()
+          connector.shutdown()
+          System.exit(1)
+        }
       }
     } catch {
-      case e => logger.error("error processing message, stop consuming: " + e)
+      case e => logger.error("Error processing message, stopping consumer: ", e)
     }
       
     System.out.flush()
     formatter.close()
     connector.shutdown()
   }
-  
+
   def tryParse(parser: OptionParser, args: Array[String]) = {
     try {
       parser.parse(args : _*)
