Index: core/src/main/scala/kafka/tools/DumpLogSegments.scala
===================================================================
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala	(revision 1404857)
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala	(working copy)
@@ -21,46 +21,89 @@
 import kafka.message._
 import kafka.log._
 import kafka.utils._
+import collection.mutable
 
 object DumpLogSegments {
 
   def main(args: Array[String]) {
     val print = args.contains("--print")
-    val files = args.filter(_ != "--print")
+    val verifyOnly = args.contains("--verifyOnly")
+    val files = args.filter(m => m != "--print" && m != "--verifyOnly")
 
+    val misMatchesForIndexFilesMap = new mutable.HashMap[String, mutable.LinkedList[(Int, Int)]]
+    val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, mutable.LinkedList[(Int, Int)]]
+
     for(arg <- files) {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file)
+        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
       }
     }
+    misMatchesForIndexFilesMap.foreach {
+      case (fileName, listOfMismatches) => {
+        System.err.println("Mismatches in :" + fileName)
+        listOfMismatches.foreach(m => {
+          System.err.println("  Index position: %d, log position: %d".format(m._1, m._2))
+        })
+      }
+    }
+    nonConsecutivePairsForLogFilesMap.foreach {
+      case (fileName, listOfNonSecutivePairs) => {
+        System.err.println("Non-secutive offsets in :" + fileName)
+        listOfNonSecutivePairs.foreach(m => {
+          System.err.println("  %d is followed by %d".format(m._1, m._2))
+        })
+      }
+    }
   }
   
   /* print out the contents of the index */
-  def dumpIndex(file: File) {
+  private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, mutable.LinkedList[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
+    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
+    val logFile = new File(logFileName)
+    val messageSet = new FileMessageSet(logFile)
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
+      val messageAndOffset = partialFileMessageSet.head
+      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+        var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, new mutable.LinkedList[(Int, Int)])
+        misMatchesSeq = misMatchesSeq.:+((entry.offset + index.baseOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        misMatchesForIndexFilesMap.put(file.getName, misMatchesSeq)
+      }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
       if(entry.offset == 0 && i > 0)
         return
-      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
+      if (!verifyOnly)
+        println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
   
   /* print out the contents of the log */
-  def dumpLog(file: File, printContents: Boolean) {
+  private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, mutable.LinkedList[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
+    var lastOffset = -1l
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+
+      if(lastOffset == -1)
+        lastOffset = messageAndOffset.offset
+      else if (messageAndOffset.offset != lastOffset +1) {
+        var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, new mutable.LinkedList[(Int, Int)])
+        nonConsecutivePairsSeq = nonConsecutivePairsSeq.:+((lastOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+      }
+      lastOffset = messageAndOffset.offset
+
       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
@@ -78,5 +121,4 @@
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
-  
 }
Index: bin/kafka-dump-log-segment.sh
===================================================================
--- bin/kafka-dump-log-segment.sh	(revision 0)
+++ bin/kafka-dump-log-segment.sh	(revision 0)
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+$(dirname $0)/kafka-run-class.sh kafka.tools.DumpLogSegments $@
