Index: core/src/main/scala/kafka/tools/DumpLogSegments.scala
===================================================================
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala (revision 1302017)
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala (working copy)
@@ -20,34 +20,99 @@
 import java.io._
 import kafka.message._
 import kafka.utils._
+import joptsimple.OptionParser
 
-object DumpLogSegments {
+object DumpLogSegments extends Logging {
 
   def main(args: Array[String]) {
-    var isNoPrint = false;
-    for(arg <- args)
-      if ("-noprint".compareToIgnoreCase(arg) == 0)
-        isNoPrint = true;
+    val parser = new OptionParser
+    val logSegmentOpt = parser.accepts("log", "REQUIRED: The kafka log segment to be dumped")
+                              .withRequiredArg
+                              .describedAs(".kafka file")
+                              .ofType(classOf[String])
+    val noPrintOpt = parser.accepts("noprint", "Whether to print the message payload or not")
+                           .withOptionalArg()
+                           .describedAs("boolean")
+                           .ofType(classOf[java.lang.Boolean])
+    val outputLogSegmentOpt = parser.accepts("output-file", "The output file to dump this log segment to")
+                                    .withOptionalArg()
+                                    .describedAs("output file")
+                                    .ofType(classOf[String])
+    val startingOffsetOpt = parser.accepts("start-offset", "The offset inside the log to start dumping from")
+                                  .withOptionalArg()
+                                  .describedAs("starting offset")
+                                  .ofType(classOf[java.lang.Long])
 
-    for(arg <- args) {
-      if (! ("-noprint".compareToIgnoreCase(arg) == 0) ) {
-        val file = new File(arg)
-        println("Dumping " + file)
-        val startOffset = file.getName().split("\\.")(0).toLong
-        var offset = 0L
-        println("Starting offset: " + startOffset)
-        val messageSet = new FileMessageSet(file, false)
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(logSegmentOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val logSegmentName = options.valueOf(logSegmentOpt)
+    val logSegment = new File(logSegmentName)
+    val startOffset = java.lang.Long.valueOf(logSegment.getName().split("\\.")(0))
+    val noPrint = options.has(noPrintOpt)
+
+    val startingDumpOffset: java.lang.Long =
+      if(options.has(startingOffsetOpt)) options.valueOf(startingOffsetOpt) else startOffset
+
+    // use the output file the user supplied; if --output-file is given without a value,
+    // fall back to a /tmp file named after the starting dump offset
+    val outputLogSegmentName =
+      if(options.has(outputLogSegmentOpt))
+        Option(options.valueOf(outputLogSegmentOpt)).orElse(Some("/tmp/%d.kafka".format(startingDumpOffset.longValue())))
+      else None
+
+    info("Dumping " + logSegment)
+    info("Starting offset: " + startOffset)
+
+    var relativeOffset: java.lang.Long = 0L
+    val messageSet = new FileMessageSet(logSegment, false)
+
+    outputLogSegmentName match {
+      case Some(outputFile) =>
+        info("Dumping %s starting at %d to file %s".format(logSegment.getAbsolutePath, startingDumpOffset.longValue(),
+          outputFile))
+        val outputFileStream = new FileOutputStream(new File(outputFile))
+        val inputFileStream = new FileInputStream(logSegment)
+        val fileOffset = startingDumpOffset.longValue() - startOffset.longValue()
+        val bytesSkipped = inputFileStream.skip(fileOffset)
+        if(bytesSkipped < fileOffset) {
+          System.err.println("Unable to seek to position %d of file %s".format(fileOffset, logSegmentName))
+          System.exit(1)
+        }
+        // copy the rest of the segment, writing only the bytes actually read on each pass
+        // so the final chunk is not padded with stale buffer contents
+        val messageData = new Array[Byte](4096)
+        var bytesRead = inputFileStream.read(messageData)
+        while(bytesRead > 0) {
+          outputFileStream.write(messageData, 0, bytesRead)
+          bytesRead = inputFileStream.read(messageData)
+        }
+        outputFileStream.flush()
+        outputFileStream.close()
+        inputFileStream.close()
+      case None =>
         for(messageAndOffset <- messageSet) {
           val msg = messageAndOffset.message
-          println("offset: " + (startOffset + offset) + " isvalid: " + msg.isValid +
-                  " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec)
-          if (!isNoPrint)
-            println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-          offset = messageAndOffset.offset
+
+          if((startOffset.longValue() + relativeOffset.longValue()) >= startingDumpOffset.longValue()) {
+            info("offset: " + (startOffset.longValue() + relativeOffset.longValue()) + " isvalid: " + msg.isValid +
+              " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec)
+            if (!noPrint)
+              info("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+          }
+
+          relativeOffset = messageAndOffset.offset
         }
-        println("tail of the log is at offset: " + (startOffset + offset))
-      }
-    }
+    }
+
+    info("tail of the log is at offset: " + (startOffset.longValue() + relativeOffset.longValue()))
   }
-  
+
 }
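For reference, a minimal sketch of how the reworked tool could be driven once this patch
is applied; the object name, segment path and offset below are illustrative, not part of
the patch. Note that joptsimple binds a value to an optional-argument option only when it
is attached with '=', so --start-offset takes the form shown here:

    object DumpLogSegmentsExample {
      def main(args: Array[String]) {
        kafka.tools.DumpLogSegments.main(Array(
          "--log", "/tmp/kafka-logs/topic-0/00000000000000000000.kafka", // hypothetical segment
          "--start-offset=1024",                                         // begin dumping at offset 1024
          "--noprint"))                                                  // suppress payload output
      }
    }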
Index: core/src/main/scala/kafka/tools/FileChannelTest.scala
===================================================================
--- core/src/main/scala/kafka/tools/FileChannelTest.scala (revision 0)
+++ core/src/main/scala/kafka/tools/FileChannelTest.scala (revision 0)
@@ -0,0 +1,55 @@
+package kafka.tools
+
+import java.io.{RandomAccessFile, File}
+import joptsimple.OptionParser
+import kafka.utils.Logging
+import java.util.Random
+import java.nio.ByteBuffer
+
+object FileChannelTest extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val numIterationsOpt = parser.accepts("num-iterations", "The number of bytebuffer write tests to run")
+                                 .withRequiredArg()
+                                 .describedAs("number of iterations")
+                                 .ofType(classOf[java.lang.Integer])
+                                 .defaultsTo(100)
+    val maxBufferSizeOpt = parser.accepts("max-buffer-size", "The maximum size buffer to write to the file")
+                                 .withRequiredArg()
+                                 .describedAs("maximum buffer size")
+                                 .ofType(classOf[java.lang.Integer])
+                                 .defaultsTo(500000)
+
+    val options = parser.parse(args : _*)
+
+    val numIterations = options.valueOf(numIterationsOpt).intValue()
+    val maxBufferSize = options.valueOf(maxBufferSizeOpt).intValue()
+
+    val file = new File("/tmp/file-channel-test")
+    val fileChannel = new RandomAccessFile(file, "rw").getChannel()
+
+    val random = new Random
+    var numSuccessfulWrites = 0
+    for(i <- 0 until numIterations) {
+      val bufferSize = random.nextInt(maxBufferSize)
+      val bytes = new Array[Byte](bufferSize)
+      random.nextBytes(bytes)
+      val buffer = ByteBuffer.allocate(bytes.length)
+      buffer.put(bytes)
+      // reset the position to 0 so the channel writes the buffer from the beginning
+      buffer.rewind()
+
+      // write to file and assert number of bytes written
+      val written = fileChannel.write(buffer)
+      assert(written == bytes.length, "Number of bytes written must be equal to buffer length")
+      assert(buffer.remaining() == 0, "Number of remaining bytes in a ByteBuffer should be 0")
+      info("Wrote buffer of length %d to file %s".format(written, file.getAbsolutePath))
+      numSuccessfulWrites += 1
+    }
+
+    info("Wrote %d buffers to file %s successfully".format(numSuccessfulWrites, file.getAbsolutePath))
+
+    fileChannel.close()
+  }
+}
\ No newline at end of file
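FileChannelTest deliberately asserts that a single FileChannel.write call consumes the
whole buffer, which is exactly the behavior it is probing. General-purpose code usually
drains the buffer in a loop instead, since the write contract only promises that some
bytes are written. A minimal sketch of that defensive pattern (ChannelUtils and
writeFully are illustrative names, not part of the patch):

    object ChannelUtils {
      // drain a ByteBuffer completely into a channel, tolerating partial writes
      def writeFully(channel: java.nio.channels.FileChannel, buffer: java.nio.ByteBuffer) {
        while(buffer.hasRemaining)
          channel.write(buffer)
      }
    }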
location " + location) channel.read(sizeBuffer, location) if(sizeBuffer.hasRemaining) return allDone() @@ -121,7 +122,9 @@ return allDone() // read the item itself + trace("Creating message byte buffer of size " + size) val buffer = ByteBuffer.allocate(size) + trace("Reading %d bytes from byte buffer from location %d".format(size, location+4)) channel.read(buffer, location + 4) if(buffer.hasRemaining) return allDone() @@ -129,7 +132,10 @@ // increment the location and return the item location += size + 4 - new MessageAndOffset(new Message(buffer), location) + val message = new Message(buffer) + trace("Returning message with next fetch offset %d, %d magic, %x attributes, %d compression codec, %d payload size" + .format(location, message.magic, message.attributes, message.compressionCodec.codec, message.payloadSize)) + new MessageAndOffset(message, location) } } }