Index: core/src/main/scala/kafka/tools/DumpLogSegments.scala
===================================================================
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala	(revision 1402327)
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala	(working copy)
@@ -26,7 +26,8 @@
 
   def main(args: Array[String]) {
     val print = args.contains("--print")
-    val files = args.filter(_ != "--print")
+    val verifyOnly = args.contains("--verifyOnly")
+    val files = args.filter(m => m != "--print" && m != "--verifyOnly")
 
     for(arg <- files) {
       val file = new File(arg)
@@ -35,21 +36,31 @@
         dumpLog(file, print)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file)
+        dumpIndex(file, verifyOnly)
       }
     }
   }
   
   /* print out the contents of the index */
-  def dumpIndex(file: File) {
+  def dumpIndex(file: File, verifyOnly: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
+    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
+    val logFile = new File(logFileName)
+    val messageSet = new FileMessageSet(logFile)
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
+      val messageAndOffset = partialFileMessageSet.head
+      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+        System.err.println(("Index position %d doesn't match log position at offset %d").format(entry.offset + index.baseOffset, messageAndOffset.offset))
+        exit(1)
+      }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
       if(entry.offset == 0 && i > 0)
         return
-      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
+      if (!verifyOnly)
+        println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
   
@@ -59,8 +70,17 @@
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
+    var lastOffset = -1l
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+
+      if(lastOffset == -1)
+        lastOffset = messageAndOffset.offset
+      else if (messageAndOffset.offset != lastOffset +1) {
+        System.err.println("The offset in the data log file [%s] is not consecutive, [%d] follows [%d]".format(file.getName, messageAndOffset.offset, lastOffset))
+        exit(1)
+      } else lastOffset = messageAndOffset.offset
+
       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
Index: bin/kafka-dump-log-segment.sh
===================================================================
--- bin/kafka-dump-log-segment.sh	(revision 0)
+++ bin/kafka-dump-log-segment.sh	(revision 0)
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+$(dirname $0)/kafka-run-class.sh kafka.tools.DumpLogSegments $@

Property changes on: bin/kafka-dump-log-segment.sh
___________________________________________________________________
Added: svn:executable
   + *

