Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1401497)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -40,7 +40,7 @@
       val request = if(isFromOrdinaryConsumer)
         OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
       else
-        OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId.toShort)
+        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)
       producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     } catch {
       case e =>
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision 1401497)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(working copy)
@@ -24,6 +24,7 @@
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
 import scala.collection.JavaConversions._
+import kafka.message.MessageSet
 
 
 /**
@@ -91,6 +92,8 @@
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
         "skip it instead of halt.")
 
+    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
+
     val options = parser.parse(args : _*)
     for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
       if(!options.has(arg)) {
@@ -110,6 +113,7 @@
 
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val printOffsets = if(options.has(printOffsetOpt)) true else false
+    val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
@@ -182,6 +186,10 @@
                     .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
+            if (messageSet.equals(MessageSet.Empty)  && noWaitAtEndOfLog) {
+              info("The simple consumer shell terminates when reach the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset))
+              return
+            }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
             var consumed = 0
             for(messageAndOffset <- messageSet) {
@@ -206,6 +214,7 @@
               }
               consumed += 1
             }
+            offset = offset + 1
           }
         } catch {
           case e: Throwable =>
