diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 7e7f344..4ea6d3d 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -29,44 +29,63 @@ import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} /** - * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts - * will fail on an immutable message set. An optional limit and start position can be applied to the message set - * which will control the position in the file at which the set begins. + * An on-disk message set. An optional start and end position can be applied to the message set + * which will allow slicing a subset of the file. + * @param file The file name + * @param channel the underlying file channel used + * @param start The absolute position in the file from which the message set begins + * @param end The absolute position in the file from which the message set ends */ @nonthreadsafe class FileMessageSet private[kafka](val file: File, private[log] val channel: FileChannel, - private[log] val start: Int = 0, - private[log] val limit: Int = Int.MaxValue, - initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging { + private[log] val start: Int, + private[log] val end: Int, + isSlice: Boolean) extends MessageSet with Logging { /* the size of the message set in bytes */ - private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start) + private val _size = + if(isSlice) + new AtomicInteger(end - start) // don't check the file size if this is just a slice view + else + new AtomicInteger(math.min(channel.size().toInt, end) - start) - if (initChannelPositionToEnd) { - /* set the file position to the last byte in the file */ + /* if this is not a slice, update the file pointer to the end of the file */ + if (!isSlice) channel.position(channel.size) - } /** - * Create a file message set with no limit or offset + * Create a file message set with no slicing + */ + def this(file: File, channel: FileChannel) = + this(file, channel, start = 0, end = Int.MaxValue, isSlice = false) + + /** + * Create a file message set with no slicing */ - def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue) + def this(file: File) = + this(file, Utils.openChannel(file, mutable = true)) /** - * Create a file message set with no limit or offset + * Create a slice view of the file message set that begins and ends at the given byte offsets */ - def this(file: File) = this(file, Utils.openChannel(file, mutable = true)) + def this(file: File, channel: FileChannel, start: Int, end: Int) = + this(file, channel, start, end, isSlice = true) /** * Return a message set which is a view into this set starting from the given position and with the given size limit. + * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read. 
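An aside on the slicing semantics introduced here: positions handed to read are relative to the current slice, while start and end are absolute file positions, and the resulting end is capped at the current size. A minimal sketch of just that arithmetic, with a made-up Slice class standing in for FileMessageSet (no channel, no I/O):

```scala
// Hypothetical stand-in for FileMessageSet's slice bookkeeping (no file channel involved).
case class Slice(start: Int, end: Int) {
  def sizeInBytes: Int = end - start

  // Mirrors FileMessageSet.read(position, size): position is relative to this slice,
  // the resulting bounds are absolute, and the end is capped at the current size
  // rather than Int.MaxValue.
  def read(position: Int, size: Int): Slice = {
    require(position >= 0, s"Invalid position: $position")
    require(size >= 0, s"Invalid size: $size")
    Slice(start + position, math.min(start + position + size, sizeInBytes))
  }
}

object SliceDemo extends App {
  val wholeFile = Slice(0, 4096)                        // a non-sliced set over a 4 KB file
  println(wholeFile.read(position = 1000, size = 512))  // Slice(1000, 1512)
  println(wholeFile.read(position = 4000, size = 512))  // capped: Slice(4000, 4096)
}
```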
+ * If this message set is already sliced, the position will be taken relative to that slicing */ def read(position: Int, size: Int): FileMessageSet = { + if(position < 0) + throw new IllegalArgumentException("Invalid position: " + position) + if(size < 0) + throw new IllegalArgumentException("Invalid size: " + size) new FileMessageSet(file, channel, - this.start + position, - scala.math.min(this.start + position + size, sizeInBytes()), - false) + start = this.start + position, + end = math.min(this.start + position + size, sizeInBytes())) } /** @@ -76,7 +95,7 @@ class FileMessageSet private[kafka](val file: File, private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { var position = startingPosition val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) - val size = _size.get() + val size = sizeInBytes() while(position + MessageSet.LogOverhead < size) { buffer.rewind() channel.read(buffer, position) @@ -97,16 +116,25 @@ class FileMessageSet private[kafka](val file: File, * Write some of this set to the given channel, return the amount written */ def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = - channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt + channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt /** + * Get a shallow iterator over the messages in the set. + */ + override def iterator() = iterator(Int.MaxValue) + + /** * Get an iterator over the messages in the set. We only do shallow iteration here. + * Only allow messages less than maxMessageSize (in case the size itself has become corrupted). */ - override def iterator: Iterator[MessageAndOffset] = { + def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start override def makeNext(): MessageAndOffset = { + if(location >= end) + return allDone() + // read the size of the item val sizeOffsetBuffer = ByteBuffer.allocate(12) channel.read(sizeOffsetBuffer, location) @@ -116,8 +144,10 @@ class FileMessageSet private[kafka](val file: File, sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() - if (size < Message.MinHeaderSize) + if(size < Message.MinHeaderSize) return allDone() + if(size > maxMessageSize) + throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) @@ -176,9 +206,9 @@ class FileMessageSet private[kafka](val file: File, * given size falls on a valid byte offset. 
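The maxMessageSize bound threaded through the shallow iterator above guards against a corrupted size field causing an oversized allocation. A rough, self-contained illustration of the same 12-byte-header-then-validate pattern over an in-memory buffer; this is not the patch's iterator, and it throws a plain IllegalStateException rather than Kafka's InvalidMessageException:

```scala
import java.nio.ByteBuffer

// Sketch of parsing [offset: Long][size: Int][payload] entries with a size guard.
object ShallowIterationDemo extends App {
  val maxMessageSize = 1024

  def entries(buf: ByteBuffer): Iterator[(Long, Array[Byte])] =
    new Iterator[(Long, Array[Byte])] {
      def hasNext: Boolean = buf.remaining >= 12   // 8-byte offset + 4-byte size header
      def next(): (Long, Array[Byte]) = {
        val offset = buf.getLong()
        val size   = buf.getInt()
        if (size > maxMessageSize)                 // a corrupt size field fails fast instead of allocating
          throw new IllegalStateException(s"Message size $size exceeds the allowed maximum ($maxMessageSize)")
        val payload = new Array[Byte](size)
        buf.get(payload)
        (offset, payload)
      }
    }

  val buf = ByteBuffer.allocate(64)
  buf.putLong(42L).putInt(5).put("hello".getBytes("UTF-8"))
  buf.flip()
  entries(buf).foreach { case (o, p) => println(s"offset=$o payload=${new String(p, "UTF-8")}") }
}
```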
*/ def truncateTo(targetSize: Int) = { - if(targetSize > sizeInBytes()) - throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + - " size of this log segment is only %d bytes".format(sizeInBytes())) + if(targetSize > sizeInBytes || targetSize < 0) + throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " + + " size of this log segment is " + sizeInBytes + " bytes.") channel.truncate(targetSize) channel.position(targetSize) _size.set(targetSize) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4cb2445..024b6bc 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -20,9 +20,11 @@ package kafka.log import kafka.api.OffsetRequest import java.io.{IOException, File} import java.util.{Comparator, Collections, ArrayList} +import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import kafka.utils._ import scala.math._ +import scala.collection.JavaConversions.asIterable; import java.text.NumberFormat import kafka.server.BrokerTopicStat import kafka.message._ @@ -30,63 +32,6 @@ import kafka.common._ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -object Log { - val LogFileSuffix = ".log" - val IndexFileSuffix = ".index" - - /** - * Search for the greatest range with start <= the target value. - */ - def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = { - if(ranges.size < 1) - return None - - // check out of bounds - if(value < ranges(0).start) - return None - - var low = 0 - var high = arraySize - 1 - while(low < high) { - val mid = ceil((high + low) / 2.0).toInt - val found = ranges(mid) - if(found.start == value) - return Some(found) - else if (value < found.start) - high = mid - 1 - else - low = mid - } - Some(ranges(low)) - } - - def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] = - findRange(ranges, value, ranges.length) - - /** - * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros - * so that ls sorts the files numerically - */ - def filenamePrefixFromOffset(offset: Long): String = { - val nf = NumberFormat.getInstance() - nf.setMinimumIntegerDigits(20) - nf.setMaximumFractionDigits(0) - nf.setGroupingUsed(false) - nf.format(offset) - } - - def logFilename(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) - - def indexFilename(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) - - def getEmptyOffsets(timestamp: Long): Seq[Long] = - if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) - Seq(0L) - else Nil -} - /** * An append-only log for storing messages. 
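The findRange binary search removed here becomes unnecessary later in the patch, where Log keys its segments by base offset in a ConcurrentSkipListMap and relies on floorEntry for the same greatest-start-not-exceeding-target lookup. A small sketch of that replacement (the Segment case class is a placeholder, not Kafka's LogSegment):

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Sketch: locate the segment whose base offset is the greatest one <= the target offset.
object SegmentLookupDemo extends App {
  final case class Segment(baseOffset: Long)

  val segments = new ConcurrentSkipListMap[Long, Segment]()
  Seq(0L, 1000L, 2500L).foreach(o => segments.put(o, Segment(o)))

  def segmentFor(offset: Long): Option[Segment] =
    Option(segments.floorEntry(offset)).map(_.getValue)   // greatest key <= offset, or None

  println(segmentFor(1700L))  // Some(Segment(1000)), the segment that could hold offset 1700
  println(segmentFor(-1L))    // None, the offset precedes the first segment
}
```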
@@ -106,9 +51,7 @@ private[kafka] class Log(val dir: File, val needsRecovery: Boolean, val maxIndexSize: Int = (10*1024*1024), val indexIntervalBytes: Int = 4096, - time: Time = SystemTime, - brokerId: Int = 0) extends Logging with KafkaMetricsGroup { - this.logIdent = "[Kafka Log on Broker " + brokerId + "], " + time: Time = SystemTime) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -122,10 +65,10 @@ private[kafka] class Log(val dir: File, private val lastflushedTime = new AtomicLong(System.currentTimeMillis) /* the actual segments of the log */ - private[log] val segments: SegmentList[LogSegment] = loadSegments() + private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments() /* Calculate the offset of the next message */ - private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset()) + private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset()) newGauge(name + "-" + "NumLogSegments", new Gauge[Int] { def getValue = numberOfSegments }) @@ -137,9 +80,9 @@ private[kafka] class Log(val dir: File, def name = dir.getName() /* Load the log segments from the log files on disk */ - private def loadSegments(): SegmentList[LogSegment] = { + private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = { // open all the segments read-only - val logSegments = new ArrayList[LogSegment] + val logSegments = new ConcurrentSkipListMap[Long, LogSegment] val ls = dir.listFiles() if(ls != null) { for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) { @@ -147,75 +90,46 @@ private[kafka] class Log(val dir: File, throw new IOException("Could not read file " + file) val filename = file.getName() val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong - // TODO: we should ideally rebuild any missing index files, instead of erroring out - if(!Log.indexFilename(dir, start).exists) - throw new IllegalStateException("Found log file with no corresponding index file.") - logSegments.add(new LogSegment(dir = dir, - startOffset = start, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + val hasIndex = Log.indexFilename(dir, start).exists + val segment = new LogSegment(dir = dir, + startOffset = start, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize) + if(!hasIndex) { + // this can only happen if someone manually deletes the index file + error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.messageSet.file.getAbsolutePath)) + segment.recover(maxMessageSize) + } + logSegments.put(start, segment) } } if(logSegments.size == 0) { - // no existing segments, create a new mutable segment - logSegments.add(new LogSegment(dir = dir, + // no existing segments, create a new mutable segment beginning at offset 0 + logSegments.put(0, + new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = indexIntervalBytes, maxIndexSize = maxIndexSize)) } else { - // there is at least one existing segment, validate and recover them/it - // sort segments into ascending order for fast searching - Collections.sort(logSegments, new Comparator[LogSegment] { - def compare(s1: LogSegment, s2: LogSegment): Int = { - if(s1.start == s2.start) 0 - else if(s1.start < s2.start) -1 - else 1 - } - }) - - // reset the index size of the last (current active) log segment to its maximum value - logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize) - - // run recovery on the last segment if necessary - if(needsRecovery) - 
recoverSegment(logSegments.get(logSegments.size - 1)) - } - new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size))) - } - - /** - * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. - */ - private def recoverSegment(segment: LogSegment) { - segment.index.truncate() - var validBytes = 0 - var lastIndexEntry = 0 - val iter = segment.messageSet.iterator - try { - while(iter.hasNext) { - val entry = iter.next - entry.message.ensureValid() - if(validBytes - lastIndexEntry > indexIntervalBytes) { - segment.index.append(entry.offset, validBytes) - lastIndexEntry = validBytes - } - validBytes += MessageSet.entrySize(entry.message) + // reset the index size of the currently active log segment to allow more entries + val active = logSegments.lastEntry.getValue + active.index.resize(maxIndexSize) + + // run recovery on the active segment if necessary + if(needsRecovery) { + info("Recovering active segment of %s.".format(name)) + active.recover(maxMessageSize) } - } catch { - case e: InvalidMessageException => - logger.warn("Found invalid messages in log " + name) } - val truncated = segment.messageSet.sizeInBytes - validBytes - if(truncated > 0) - warn("Truncated " + truncated + " invalid bytes from the log " + name + ".") - segment.messageSet.truncateTo(validBytes) + logSegments } /** - * The number of segments in the log + * The number of segments in the log. + * Take care! this is an O(n) operation. */ - def numberOfSegments: Int = segments.view.length + def numberOfSegments: Int = segments.size /** * Close this log @@ -223,7 +137,7 @@ private[kafka] class Log(val dir: File, def close() { debug("Closing log " + name) lock synchronized { - for(seg <- segments.view) + for(seg <- logSegments) seg.close() } } @@ -234,64 +148,52 @@ private[kafka] class Log(val dir: File, * This method will generally be responsible for assigning offsets to the messages, * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. 
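The offset-assignment contract described here (and implemented in the hunk that follows) reduces to two branches: either claim the next offsets from the log's counter, or accept the caller's offsets after checking they are strictly increasing and not behind the log end. A hedged sketch of just that bookkeeping, with a bare AtomicLong in place of the log's nextOffset:

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch of the assignOffsets decision: not Log.append itself, only the offset bookkeeping.
object OffsetAssignmentDemo extends App {
  val nextOffset = new AtomicLong(100L)   // next offset the log would hand out

  /** Returns (firstOffset, lastOffset) for a batch of `count` messages. */
  def assign(count: Int, assignOffsets: Boolean, givenOffsets: Seq[Long]): (Long, Long) =
    if (assignOffsets) {
      val first = nextOffset.getAndAdd(count)    // claim a contiguous block of offsets
      (first, first + count - 1)
    } else {
      val monotonic = givenOffsets.zip(givenOffsets.drop(1)).forall { case (a, b) => a < b }
      if (!monotonic || givenOffsets.head < nextOffset.get)
        throw new IllegalArgumentException("Out of order offsets: " + givenOffsets.mkString(","))
      nextOffset.set(givenOffsets.last + 1)
      (givenOffsets.head, givenOffsets.last)
    }

  println(assign(3, assignOffsets = true, Nil))               // (100,102), counter moves to 103
  println(assign(2, assignOffsets = false, Seq(103L, 104L)))  // (103,104), counter moves to 105
}
```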
* - * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, - * or (-1,-1) if the message set is empty + * Returns information about the appended messages including the first and last offset */ - def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = { - val messageSetInfo = analyzeAndValidateMessageSet(messages) + def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = { + val appendInfo = analyzeAndValidateMessageSet(messages) // if we have any valid messages, append them to the log - if(messageSetInfo.count == 0) { - (-1L, -1L) - } else { - BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count) - BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count) - - // trim any invalid bytes or partial messages before appending it to the on-disk log - var validMessages = trimInvalidBytes(messages) + if(appendInfo.count == 0) + return appendInfo + + // trim any invalid bytes or partial messages before appending it to the on-disk log + var validMessages = trimInvalidBytes(messages) - try { - // they are valid, insert them in the log - val offsets = lock synchronized { - // maybe roll the log if this segment is full - val segment = maybeRoll(segments.view.last) + try { + // they are valid, insert them in the log + lock synchronized { + // maybe roll the log if this segment is full + val segment = maybeRoll(activeSegment) - // assign offsets to the messageset - val offsets = - if(assignOffsets) { - val firstOffset = nextOffset.get - validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec) - val lastOffset = nextOffset.get - 1 - (firstOffset, lastOffset) - } else { - if(!messageSetInfo.offsetsMonotonic) - throw new IllegalArgumentException("Out of order offsets found in " + messages) - nextOffset.set(messageSetInfo.lastOffset + 1) - (messageSetInfo.firstOffset, messageSetInfo.lastOffset) - } + if(assignOffsets) { + // assign offsets to the messageset + appendInfo.firstOffset = nextOffset.get + validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec) + appendInfo.lastOffset = nextOffset.get - 1 + } else { + // we are taking the offsets we are given + if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get) + throw new IllegalArgumentException("Out of order offsets found in " + messages) + nextOffset.set(appendInfo.lastOffset + 1) + } - // now append to the log - trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s" - .format(this.name, offsets._1, nextOffset.get(), validMessages)) - segment.append(offsets._1, validMessages) + // now append to the log + trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset)) + segment.append(appendInfo.firstOffset, validMessages) - // return the offset at which the messages were appended - offsets - } - // maybe flush the log and index - maybeFlush(messageSetInfo.count) + maybeFlush(appendInfo.count) - // return the first and last offset - offsets - } catch { - case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) + appendInfo } + } catch { + case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) } } - /* struct to hold various quantities we compute about each message set before appending to the log */ - case class MessageSetAppendInfo(firstOffset: 
Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean) + /** struct to hold various quantities we compute about each message set before appending to the log */ + case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean) /** * Validate the following: @@ -305,7 +207,7 @@ private[kafka] class Log(val dir: File, * 4. Whether the offsets are monotonically increasing * 5. Whether any compression codec is used (if many are used, then the last one is given) */ - private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = { + private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = { var messageCount = 0 var firstOffset, lastOffset = -1L var codec: CompressionCodec = NoCompressionCodec @@ -332,7 +234,7 @@ private[kafka] class Log(val dir: File, if(messageCodec != NoCompressionCodec) codec = messageCodec } - MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) } /** @@ -353,114 +255,120 @@ private[kafka] class Log(val dir: File, } /** - * Read a message set from the log. - * startOffset - The logical offset to begin reading at + * Read messages from the log + * startOffset - The offset to begin reading at * maxLength - The maximum number of bytes to read - * maxOffset - The maximum logical offset to include in the resulting message set + * maxOffset - The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set). + * + * An attempt to read beyond the log end offset or before the base offset of the first segment will throw an OffsetOutOfRangeException. 
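The read path implemented in the next hunk resolves the old TODO about sparse offsets: start at the floor segment for the requested offset, and if that segment yields nothing, walk forward with higherEntry until some segment returns data or the log ends. A rough standalone model of that loop (toy types, not Log.read):

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.annotation.tailrec

// Model of the "floor segment, then walk forward" read loop. A segment yields None
// when it holds no message at or after the requested offset.
object SparseReadDemo extends App {
  type Offset = Long
  // segment base offset -> offsets actually present in that segment (deliberately sparse)
  val segments = new ConcurrentSkipListMap[Offset, Vector[Offset]]()
  segments.put(0L, Vector(0L, 1L, 2L))
  segments.put(10L, Vector.empty)          // an empty segment in the middle
  segments.put(20L, Vector(20L, 21L))

  def readFrom(startOffset: Offset): Option[Offset] = {
    @tailrec
    def loop(entry: java.util.Map.Entry[Offset, Vector[Offset]]): Option[Offset] =
      if (entry == null) None                            // past the last segment
      else entry.getValue.find(_ >= startOffset) match {
        case Some(found) => Some(found)                  // this segment had data
        case None        => loop(segments.higherEntry(entry.getKey))
      }
    loop(segments.floorEntry(startOffset))
  }

  println(readFrom(1L))   // Some(1)
  println(readFrom(5L))   // Some(20), the empty segment at base 10 was skipped
  println(readFrom(30L))  // None, nothing at or beyond the end
}
```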
*/ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) - val view = segments.view - + // check if the offset is valid and in range - val first = view.head.start val next = nextOffset.get if(startOffset == next) return MessageSet.Empty - else if(startOffset > next || startOffset < first) - throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next)) - // Do the read on the segment with a base offset less than the target offset - // TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything - Log.findRange(view, startOffset, view.length) match { - case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset") - case Some(segment) => segment.read(startOffset, maxLength, maxOffset) + var entry = segments.floorEntry(startOffset) + + // attempt to read beyond the log end offset is an error + if(startOffset > next || entry == null) + throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next)) + + // do the read on the segment with a base offset less than the target offset + // but if that segment doesn't contain any messages with an offset greater than that + // continue to read from successive segments until we get some messages or we reach the end of the log + while(entry != null) { + val messages = entry.getValue.read(startOffset, maxLength, maxOffset) + if(messages == null) + entry = segments.higherEntry(entry.getKey) + else + return messages } + + // okay we are beyond the end of the last segment but less than the log end offset + MessageSet.Empty } /** - * Delete any log segments matching the given predicate function + * Delete any log segments matching the given predicate function, + * starting with the oldest segment and moving forward until a segment doesn't match. + * return the number of segments deleted */ - def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = { - lock synchronized { - val view = segments.view - val deletable = view.takeWhile(predicate) - for(seg <- deletable) - seg.deleted = true - var numToDelete = deletable.size - // if we are deleting everything, create a new empty segment - if(numToDelete == view.size) { - if (view(numToDelete - 1).size > 0) + def deleteOldSegments(predicate: LogSegment => Boolean): Int = { + // find any segments that match the user-supplied predicate UNLESS it is the final segment + // and it is empty (since we would just end up re-creating it + var deletable = logSegments.takeWhile(s => predicate(s) && s.baseOffset != logEndOffset) + val numToDelete = deletable.size + if(numToDelete > 0) { + lock synchronized { + // remove the segments for lookups + deletable.foreach(d => segments.remove(d.baseOffset)) + if(segments.size == 0) + // we must always have at least one segment, so if we deleted all the segments, create a new one roll() - else { - // If the last segment to be deleted is empty and we roll the log, the new segment will have the same - // file name. So simply reuse the last segment and reset the modified time. 
- view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds) - numToDelete -=1 - } } - segments.trunc(numToDelete) + // do not lock around actual file deletion, it isn't O(1) on many filesystems + deletable.foreach(_.delete()) } + numToDelete } /** * Get the size of the log in bytes */ - def size: Long = segments.view.foldLeft(0L)(_ + _.size) + def size: Long = logSegments.map(_.size).sum /** - * Get the offset of the next message that will be appended + * Get the offset of the next message that will be appended to the log */ def logEndOffset: Long = nextOffset.get /** - * Roll the log over if necessary + * Roll the log over to a new empty log segment if necessary */ private def maybeRoll(segment: LogSegment): LogSegment = { - if ((segment.messageSet.sizeInBytes > maxLogFileSize) || - ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) || + if ((segment.size > maxLogFileSize) || + (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) || segment.index.isFull) roll() else segment } - + /** - * Create a new segment and make it active, and return it + * Roll the log over to a new active segment starting with the current logEndOffset. + * This will trim the index to the exact size of the number of entries it currently contains. */ def roll(): LogSegment = { lock synchronized { - flush() - rollToOffset(logEndOffset) - } - } + // flush the log to ensure that only the active segment needs to be recovered + if(!segments.isEmpty()) + flush() - /** - * Roll the log over to the given new offset value - */ - private def rollToOffset(newOffset: Long): LogSegment = { - val logFile = logFilename(dir, newOffset) - val indexFile = indexFilename(dir, newOffset) - for(file <- List(logFile, indexFile); if file.exists) { - warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") - file.delete() - } - debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) - segments.view.lastOption match { - case Some(segment) => segment.index.trimToValidSize() - case None => + val newOffset = logEndOffset + val logFile = logFilename(dir, newOffset) + val indexFile = indexFilename(dir, newOffset) + for(file <- List(logFile, indexFile); if file.exists) { + warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") + file.delete() + } + + debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) + segments.lastEntry() match { + case null => + case entry => entry.getValue.index.trimToValidSize() + } + val segment = new LogSegment(dir, + startOffset = newOffset, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize) + val prev = segments.put(segment.baseOffset, segment) + if(prev != null) + throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset)) + segment } - - val segmentsView = segments.view - if(segmentsView.size > 0 && segmentsView.last.start == newOffset) - throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset)) - - val segment = new LogSegment(dir, - startOffset = newOffset, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize) - segments.append(segment) - segment } /** @@ -479,137 +387,113 @@ private[kafka] class Log(val dir: File, return lock synchronized { - 
debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + + debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " + time.milliseconds) - segments.view.last.flush() + activeSegment.flush() unflushed.set(0) lastflushedTime.set(time.milliseconds) } } - def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - val segsArray = segments.view - var offsetTimeArray: Array[(Long, Long)] = null - if(segsArray.last.size > 0) - offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) - else - offsetTimeArray = new Array[(Long, Long)](segsArray.length) - - for(i <- 0 until segsArray.length) - offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) - if(segsArray.last.size > 0) - offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) - - var startIndex = -1 - timestamp match { - case OffsetRequest.LatestTime => - startIndex = offsetTimeArray.length - 1 - case OffsetRequest.EarliestTime => - startIndex = 0 - case _ => - var isFound = false - debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) - startIndex = offsetTimeArray.length - 1 - while (startIndex >= 0 && !isFound) { - if (offsetTimeArray(startIndex)._2 <= timestamp) - isFound = true - else - startIndex -=1 - } - } - - val retSize = maxNumOffsets.min(startIndex + 1) - val ret = new Array[Long](retSize) - for(j <- 0 until retSize) { - ret(j) = offsetTimeArray(startIndex)._1 - startIndex -= 1 - } - // ensure that the returned seq is in descending order of offsets - ret.toSeq.sortBy(- _) - } - + /** + * Delete this log from the filesystem entirely + */ def delete(): Unit = { - deleteSegments(segments.contents.get()) + logSegments.foreach(_.delete()) Utils.rm(dir) } - - - /* Attempts to delete all provided segments from a log and returns how many it was able to */ - def deleteSegments(segments: Seq[LogSegment]): Int = { - var total = 0 - for(segment <- segments) { - info("Deleting log segment " + segment.start + " from " + name) - val deletedLog = segment.messageSet.delete() - val deletedIndex = segment.index.delete() - if(!deletedIndex || !deletedLog) { - throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.") - } else { - total += 1 - } - } - total - } + /** + * Truncate this log so that it ends with the greatest offset < targetOffset. 
+ */ def truncateTo(targetOffset: Long) { + info("Truncating log %s to offset %d.".format(name, targetOffset)) if(targetOffset < 0) throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) + if(targetOffset > logEndOffset) { + info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1)) + return + } lock synchronized { - val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset) - val viewSize = segments.view.size - val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted) - /* We should not hit this error because segments.view is locked in markedDeletedWhile() */ - if(numSegmentsDeleted != segmentsToBeDeleted.size) - error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")") - if (numSegmentsDeleted == viewSize) { - segments.trunc(segments.view.size) - rollToOffset(targetOffset) - this.nextOffset.set(targetOffset) + if(segments.firstEntry.getValue.baseOffset > targetOffset) { + truncateFullyAndStartAt(targetOffset) } else { - if(targetOffset > logEndOffset) { - error("Target offset %d cannot be greater than the last message offset %d in the log %s". - format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath)) - } else { - // find the log segment that has this hw - val segmentToBeTruncated = findRange(segments.view, targetOffset) - segmentToBeTruncated match { - case Some(segment) => - val truncatedSegmentIndex = segments.view.indexOf(segment) - segments.truncLast(truncatedSegmentIndex) - segment.truncateTo(targetOffset) - this.nextOffset.set(targetOffset) - info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset)) - case None => // nothing to do - } - } + val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) + deletable.foreach(s => segments.remove(s.baseOffset)) + deletable.foreach(_.delete()) + activeSegment.truncateTo(targetOffset) + this.nextOffset.set(targetOffset) } } } - /** - * Truncate all segments in the log and start a new segment on a new offset + /** + * Delete all data in the log and start at the new offset */ - def truncateAndStartWithNewOffset(newOffset: Long) { + def truncateFullyAndStartAt(newOffset: Long) { + debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { - val deletedSegments = segments.trunc(segments.view.size) - debug("Truncate and start log '" + name + "' to " + newOffset) - segments.append(new LogSegment(dir, - newOffset, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) - deleteSegments(deletedSegments) + val segmentsToDelete = logSegments.toList + segments.clear() + segmentsToDelete.foreach(_.delete()) + segments.put(newOffset, + new LogSegment(dir, + newOffset, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize)) this.nextOffset.set(newOffset) } } - def topicName():String = { - name.substring(0, name.lastIndexOf("-")) - } + /** + * The time this log is last known to have been fully flushed to disk + */ + def lastFlushTime(): Long = lastflushedTime.get + + /** + * Get the active segment that is currently taking appends + */ + def activeSegment = segments.lastEntry.getValue + + /** + * Get all the log segments in this log ordered from oldes to newest + */ + def logSegments: Iterable[LogSegment] = asIterable(segments.values) + + override def toString() = "Log(" + this.dir + ")" + +} - 
def getLastFlushedTime():Long = { - return lastflushedTime.get +/** + * Helper functions for logs + */ +object Log { + val LogFileSuffix = ".log" + val IndexFileSuffix = ".index" + + /** + * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros + * so that ls sorts the files numerically + */ + def filenamePrefixFromOffset(offset: Long): String = { + val nf = NumberFormat.getInstance() + nf.setMinimumIntegerDigits(20) + nf.setMaximumFractionDigits(0) + nf.setGroupingUsed(false) + nf.format(offset) } - override def toString() = "Log(" + this.dir + ")" + /** + * Construct a log file name in the given dir with the given base offset + */ + def logFilename(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) + + /** + * Construct an index file name in the given dir using the given base offset + */ + def indexFilename(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1d4f885..1820148 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -120,8 +120,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, - time, - config.brokerId) + time) val previous = this.logs.put(topicPartition, log) if(previous != null) throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) @@ -132,7 +131,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Start the log flush thread + * Start the background threads to flush logs and do log cleanup */ def startup() { /* Schedule the cleanup task to delete old logs */ @@ -147,7 +146,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Get the log if it exists + * Get the log if it exists, otherwise return None */ def getLog(topic: String, partition: Int): Option[Log] = { val topicAndPartiton = TopicAndPartition(topic, partition) @@ -159,7 +158,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Create the log if it does not exist, if it exists just return it + * Create the log if it does not exist, otherwise just return it */ def getOrCreateLog(topic: String, partition: Int): Log = { val topicAndPartition = TopicAndPartition(topic, partition) @@ -195,8 +194,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, - time, - config.brokerId) + time) info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) log @@ -223,14 +221,6 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - val log = getLog(topicAndPartition.topic, topicAndPartition.partition) - log match { - case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets) - case None => getEmptyOffsets(timestamp) - } - } - /** * Runs through the log removing segments older than a certain age */ @@ -238,9 +228,7 @@ private[kafka] class LogManager(val config: KafkaConfig, val startMs = time.milliseconds val topic = parseTopicPartitionName(log.name).topic val logCleanupThresholdMs = 
logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs) - val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs) - val total = log.deleteSegments(toBeDeleted) - total + log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs) } /** @@ -250,7 +238,8 @@ private[kafka] class LogManager(val config: KafkaConfig, private def cleanupSegmentsToMaintainSize(log: Log): Int = { val topic = parseTopicPartitionName(log.dir.getName).topic val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize) - if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0 + if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) + return 0 var diff = log.size - maxLogRetentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) { @@ -260,9 +249,7 @@ private[kafka] class LogManager(val config: KafkaConfig, false } } - val toBeDeleted = log.markDeletedWhile( shouldDelete ) - val total = log.deleteSegments(toBeDeleted) - total + log.deleteOldSegments(shouldDelete) } /** @@ -307,19 +294,19 @@ private[kafka] class LogManager(val config: KafkaConfig, */ private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") - for (log <- allLogs) { + for ((topicAndPartition, log) <- logs) { try { - val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime + val timeSinceLastFlush = System.currentTimeMillis - log.lastFlushTime var logFlushInterval = config.defaultFlushIntervalMs - if(logFlushIntervals.contains(log.topicName)) - logFlushInterval = logFlushIntervals(log.topicName) - debug(log.topicName + " flush interval " + logFlushInterval + - " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush) + if(logFlushIntervals.contains(topicAndPartition.topic)) + logFlushInterval = logFlushIntervals(topicAndPartition.topic) + debug(topicAndPartition.topic + " flush interval " + logFlushInterval + + " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) if(timeSinceLastFlush >= logFlushInterval) log.flush } catch { case e => - error("Error flushing topic " + log.topicName, e) + error("Error flushing topic " + topicAndPartition.topic, e) e match { case _: IOException => fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e) @@ -330,11 +317,12 @@ private[kafka] class LogManager(val config: KafkaConfig, } } + /** + * Parse the topic and partition out of the directory name of a log + */ private def parseTopicPartitionName(name: String): TopicAndPartition = { val index = name.lastIndexOf('-') TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) } - def topics(): Iterable[String] = logs.keys.map(_.topic) - } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4417cff..a87bdb5 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -3,6 +3,7 @@ package kafka.log import scala.math._ import java.io.File import kafka.message._ +import kafka.common._ import kafka.utils._ /** @@ -16,20 +17,14 @@ import kafka.utils._ @nonthreadsafe class LogSegment(val messageSet: FileMessageSet, val index: OffsetIndex, - val start: Long, + val baseOffset: Long, val indexIntervalBytes: Int, - time: Time) extends Range with Logging { + time: Time) extends Logging { - var firstAppendTime: Option[Long] = - if (messageSet.sizeInBytes > 0) - 
Some(time.milliseconds) - else - None + var created = time.milliseconds /* the number of bytes since we last added an entry in the offset index */ - var bytesSinceLastIndexEntry = 0 - - @volatile var deleted = false + private var bytesSinceLastIndexEntry = 0 def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), @@ -40,18 +35,14 @@ class LogSegment(val messageSet: FileMessageSet, /* Return the size in bytes of this log segment */ def size: Long = messageSet.sizeInBytes() - - def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - + /** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * * It is assumed this method is being called from within a lock */ + @nonthreadsafe def append(offset: Long, messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes())) @@ -62,7 +53,6 @@ class LogSegment(val messageSet: FileMessageSet, } // append the messages messageSet.append(messages) - updateFirstAppendTime() this.bytesSinceLastIndexEntry += messages.sizeInBytes } } @@ -70,17 +60,24 @@ class LogSegment(val messageSet: FileMessageSet, /** * Find the physical file position for the least offset >= the given offset. If no offset is found * that meets this criteria before the end of the log, return null. + * + * The lowerBound argument is an optimization that can be used if we already know a valid starting position + * in the file higher than the greast-lower-bound from the index. */ - private def translateOffset(offset: Long): OffsetPosition = { + @threadsafe + private def translateOffset(offset: Long, lowerBound: Int = 0): OffsetPosition = { val mapping = index.lookup(offset) - messageSet.searchFor(offset, mapping.position) + messageSet.searchFor(offset, max(mapping.position, lowerBound)) } /** * Read a message set from this segment beginning with the first offset * greater than or equal to the startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. + * + * If the startOffset is larger than the largest offset in this log return null. 
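translateOffset above is a two-step lookup: consult the sparse offset index for the greatest entry at or below the target, then scan the log forward from that byte position (or from lowerBound when a later valid position is already known). A hedged model with plain collections standing in for OffsetIndex and FileMessageSet:

```scala
// Sketch: a sparse index of (offset -> byte position) plus a forward scan of the
// log entries (offset, position) starting at the index's greatest lower bound.
object TranslateOffsetDemo extends App {
  // only some offsets are indexed, mimicking the indexIntervalBytes spacing
  val index: Vector[(Long, Int)] = Vector(0L -> 0, 3L -> 90, 6L -> 180)
  val log:   Vector[(Long, Int)] = (0L to 8L).map(o => o -> (o.toInt * 30)).toVector

  def translate(targetOffset: Long, lowerBound: Int = 0): Option[(Long, Int)] = {
    // greatest index entry with offset <= target, falling back to the start of the log
    val (_, indexedPos) = index.takeWhile(_._1 <= targetOffset).lastOption.getOrElse(0L -> 0)
    val scanFrom = math.max(indexedPos, lowerBound)
    log.find { case (offset, pos) => pos >= scanFrom && offset >= targetOffset }
  }

  println(translate(4L))                    // Some((4,120)): index points at offset 3, scan finds 4
  println(translate(4L, lowerBound = 150))  // Some((5,150)): a known-valid position skips ahead
  println(translate(99L))                   // None: target is past the end of the log
}
```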
*/ + @threadsafe def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = { if(maxSize < 0) throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) @@ -89,9 +86,9 @@ class LogSegment(val messageSet: FileMessageSet, val startPosition = translateOffset(startOffset) - // if the start position is already off the end of the log, return MessageSet.Empty + // if the start position is already off the end of the log, return null if(startPosition == null) - return MessageSet.Empty + return null // calculate the length of the message set to read based on whether or not they gave us a maxOffset val length = @@ -103,7 +100,7 @@ class LogSegment(val messageSet: FileMessageSet, // there is a max offset, translate it to a file position and use that to calculate the max read size if(offset < startOffset) throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) - val mapping = translateOffset(offset) + val mapping = translateOffset(offset, startPosition.position) val endPosition = if(mapping == null) messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file @@ -114,12 +111,45 @@ class LogSegment(val messageSet: FileMessageSet, } messageSet.read(startPosition.position, length) } + + /** + * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. + * The maxMessageSize parameter will bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this + * is corrupt. + */ + @nonthreadsafe + def recover(maxMessageSize: Int) { + index.truncate() + var validBytes = 0 + var lastIndexEntry = 0 + val iter = messageSet.iterator(maxMessageSize) + try { + while(iter.hasNext) { + val entry = iter.next + entry.message.ensureValid() + if(validBytes - lastIndexEntry > indexIntervalBytes) { + index.append(entry.offset, validBytes) + lastIndexEntry = validBytes + } + validBytes += MessageSet.entrySize(entry.message) + } + } catch { + case e: InvalidMessageException => + logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(messageSet.file.getAbsolutePath, validBytes, e.getMessage)) + } + val truncated = messageSet.sizeInBytes - validBytes + if(truncated > 0) + warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(messageSet.file.getAbsolutePath)) + messageSet.truncateTo(validBytes) + } - override def toString() = "LogSegment(start=" + start + ", size=" + size + ")" + override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" /** - * Truncate off all index and log entries with offsets greater than or equal to the current offset. + * Truncate off all index and log entries with offsets greater than or equal to the current offset. + * If the given offset is larger than the largest message in this segment, do nothing. */ + @nonthreadsafe def truncateTo(offset: Long) { val mapping = translateOffset(offset) if(mapping == null) @@ -129,17 +159,18 @@ class LogSegment(val messageSet: FileMessageSet, index.resize(index.maxIndexSize) messageSet.truncateTo(mapping.position) if (messageSet.sizeInBytes == 0) - firstAppendTime = None + created = time.milliseconds } /** * Calculate the offset that would be used for the next message to be append to this segment. - * Not that this is expensive. + * Note that this is expensive. 
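The recover routine moved into LogSegment above validates messages one at a time, emits a sparse index entry every indexIntervalBytes, and truncates the segment at the first corrupt message. A condensed model of that accumulate-and-truncate pass (placeholder message type, not the real implementation):

```scala
// Model of recovery: walk messages, keep a running count of valid bytes, add a
// sparse index entry every indexIntervalBytes, stop at the first invalid message.
object RecoverDemo extends App {
  final case class Msg(offset: Long, sizeInBytes: Int, valid: Boolean)

  def recover(messages: Seq[Msg], indexIntervalBytes: Int): (Int, Seq[(Long, Int)]) = {
    var validBytes = 0
    var lastIndexEntry = 0
    val index = Seq.newBuilder[(Long, Int)]
    val it = messages.iterator
    var corrupt = false
    while (it.hasNext && !corrupt) {
      val m = it.next()
      if (!m.valid) corrupt = true               // everything from here on is discarded
      else {
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          index += (m.offset -> validBytes)      // index entry records the message's byte position
          lastIndexEntry = validBytes
        }
        validBytes += m.sizeInBytes
      }
    }
    (validBytes, index.result())                 // the log file would be truncated to validBytes
  }

  val msgs = Seq(Msg(0, 40, valid = true), Msg(1, 40, valid = true),
                 Msg(2, 40, valid = false), Msg(3, 40, valid = true))
  println(recover(msgs, indexIntervalBytes = 30))  // (80, List((1,40))): truncated before the corrupt message
}
```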
*/ + @threadsafe def nextOffset(): Long = { val ms = read(index.lastOffset, messageSet.sizeInBytes, None) ms.lastOption match { - case None => start + case None => baseOffset case Some(last) => last.nextOffset } } @@ -147,6 +178,7 @@ class LogSegment(val messageSet: FileMessageSet, /** * Flush this log segment to disk */ + @threadsafe def flush() { messageSet.flush() index.flush() @@ -160,4 +192,24 @@ class LogSegment(val messageSet: FileMessageSet, Utils.swallow(messageSet.close) } + def delete() { + val deletedLog = messageSet.delete() + val deletedIndex = index.delete() + if(!deletedLog) + throw new KafkaStorageException("Delete of log " + messageSet.file.getName + " failed.") + if(!deletedIndex) + throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + } + + /** + * The last modified time of this log segment as a unix time stamp + */ + def lastModified = messageSet.file.lastModified + + /** + * Update the last modified time of the segment + */ + def touch() { + messageSet.file.setLastModified(time.milliseconds) + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 23adca1..4927a2b 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -51,7 +51,7 @@ import kafka.utils._ */ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { - /* the memory mapping */ + /* initialize the memory mapping for this index */ private var mmap: MappedByteBuffer = { val newlyCreated = file.createNewFile() @@ -84,10 +84,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = } } - /* the maximum number of entries this index can hold */ + /* the maximum number of eight-byte entries this index can hold */ def maxEntries = mmap.limit / 8 - /* the number of entries in the index */ + /* the number of eight-byte entries currently in the index */ private var size = new AtomicInteger(mmap.position / 8) /* the last offset in the index */ diff --git a/core/src/main/scala/kafka/log/SegmentList.scala b/core/src/main/scala/kafka/log/SegmentList.scala deleted file mode 100644 index 5c7b0bd..0000000 --- a/core/src/main/scala/kafka/log/SegmentList.scala +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.util.concurrent.atomic._ -import reflect._ -import scala.math._ -import kafka.common.KafkaException - -private[log] object SegmentList { - val MaxAttempts: Int = 20 -} - -/** - * A copy-on-write list implementation that provides consistent views. The view() method - * provides an immutable sequence representing a consistent state of the list. 
The user can do - * iterative operations on this sequence such as binary search without locking all access to the list. - * Even if the range of the underlying list changes no change will be made to the view - */ -private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) { - - val contents: AtomicReference[Array[T]] = new AtomicReference(seq.toArray) - - /** - * Append the given items to the end of the list - */ - def append(ts: T*)(implicit m: ClassManifest[T]) { - val curr = contents.get() - val updated = new Array[T](curr.length + ts.length) - Array.copy(curr, 0, updated, 0, curr.length) - for(i <- 0 until ts.length) - updated(curr.length + i) = ts(i) - contents.set(updated) - } - - - /** - * Delete the first n items from the list - */ - def trunc(newStart: Int): Seq[T] = { - if(newStart < 0) - throw new KafkaException("Starting index must be positive."); - var deleted: Array[T] = null - val curr = contents.get() - if (curr.length > 0) { - val newLength = max(curr.length - newStart, 0) - val updated = new Array[T](newLength) - Array.copy(curr, min(newStart, curr.length - 1), updated, 0, newLength) - contents.set(updated) - deleted = new Array[T](newStart) - Array.copy(curr, 0, deleted, 0, curr.length - newLength) - } - deleted - } - - /** - * Delete the items from position (newEnd + 1) until end of list - */ - def truncLast(newEnd: Int): Seq[T] = { - if (newEnd < 0 || newEnd >= contents.get().length) - throw new KafkaException("Attempt to truncate segment list of length %d to %d.".format(contents.get().size, newEnd)); - var deleted: Array[T] = null - val curr = contents.get() - if (curr.length > 0) { - val newLength = newEnd + 1 - val updated = new Array[T](newLength) - Array.copy(curr, 0, updated, 0, newLength) - contents.set(updated) - deleted = new Array[T](curr.length - newLength) - Array.copy(curr, min(newEnd + 1, curr.length - 1), deleted, 0, curr.length - newLength) - } - deleted - } - - /** - * Get a consistent view of the sequence - */ - def view: Array[T] = contents.get() - - /** - * Nicer toString method - */ - override def toString(): String = "SegmentList(%s)".format(view.mkString(", ")) - -} diff --git a/core/src/main/scala/kafka/log/package.html b/core/src/main/scala/kafka/log/package.html index 0880be7..d8521a1 100644 --- a/core/src/main/scala/kafka/log/package.html +++ b/core/src/main/scala/kafka/log/package.html @@ -1 +1,6 @@ -The log management system for Kafka. \ No newline at end of file +The log management system for Kafka. + +The entry point for this system is LogManager. LogManager is responsible for holding all the logs, and handing them out by topic/partition. It also handles the enforcement of the +flush policy and retention policies. + +The Log itself is made up of log segments. A log is a FileMessageSet that contains the data and an OffsetIndex that supports reads by offset on the log. \ No newline at end of file diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index 7ef92ab..a1b5c63 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -92,15 +92,23 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { } /** - * Print this message set's contents + * Print this message set's contents. If the message set has more than 100 messages, just + * print the first 100. 
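Capping toString at 100 messages (implemented in the next hunk) keeps an accidental log statement from rendering an entire multi-megabyte set. The same pattern in isolation, as a hedged utility rather than the patch's MessageSet code:

```scala
// Render at most `max` elements of a collection, appending "..." when more remain.
object PreviewDemo extends App {
  def previewString[A](items: Iterable[A], max: Int = 100, name: String = "MessageSet"): String = {
    val it = items.iterator
    val shown = new StringBuilder
    var i = 0
    while (it.hasNext && i < max) {
      if (i > 0) shown.append(", ")
      shown.append(it.next())
      i += 1
    }
    if (it.hasNext) shown.append(", ...")   // more elements exist but are not rendered
    s"$name($shown)"
  }

  println(previewString(1 to 5, max = 3))   // MessageSet(1, 2, 3, ...)
  println(previewString(1 to 2, max = 3))   // MessageSet(1, 2)
}
```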
*/ override def toString: String = { val builder = new StringBuilder() builder.append(getClass.getSimpleName + "(") - for(message <- this) { + val iter = this.iterator + var i = 0 + while(iter.hasNext && i < 100) { + val message = iter.next builder.append(message) - builder.append(", ") + if(iter.hasNext) + builder.append(", ") + i += 1 } + if(iter.hasNext) + builder.append("...") builder.append(")") builder.toString } diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 2c9f2d1..6d926d2 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -75,14 +75,14 @@ class ProducerConfig private (val props: VerifiableProperties) val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) /** - * The producer using the zookeeper software load balancer maintains a ZK cache that gets - * updated by the zookeeper watcher listeners. During some events like a broker bounce, the - * producer ZK cache can get into an inconsistent state, for a small time period. In this time - * period, it could end up picking a broker partition that is unavailable. When this happens, the - * ZK cache needs to be updated. - * This parameter specifies the number of times the producer attempts to refresh this ZK cache. + * If a request fails it is possible to have the producer automatically retry. This is controlled by this setting. + * Note that not all errors mean that the message was lost--for example if the network connection is lost we will + * get a socket exception--in this case enabling retries can result in duplicate messages. */ - val producerRetries = props.getInt("producer.num.retries", 3) + val producerRetries = props.getInt("producer.num.retries", 1) + /** + * The amount of time to wait in between retries + */ val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a14e0a2..eff627c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,6 +21,7 @@ import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.message._ import kafka.network._ +import kafka.log._ import kafka.utils.{Pool, SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ @@ -59,7 +60,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) - case requestId => throw new KafkaException("No mapping found for handler id " + requestId) + case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => @@ -243,12 +244,17 @@ class KafkaApis(val requestChannel: RequestChannel, try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val log = localReplica.log.get - val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) + val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) + + // update stats + BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count) + 
BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count) + // we may need to increment high watermark since ISR could be down to 1 localReplica.partition.maybeIncrementLeaderHW(localReplica) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end)) - ProduceResult(topicAndPartition, start, end) + .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) + ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) } catch { case e: KafkaStorageException => fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) @@ -358,11 +364,11 @@ class KafkaApis(val requestChannel: RequestChannel, else replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = if (fromReplicaId == Request.OrdinaryConsumerId) { - Some(localReplica.highWatermark) - } else { - None - } + val maxOffsetOpt = + if (fromReplicaId == Request.OrdinaryConsumerId) + Some(localReplica.highWatermark) + else + None val messages = localReplica.log match { case Some(log) => log.read(offset, maxSize, maxOffsetOpt) @@ -391,15 +397,18 @@ class KafkaApis(val requestChannel: RequestChannel, else replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition) val offsets = { - val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition, - partitionOffsetRequestInfo.time, - partitionOffsetRequestInfo.maxNumOffsets) - if (!offsetRequest.isFromOrdinaryClient) allOffsets - else { + val allOffsets = fetchOffsets(replicaManager.logManager, + topicAndPartition, + partitionOffsetRequestInfo.time, + partitionOffsetRequestInfo.maxNumOffsets) + if (!offsetRequest.isFromOrdinaryClient) { + allOffsets + } else { val hw = localReplica.highWatermark if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else allOffsets + else + allOffsets } } (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) @@ -412,6 +421,59 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match { + case Some(log) => + fetchOffsetsBefore(log, timestamp, maxNumOffsets) + case None => + if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) + Seq(0L) + else + Nil + } + } + + def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + val segsArray = log.logSegments.toArray + var offsetTimeArray: Array[(Long, Long)] = null + if(segsArray.last.size > 0) + offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) + else + offsetTimeArray = new Array[(Long, Long)](segsArray.length) + + for(i <- 0 until segsArray.length) + offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified) + if(segsArray.last.size > 0) + offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds) + + var startIndex = -1 + timestamp match { + case OffsetRequest.LatestTime => + startIndex = offsetTimeArray.length - 1 + 
case OffsetRequest.EarliestTime =>
+        startIndex = 0
+      case _ =>
+        var isFound = false
+        debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString(", "))
+        startIndex = offsetTimeArray.length - 1
+        while (startIndex >= 0 && !isFound) {
+          if (offsetTimeArray(startIndex)._2 <= timestamp)
+            isFound = true
+          else
+            startIndex -= 1
+        }
+    }
+
+    val retSize = maxNumOffsets.min(startIndex + 1)
+    val ret = new Array[Long](retSize)
+    for(j <- 0 until retSize) {
+      ret(j) = offsetTimeArray(startIndex)._1
+      startIndex -= 1
+    }
+    // ensure that the returned seq is in descending order of offsets
+    ret.toSeq.sortBy(- _)
+  }
  /**
   * Service the topic metadata request API
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e0a86b9..27bd288 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
  var logManager: LogManager = null
  var kafkaZookeeper: KafkaZooKeeper = null
  var replicaManager: ReplicaManager = null
-  private var apis: KafkaApis = null
+  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(4)
  var zkClient: ZkClient = null
diff --git a/core/src/main/scala/kafka/server/LeaderElector.scala b/core/src/main/scala/kafka/server/LeaderElector.scala
index a0b79e4..14b3fa4 100644
--- a/core/src/main/scala/kafka/server/LeaderElector.scala
+++ b/core/src/main/scala/kafka/server/LeaderElector.scala
@@ -27,8 +27,6 @@ trait LeaderElector extends Logging {
  def amILeader : Boolean
-// def electAndBecomeLeader: Unit
-// def elect: Boolean
  def close
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 40afcab..8dc2b2a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -66,7 +66,7 @@ class ReplicaFetcherThread(name:String,
    )
    val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-    replica.log.get.truncateAndStartWithNewOffset(offset)
+    replica.log.get.truncateFullyAndStartAt(offset)
    offset
  }
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 36a119b..5a7a2df 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -54,7 +54,7 @@ import java.util.Properties;
 * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
 * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
 */
-
+@SuppressWarnings({"unchecked", "rawtypes"})
public class KafkaMigrationTool {
  private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
diff --git a/core/src/main/scala/kafka/utils/Range.scala b/core/src/main/scala/kafka/utils/Range.scala
deleted file mode 100644
index ca7d699..0000000
--- a/core/src/main/scala/kafka/utils/Range.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.
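To make the timestamp-to-offset selection in fetchOffsetsBefore above easier to follow, here is a small self-contained sketch of the same selection rule over made-up (baseOffset, lastModified) pairs. It is only an illustration, not the broker code path: the real method builds its array from the log's segments and, when the last segment is non-empty, also appends the current log end offset as a final entry.

    // Each pair is (segment base offset, segment last-modified time); the values are invented.
    val offsetTimeArray = Array((0L, 1000L), (5L, 2000L), (10L, 3000L), (15L, 4000L))

    def offsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
      // newest entry whose last-modified time is at or before the requested timestamp
      val startIndex = offsetTimeArray.lastIndexWhere(_._2 <= timestamp)
      if (startIndex < 0)
        Nil
      else // walk backwards so the result is in descending offset order, capped at maxNumOffsets
        (startIndex to math.max(0, startIndex - maxNumOffsets + 1) by -1).map(offsetTimeArray(_)._1)
    }

    // offsetsBefore(3500L, 2) == Seq(10L, 5L); offsetsBefore(500L, 2) == Nil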
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - - -/** - * A generic range value with a start and end - */ -trait Range { - /** The first index in the range */ - def start: Long - /** The total number of indexes in the range */ - def size: Long - /** Return true iff the range is empty */ - def isEmpty: Boolean = size == 0 - - /** if value is in range */ - def contains(value: Long): Boolean = { - if( (size == 0 && value == start) || - (size > 0 && value >= start && value <= start + size - 1) ) - return true - else - return false - } - - override def toString() = "(start=" + start + ", size=" + size + ")" -} diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala index 2bfa102..9e53b03 100644 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ b/core/src/main/scala/kafka/utils/Throttler.scala @@ -17,27 +17,24 @@ package kafka.utils; +import java.util.Random import scala.math._ -object Throttler extends Logging { - val DefaultCheckIntervalMs = 100L -} - /** * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for - * an appropraite amount of time when maybeThrottle() is called to attain the desired rate. + * an appropriate amount of time when maybeThrottle() is called to attain the desired rate. * * @param desiredRatePerSec: The rate we want to hit in units/sec * @param checkIntervalMs: The interval at which to check our rate * @param throttleDown: Does throttling increase or decrease our rate? 
* @param time: The time implementation to use */ -@nonthreadsafe +@threadsafe class Throttler(val desiredRatePerSec: Double, val checkIntervalMs: Long, val throttleDown: Boolean, - val time: Time) { + val time: Time) extends Logging { private val lock = new Object private var periodStartNs: Long = time.nanoseconds @@ -65,8 +62,7 @@ class Throttler(val desiredRatePerSec: Double, val elapsedMs = elapsedNs / Time.NsPerMs val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs) if(sleepTime > 0) { - Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + - ", sleeping for " + sleepTime + " ms to compensate.") + println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime)) time.sleep(sleepTime) } } @@ -77,3 +73,26 @@ class Throttler(val desiredRatePerSec: Double, } } + +object Throttler { + + val DefaultCheckIntervalMs = 100L + + def main(args: Array[String]) { + val rand = new Random() + val throttler = new Throttler(1000000, 100, true, SystemTime) + var start = System.currentTimeMillis + var total = 0 + while(true) { + val value = rand.nextInt(1000) + throttler.maybeThrottle(value) + total += value + val now = System.currentTimeMillis + if(now - start >= 1000) { + println(total) + start = now + total = 0 + } + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 17cabc7..7e17da4 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -401,8 +401,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size) servers.foreach(_.shutdown()) - - } private def checkIfReassignPartitionPathExists(): Boolean = { diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index d0044cf..cec1cae 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -35,13 +35,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases { set } + /** + * Test that the cached size variable matches the actual file size as we append messages + */ @Test def testFileSize() { assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - messageSet.append(singleMessageSet("abcd".getBytes())) - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + for(i <- 0 until 20) { + messageSet.append(singleMessageSet("abcd".getBytes)) + assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + } } + /** + * Test that adding invalid bytes to the end of the log doesn't break iteration + */ @Test def testIterationOverPartialAndTruncation() { testPartialWrite(0, messageSet) @@ -62,6 +70,9 @@ class FileMessageSetTest extends BaseMessageSetTestCases { checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) } + /** + * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel. + */ @Test def testIterationDoesntChangePosition() { val position = messageSet.channel.position @@ -69,39 +80,71 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(position, messageSet.channel.position) } + /** + * Test a simple append and read. 
+ */ @Test def testRead() { - val read = messageSet.read(0, messageSet.sizeInBytes) + var read = messageSet.read(0, messageSet.sizeInBytes) checkEquals(messageSet.iterator, read.iterator) val items = read.iterator.toList val sec = items.tail.head - val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes) - checkEquals(items.tail.iterator, read2.iterator) + read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes) + assertEquals("Try a read starting from the second message", items.tail, read.toList) + read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message)) + assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList) } + /** + * Test the MessageSet.searchFor API. + */ @Test def testSearch() { // append a new message with a high offset val lastMessage = new Message("test".getBytes) messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage)) - var physicalOffset = 0 + var position = 0 assertEquals("Should be able to find the first message by its offset", - OffsetPosition(0L, physicalOffset), + OffsetPosition(0L, position), messageSet.searchFor(0, 0)) - physicalOffset += MessageSet.entrySize(messageSet.head.message) + position += MessageSet.entrySize(messageSet.head.message) assertEquals("Should be able to find second message when starting from 0", - OffsetPosition(1L, physicalOffset), + OffsetPosition(1L, position), messageSet.searchFor(1, 0)) assertEquals("Should be able to find second message starting from its offset", - OffsetPosition(1L, physicalOffset), - messageSet.searchFor(1, physicalOffset)) - physicalOffset += MessageSet.entrySize(messageSet.tail.head.message) - assertEquals("Should be able to find third message from a non-existant offset", - OffsetPosition(50L, physicalOffset), - messageSet.searchFor(3, physicalOffset)) - assertEquals("Should be able to find third message by correct offset", - OffsetPosition(50L, physicalOffset), - messageSet.searchFor(50, physicalOffset)) + OffsetPosition(1L, position), + messageSet.searchFor(1, position)) + position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message) + assertEquals("Should be able to find fourth message from a non-existant offset", + OffsetPosition(50L, position), + messageSet.searchFor(3, position)) + assertEquals("Should be able to find fourth message by correct offset", + OffsetPosition(50L, position), + messageSet.searchFor(50, position)) + } + + /** + * Test that the message set iterator obeys start and end slicing + */ + @Test + def testIteratorWithLimits() { + val message = messageSet.toList(1) + val start = messageSet.searchFor(1, 0).position + val size = message.message.size + val slice = messageSet.read(start, size) + assertEquals(List(message), slice.toList) + } + + /** + * Test the truncateTo method lops off messages and appropriately updates the size + */ + @Test + def testTruncate() { + val message = messageSet.toList(0) + val end = messageSet.searchFor(1, 0).position + messageSet.truncateTo(end) + assertEquals(List(message), messageSet.toList) + assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b06d812..eeba961 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -59,6 +59,9 @@ class LogManagerTest extends JUnit3Suite { super.tearDown() } + /** + * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log. + */ @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) @@ -67,29 +70,34 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test that get on a non-existent returns None and no log is created. + */ @Test - def testGetLog() { + def testGetNonExistentLog() { val log = logManager.getLog(name, 0) + assertEquals("No log should be found.", None, log) val logFile = new File(config.logDirs(0), name + "-0") assertTrue(!logFile.exists) } + /** + * Test time-based log cleanup. First append messages, then set the time into the future and run cleanup. + */ @Test def testCleanupExpiredSegments() { val log = logManager.getOrCreateLog(name, 0) var offset = 0L for(i <- 0 until 1000) { var set = TestUtils.singleMessageSet("test".getBytes()) - val (start, end) = log.append(set) - offset = end + val info = log.append(set) + offset = info.lastOffset } - log.flush assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) // update the last modified time of all log segments - val logSegments = log.segments.view - logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs)) + log.logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs)) time.currentMs += maxLogAgeHours*60*60*1000 + 1 logManager.cleanupLogs() @@ -106,6 +114,9 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted. + */ @Test def testCleanupSegmentsToMaintainSize() { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes @@ -130,11 +141,9 @@ class LogManagerTest extends JUnit3Suite { // add a bunch of messages that should be larger than the retentionSize for(i <- 0 until 1000) { val set = TestUtils.singleMessageSet("test".getBytes()) - val (start, end) = log.append(set) - offset = start + val info = log.append(set) + offset = info.firstOffset } - // flush to make sure it's written to disk - log.flush // should be exactly 100 full segments + 1 new empty one assertEquals("There should be example 100 segments.", 100, log.numberOfSegments) @@ -153,15 +162,16 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test that flush is invoked by the background scheduler thread. 
+ */ @Test def testTimeBasedFlush() { val props = TestUtils.createBrokerConfig(0, -1) logManager.shutdown() config = new KafkaConfig(props) { - override val logFileSize = 1024 *1024 *1024 override val flushSchedulerThreadRate = 50 override val flushInterval = Int.MaxValue - override val logRollHours = maxRollInterval override val flushIntervalMap = Map("timebasedflush" -> 100) } logManager = new LogManager(config, scheduler, time) @@ -171,11 +181,14 @@ class LogManagerTest extends JUnit3Suite { var set = TestUtils.singleMessageSet("test".getBytes()) log.append(set) } - val ellapsed = System.currentTimeMillis - log.getLastFlushedTime + val ellapsed = System.currentTimeMillis - log.lastFlushTime assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed), ellapsed < 2*config.flushSchedulerThreadRate) } + /** + * Test that new logs that are created are assigned to the least loaded log directory + */ @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories @@ -196,6 +209,9 @@ class LogManagerTest extends JUnit3Suite { } } + /** + * Test that it is not possible to open two log managers using the same data directory + */ def testTwoLogManagersUsingSameDirFails() { try { new LogManager(logManager.config, scheduler, time) diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala deleted file mode 100644 index f3a272e..0000000 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log - -import java.io.File -import kafka.utils._ -import kafka.server.{KafkaConfig, KafkaServer} -import junit.framework.Assert._ -import java.util.{Random, Properties} -import kafka.consumer.SimpleConsumer -import org.junit.{After, Before, Test} -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} -import kafka.zk.ZooKeeperTestHarness -import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand -import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.utils.TestUtils._ -import kafka.common.{ErrorMapping, TopicAndPartition} - -object LogOffsetTest { - val random = new Random() -} - -class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { - var logDir: File = null - var topicLogDir: File = null - var server: KafkaServer = null - var logSize: Int = 100 - val brokerPort: Int = 9099 - var simpleConsumer: SimpleConsumer = null - var time: Time = new MockTime() - - @Before - override def setUp() { - super.setUp() - val config: Properties = createBrokerConfig(1, brokerPort) - val logDirPath = config.getProperty("log.dir") - logDir = new File(logDirPath) - time = new MockTime() - server = TestUtils.createServer(new KafkaConfig(config), time) - simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024) - } - - @After - override def tearDown() { - simpleConsumer.close - server.shutdown - Utils.rm(logDir) - super.tearDown() - } - - @Test - def testGetOffsetsForUnknownTopic() { - val topicAndPartition = TopicAndPartition("foo", 0) - val request = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10))) - val offsetResponse = simpleConsumer.getOffsetsBefore(request) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, - offsetResponse.partitionErrorAndOffsets(topicAndPartition).error) - } - - @Test - def testGetOffsetsBeforeLatestTime() { - val topicPartition = "kafka-" + 0 - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue - - // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") - - val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) - - val message = new Message(Integer.toString(42).getBytes()) - for(i <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) - log.flush() - - val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10) - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets) - - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)), - replicaId = 0) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets) - - // try to fetch using latest offset - val fetchResponse = simpleConsumer.fetch( - new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build()) - assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext) - } - - @Test - def testEmptyLogsGetOffsets() { - val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10) - val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition - topicLogDir = new File(topicPartitionPath) - topicLogDir.mkdir 
- - val topic = topicPartition.split("-").head - - // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) - - var offsetChanged = false - for(i <- 1 to 14) { - val topicAndPartition = TopicAndPartition(topic, 0) - val offsetRequest = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - - if(consumerOffsets(0) == 1) { - offsetChanged = true - } - } - assertFalse(offsetChanged) - } - - @Test - def testGetOffsetsBeforeNow() { - val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(3) - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue - - // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") - - val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) - val message = new Message(Integer.toString(42).getBytes()) - for(i <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) - log.flush() - - time.sleep(20) - val now = time.milliseconds - - val offsets = log.getOffsetsBefore(now, 10) - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets) - - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets) - } - - @Test - def testGetOffsetsBeforeEarliestTime() { - val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(3) - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue - - // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") - - val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) - val message = new Message(Integer.toString(42).getBytes()) - for(i <- 0 until 20) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) - log.flush() - - val offsets = log.getOffsetsBefore(OffsetRequest.EarliestTime, 10) - - assertEquals(Seq(0L), offsets) - - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10))) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(0L), consumerOffsets) - } - - private def createBrokerConfig(nodeId: Int, port: Int): Properties = { - val props = new Properties - props.put("brokerid", nodeId.toString) - props.put("port", port.toString) - props.put("log.dir", getLogDir.getAbsolutePath) - props.put("log.flush.interval", "1") - props.put("enable.zookeeper", "false") - props.put("num.partitions", "20") - props.put("log.retention.hours", "10") - props.put("log.cleanup.interval.mins", "5") - props.put("log.file.size", logSize.toString) - 
props.put("zk.connect", zkConnect.toString) - props - } - - private def getLogDir(): File = { - val dir = TestUtils.tempDir() - dir - } - -} diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 86a30f3..3694408 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -2,6 +2,9 @@ package kafka.log import junit.framework.Assert._ import java.util.concurrent.atomic._ +import java.io.File +import java.io.RandomAccessFile +import java.util.Random import org.junit.{Test, After} import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils @@ -13,6 +16,7 @@ class LogSegmentTest extends JUnit3Suite { val segments = mutable.ArrayBuffer[LogSegment]() + /* create a segment with the given base offset */ def createSegment(offset: Long): LogSegment = { val msFile = TestUtils.tempFile() val ms = new FileMessageSet(msFile) @@ -24,6 +28,7 @@ class LogSegmentTest extends JUnit3Suite { seg } + /* create a ByteBufferMessageSet for the given messages starting from the given offset */ def messages(offset: Long, messages: String*): ByteBufferMessageSet = { new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, offsetCounter = new AtomicLong(offset), @@ -38,13 +43,20 @@ class LogSegmentTest extends JUnit3Suite { } } + /** + * A read on an empty log segment should return null + */ @Test def testReadOnEmptySegment() { val seg = createSegment(40) val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None) - assertEquals(0, read.size) + assertNull("Read beyond the last offset in the segment should be null", read) } + /** + * Reading from before the first offset in the segment should return messages + * beginning with the first message in the segment + */ @Test def testReadBeforeFirstOffset() { val seg = createSegment(40) @@ -54,24 +66,40 @@ class LogSegmentTest extends JUnit3Suite { assertEquals(ms.toList, read.toList) } + /** + * If we set the startOffset and maxOffset for the read to be the same value + * we should get only the first message in the log + */ @Test - def testReadSingleMessage() { - val seg = createSegment(40) - val ms = messages(50, "hello", "there") - seg.append(50, ms) - val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50)) - assertEquals(new Message("hello".getBytes), read.head.message) + def testMaxOffset() { + val baseOffset = 50 + val seg = createSegment(baseOffset) + val ms = messages(baseOffset, "hello", "there", "beautiful") + seg.append(baseOffset, ms) + def validate(offset: Long) = + assertEquals(ms.filter(_.offset == offset).toList, + seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList) + validate(50) + validate(51) + validate(52) } + /** + * If we read from an offset beyond the last offset in the segment we should get null + */ @Test def testReadAfterLast() { val seg = createSegment(40) val ms = messages(50, "hello", "there") seg.append(50, ms) val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) - assertEquals(0, read.size) + assertNull("Read beyond the last offset in the segment should give null", null) } + /** + * If we read from an offset which doesn't exist we should get a message set beginning + * with the least offset greater than the given startOffset. 
+ */ @Test def testReadFromGap() { val seg = createSegment(40) @@ -83,6 +111,10 @@ class LogSegmentTest extends JUnit3Suite { assertEquals(ms2.toList, read.toList) } + /** + * In a loop append two messages then truncate off the second of those messages and check that we can read + * the first but not the second message. + */ @Test def testTruncate() { val seg = createSegment(40) @@ -104,15 +136,22 @@ class LogSegmentTest extends JUnit3Suite { } } + /** + * Test truncating the whole segment, and check that we can reappend with the original offset. + */ @Test def testTruncateFull() { // test the case where we fully truncate the log val seg = createSegment(40) seg.append(40, messages(40, "hello", "there")) seg.truncateTo(0) + assertNull("Segment should be empty.", seg.read(0, 1024, None)) seg.append(40, messages(40, "hello", "there")) } + /** + * Test that offsets are assigned sequentially and that the nextOffset variable is incremented + */ @Test def testNextOffsetCalculation() { val seg = createSegment(40) @@ -121,4 +160,49 @@ class LogSegmentTest extends JUnit3Suite { assertEquals(53, seg.nextOffset()) } + /** + * Create a segment with some data and an index. Then corrupt the index, + * and recover the segment, the entries should all be readable. + */ + @Test + def testRecoveryFixesCorruptIndex() { + val seg = createSegment(0) + for(i <- 0 until 100) + seg.append(i, messages(i, i.toString)) + val indexFile = seg.index.file + writeNonsense(indexFile, 0, indexFile.length.toInt) + for(i <- 0 until 100) + assertEquals(i, seg.read(i, 1024, Some(i+1)).head.offset) + } + + /** + * Randomly corrupt a log a number of times and attempt recovery. + */ + @Test + def testRecoveryWithCorruptMessage() { + val rand = new Random(1) + val messagesAppended = 20 + for(iteration <- 0 until 10) { + val seg = createSegment(0) + for(i <- 0 until messagesAppended) + seg.append(i, messages(i, i.toString)) + val offsetToBeginCorruption = rand.nextInt(messagesAppended) + // start corrupting somewhere in the middle of the chosen record all the way to the end + val position = seg.messageSet.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) + writeNonsense(seg.messageSet.file, position, seg.messageSet.file.length.toInt - position) + seg.recover(64*1024) + assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.messageSet.map(_.offset).toList) + seg.delete() + } + } + + def writeNonsense(fileName: File, position: Long, size: Int) { + val file = new RandomAccessFile(fileName, "rw") + file.seek(position) + val rand = new Random + for(i <- 0 until size) + file.writeByte(rand.nextInt(255)) + file.close() + } + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index afaa284..34d68eb 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -54,7 +54,10 @@ class LogTest extends JUnitSuite { } } - /** Test that the size and time based log segment rollout works. */ + /** + * Tests for time based log roll. This test appends messages then changes the time + * using the mock clock to force the log to roll and checks the number of segments. 
+ */ @Test def testTimeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -70,27 +73,26 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) log.append(set) - assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) - - // segment expires in age - time.currentMs += rollMs + 1 - log.append(set) - assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments) - time.currentMs += rollMs + 1 - val blank = Array[Message]() - log.append(new ByteBufferMessageSet(new Message("blah".getBytes))) - assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments) + for(numSegments <- 2 until 4) { + time.currentMs += rollMs + 1 + log.append(set) + assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) + } + val numSegments = log.numberOfSegments time.currentMs += rollMs + 1 - // the last segment expired in age, but was blank. So new segment should not be generated log.append(new ByteBufferMessageSet()) - assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments) + assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments) } + /** + * Test that appending more than the maximum segment size rolls the log + */ @Test def testSizeBasedLogRoll() { - val set = TestUtils.singleMessageSet("test".getBytes()) + val set = TestUtils.singleMessageSet("test".getBytes) val setSize = set.sizeInBytes val msgPerSeg = 10 val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages @@ -106,28 +108,80 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) } + /** + * Test that we can open and append to an empty log + */ @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + log.append(TestUtils.singleMessageSet("test".getBytes)) } + /** + * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets. 
+ */ @Test - def testAppendAndRead() { - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - val message = new Message(Integer.toString(42).getBytes()) - for(i <- 0 until 10) - log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) - log.flush() - val messages = log.read(0, 1024) - var current = 0 - for(curr <- messages) { - assertEquals("Read message should equal written", message, curr.message) - current += 1 + def testAppendAndReadWithSequentialOffsets() { + val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray + + for(i <- 0 until messages.length) + log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i))) + for(i <- 0 until messages.length) { + val read = log.read(i, 100, Some(i+1)).head + assertEquals("Offset read should match order appended.", i, read.offset) + assertEquals("Message should match appended.", messages(i), read.message) } - assertEquals(10, current) + assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size) } - + + /** + * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message + * from any offset less than the logEndOffset including offsets not appended. + */ + @Test + def testAppendAndReadWithNonSequentialOffsets() { + val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray + val messages = messageIds.map(id => new Message(id.toString.getBytes)) + + // now test the case that we give the offsets and use non-sequential offsets + for(i <- 0 until messages.length) + log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false) + for(i <- 50 until messageIds.max) { + val idx = messageIds.indexWhere(_ >= i) + val read = log.read(i, 100, None).head + assertEquals("Offset read should match message id.", messageIds(idx), read.offset) + assertEquals("Message should match appended.", messages(idx), read.message) + } + } + + /** + * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment. + * Specifically we create a log where the last message in the first segment has offset 0. If we + * then read offset 1, we should expect this read to come from the second segment, even though the + * first segment has the greatest lower bound on the offset. 
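As a pocket illustration of the lookup behaviour that testAppendAndReadWithNonSequentialOffsets above relies on, a read at an offset that was never written starts from the message with the smallest existing offset greater than or equal to the requested one. The ids below are invented and mirror the test's messageIds array in miniature.

    // Sparse, invented offsets standing in for messageIds in the test above.
    val messageIds = Array(0L, 1L, 2L, 50L, 57L, 64L)

    // Offset a read would land on: the least existing offset >= the requested one.
    def offsetReadAt(requested: Long): Long = messageIds(messageIds.indexWhere(_ >= requested))

    assert(offsetReadAt(3L) == 50L)   // offset 3 was never written, so the read starts at 50
    assert(offsetReadAt(57L) == 57L)  // existing offsets are returned unchanged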
+ */ + @Test + def testReadAtLogGap() { + val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + + // keep appending until we have two segments with only a single message in the second segment + while(log.numberOfSegments == 1) + log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) + + // now manually truncate off all but one message from the first segment to create a gap in the messages + log.logSegments.head.truncateTo(1) + + assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset) + } + + /** + * Test reading at the boundary of the log, specifically + * - reading from the logEndOffset should give an empty message set + * - reading beyond the log end offset should throw an OffsetOutOfRangeException + */ @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) @@ -147,14 +201,17 @@ class LogTest extends JUnitSuite { } } - /** Test that writing and reading beyond the log size boundary works */ + /** + * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages + * and then reads them all back and checks that the message read and offset matches what was appended. + */ @Test def testLogRolls() { /* create a multipart log with 100 messages */ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) - val offsets = messageSets.map(log.append(_)._1) + val offsets = messageSets.map(log.append(_).firstOffset) log.flush /* do successive reads to ensure all our messages are there */ @@ -169,7 +226,9 @@ class LogTest extends JUnitSuite { assertEquals("Should be no more messages", 0, lastRead.size) } - /** Test the case where we have compressed batches of messages */ + /** + * Test reads at offsets that fall within compressed message set boundaries. 
+ */ @Test def testCompressedMessages() { /* this log should roll after every messageset */ @@ -187,66 +246,46 @@ class LogTest extends JUnitSuite { assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset) assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset) } - - @Test - def testFindSegment() { - assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45)) - assertEquals("Search in segment list just outside the range of the last segment should find last segment", - 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start) - assertEquals("Search in segment list far outside the range of the last segment should find last segment", - 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start) - assertEquals("Search in segment list far outside the range of the last segment should find last segment", - None, Log.findRange(makeRanges(5, 9, 12), -1)) - assertContains(makeRanges(5, 9, 12), 11) - assertContains(makeRanges(5), 4) - assertContains(makeRanges(5,8), 5) - assertContains(makeRanges(5,8), 6) - } + /** + * Test garbage collecting old segments + */ @Test - def testEdgeLogRollsStartingAtZero() { - // first test a log segment starting at 0 - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - val curOffset = log.logEndOffset - assertEquals(curOffset, 0) - - // time goes by; the log file is deleted - log.markDeletedWhile(_ => true) - - // we now have a new log; the starting offset of the new log should remain 0 - assertEquals(curOffset, log.logEndOffset) - log.delete() - } - - @Test - def testEdgeLogRollsStartingAtNonZero() { - // second test an empty log segment starting at non-zero - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - val numMessages = 1 - for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(i.toString.getBytes)) - val curOffset = log.logEndOffset - - // time goes by; the log file is deleted - log.markDeletedWhile(_ => true) - - // we now have a new log - assertEquals(curOffset, log.logEndOffset) - - // time goes by; the log file (which is empty) is deleted again - val deletedSegments = log.markDeletedWhile(_ => true) - - // we shouldn't delete the last empty log segment. 
- assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0) - - // we now have a new log - assertEquals(curOffset, log.logEndOffset) + def testThatGarbageCollectingSegmentsDoesntChangeOffset() { + for(messagesToAppend <- List(0, 1, 25)) { + logDir.mkdirs() + // first test a log segment starting at 0 + val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + for(i <- 0 until messagesToAppend) + log.append(TestUtils.singleMessageSet(i.toString.getBytes)) + + var currOffset = log.logEndOffset + assertEquals(currOffset, messagesToAppend) + + // time goes by; the log file is deleted + log.deleteOldSegments(_ => true) + + assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset) + assertEquals("We should still have one segment left", 1, log.numberOfSegments) + assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true)) + assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) + assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", + currOffset, + log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset) + + // cleanup the log + log.delete() + } } + /** + * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the + * setting and checking that an exception is thrown. + */ @Test def testMessageSizeCheck() { - val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes())) - val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes())) + val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) + val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes)) // append messages to log val maxMessageSize = second.sizeInBytes - 1 @@ -259,10 +298,13 @@ class LogTest extends JUnitSuite { log.append(second) fail("Second message set should throw MessageSizeTooLargeException.") } catch { - case e:MessageSizeTooLargeException => // this is good + case e: MessageSizeTooLargeException => // this is good } } + /** + * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. 
+ */ @Test def testLogRecoversToCorrectOffset() { val numMessages = 100 @@ -273,25 +315,53 @@ class LogTest extends JUnitSuite { for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) - val lastIndexOffset = log.segments.view.last.index.lastOffset - val numIndexEntries = log.segments.view.last.index.entries + val lastIndexOffset = log.activeSegment.index.lastOffset + val numIndexEntries = log.activeSegment.index.entries log.close() // test non-recovery case log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) + assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) + assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) log.close() - // test + // test recovery case log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) + assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) + assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) + log.close() + } + + /** + * Test that if we manually delete an index segment it is rebuilt when the log is re-opened + */ + @Test + def testIndexRebuild() { + // publish the messages and close the log + val numMessages = 200 + var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + for(i <- 0 until numMessages) + log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) + val indexFiles = log.logSegments.map(_.index.file) + log.close() + + // delete all the index files + indexFiles.foreach(_.delete()) + + // reopen the log + log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + + assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) + for(i <- 0 until numMessages) + assertEquals(i, log.read(i, 100, None).head.offset) log.close() } + /** + * Test the Log truncate operations + */ @Test def testTruncateTo() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -329,7 +399,7 @@ class LogTest extends JUnitSuite { assertEquals("Should be back to original offset", 
log.logEndOffset, lastOffset) assertEquals("Should be back to original size", log.size, size) - log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)) + log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1)) assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)) assertEquals("Should change log size", log.size, 0) @@ -343,6 +413,9 @@ class LogTest extends JUnitSuite { assertEquals("Should change log size", log.size, 0) } + /** + * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends + */ @Test def testIndexResizingAtTruncation() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -357,41 +430,18 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) - assertEquals("The index of the first segment should be trim to empty", 0, log.segments.view(0).index.maxEntries) + assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries) log.truncateTo(0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) - assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.segments.view(0).index.maxEntries) + assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries) for (i<- 1 to msgPerSeg) log.append(set) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) } - - @Test - def testAppendWithoutOffsetAssignment() { - for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) { - logDir.mkdir() - var log = new Log(logDir, - maxLogFileSize = 64*1024, - maxMessageSize = config.maxMessageSize, - maxIndexSize = 1000, - indexIntervalBytes = 10000, - needsRecovery = true) - val messages = List("one", "two", "three", "four", "five", "six") - val ms = new ByteBufferMessageSet(compressionCodec = codec, - offsetCounter = new AtomicLong(5), - messages = messages.map(s => new Message(s.getBytes)):_*) - val firstOffset = ms.shallowIterator.toList.head.offset - val lastOffset = ms.shallowIterator.toList.last.offset - val (first, last) = log.append(ms, assignOffsets = false) - assertEquals(last + 1, log.logEndOffset) - assertEquals(firstOffset, first) - assertEquals(lastOffset, last) - assertTrue(log.read(5, 64*1024).size > 0) - log.delete() - } - } - + /** + * Verify that truncation works correctly after re-opening the log + */ @Test def testReopenThenTruncate() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -419,24 +469,4 @@ class LogTest extends JUnitSuite { assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } - def assertContains(ranges: Array[Range], offset: Long) = { - Log.findRange(ranges, offset) match { - case Some(range) => - assertTrue(range + " does not contain " + offset, range.contains(offset)) - case None => fail("No range found, but expected to find " + offset) - } - } - - class SimpleRange(val start: Long, val size: Long) extends Range - - def makeRanges(breaks: Int*): Array[Range] = { - val list = new ArrayList[Range] - var prior = 0 - for(brk <- breaks) { - list.add(new SimpleRange(prior, brk - prior)) - prior = brk - } - list.toArray(new Array[Range](list.size)) - } - } diff --git a/core/src/test/scala/unit/kafka/log/SegmentListTest.scala b/core/src/test/scala/unit/kafka/log/SegmentListTest.scala deleted file mode 100644 index 
bf91ff1..0000000 --- a/core/src/test/scala/unit/kafka/log/SegmentListTest.scala +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import junit.framework.Assert._ -import org.junit.Test -import org.scalatest.junit.JUnitSuite -import kafka.common.KafkaException - -class SegmentListTest extends JUnitSuite { - - @Test - def testAppend() { - val list = List(1, 2, 3, 4) - val sl = new SegmentList(list) - val view = sl.view - assertEquals(list, view.iterator.toList) - sl.append(5) - assertEquals("Appending to both should result in lists that are still equals", - list ::: List(5), sl.view.iterator.toList) - assertEquals("But the prior view should still equal the original list", list, view.iterator.toList) - } - - @Test - def testTrunc() { - { - val hd = List(1,2,3) - val tail = List(4,5,6) - val sl = new SegmentList(hd ::: tail) - val view = sl.view - assertEquals(hd ::: tail, view.iterator.toList) - val deleted = sl.trunc(3) - assertEquals(tail, sl.view.iterator.toList) - assertEquals(hd, deleted.iterator.toList) - assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) - } - { - val hd = List(1,2,3,4,5) - val tail = List(6) - val sl = new SegmentList(hd ::: tail) - val view = sl.view - assertEquals(hd ::: tail, view.iterator.toList) - try { - sl.trunc(-1) - fail("Attempt to truncate with illegal index should fail") - } catch { - case e: KafkaException => // this is ok - } - val deleted = sl.truncLast(4) - assertEquals(hd, sl.view.iterator.toList) - assertEquals(tail, deleted.iterator.toList) - assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) - } - { - val sl = new SegmentList(List(1, 2)) - sl.trunc(3) - assertEquals(0, sl.view.length) - } - } - - @Test - def testTruncLast() { - { - val hd = List(1,2,3) - val tail = List(4,5,6) - val sl = new SegmentList(hd ::: tail) - val view = sl.view - assertEquals(hd ::: tail, view.iterator.toList) - val deleted = sl.truncLast(2) - assertEquals(hd, sl.view.iterator.toList) - assertEquals(tail, deleted.iterator.toList) - assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) - } - { - val hd = List(1,2,3,4,5) - val tail = List(6) - val sl = new SegmentList(hd ::: tail) - val view = sl.view - assertEquals(hd ::: tail, view.iterator.toList) - try { - sl.truncLast(6) - fail("Attempt to truncate with illegal index should fail") - } catch { - case e: KafkaException => // this is ok - } - try { - sl.truncLast(-1) - fail("Attempt to truncate with illegal index should fail") - } catch { - case e: KafkaException => // this is ok - } - val deleted = sl.truncLast(4) - assertEquals(hd, sl.view.iterator.toList) - assertEquals(tail, deleted.iterator.toList) - assertEquals("View should 
remain consistent", hd ::: tail, view.iterator.toList) - } - } -} diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index ef74ba8..6db245c 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -26,7 +26,7 @@ import org.junit.Test trait BaseMessageSetTestCases extends JUnitSuite { - val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes())) + val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes)) def createMessageSet(messages: Seq[Message]): MessageSet diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 19f4c3b..7d77eb6 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("producer.num.retries", 3.toString) val config = new ProducerConfig(props) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala new file mode 100644 index 0000000..aa58dce --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
new file mode 100644
index 0000000..aa58dce
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils.nonthreadsafe
+import kafka.utils.threadsafe
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val random = new Random()
+  var logDir: File = null
+  var topicLogDir: File = null
+  var server: KafkaServer = null
+  var logSize: Int = 100
+  val brokerPort: Int = 9099
+  var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val config: Properties = createBrokerConfig(1, brokerPort)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+  }
+
+  @After
+  override def tearDown() {
+    simpleConsumer.close
+    server.shutdown
+    Utils.rm(logDir)
+    super.tearDown()
+  }
+
+  @Test
+  def testGetOffsetsForUnknownTopic() {
+    val topicAndPartition = TopicAndPartition("foo", 0)
+    val request = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+  }
+
+  @Test
+  def testGetOffsetsBeforeLatestTime() {
+    val topicPartition = "kafka-" + 0
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+
+    // try to fetch using latest offset
+    val fetchResponse = simpleConsumer.fetch(
+      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+  }
+
+  @Test
+  def testEmptyLogsGetOffsets() {
+    val topicPartition = "kafka-" + random.nextInt(10)
+    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+    topicLogDir = new File(topicPartitionPath)
+    topicLogDir.mkdir
+
+    val topic = topicPartition.split("-").head
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+
+    var offsetChanged = false
+    for(i <- 1 to 14) {
+      val topicAndPartition = TopicAndPartition(topic, 0)
+      val offsetRequest =
+        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+      val consumerOffsets =
+        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+
+      if(consumerOffsets(0) == 1) {
+        offsetChanged = true
+      }
+    }
+    assertFalse(offsetChanged)
+  }
+
+  @Test
+  def testGetOffsetsBeforeNow() {
+    val topicPartition = "kafka-" + random.nextInt(3)
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
+
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+  }
+
+  @Test
+  def testGetOffsetsBeforeEarliestTime() {
+    val topicPartition = "kafka-" + random.nextInt(3)
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
+
+    assertEquals(Seq(0L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest =
+      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(0L), consumerOffsets)
+  }
+
+  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+    val props = new Properties
+    props.put("brokerid", nodeId.toString)
+    props.put("port", port.toString)
+    props.put("log.dir", getLogDir.getAbsolutePath)
+    props.put("log.flush.interval", "1")
+    props.put("enable.zookeeper", "false")
+    props.put("num.partitions", "20")
+    props.put("log.retention.hours", "10")
+    props.put("log.cleanup.interval.mins", "5")
+    props.put("log.file.size", logSize.toString)
+    props.put("zk.connect", zkConnect.toString)
+    props
+  }
+
+  private def getLogDir(): File = {
+    val dir = TestUtils.tempDir()
+    dir
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 3aae5ce..95ecacc 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -40,7 +40,7 @@ class SimpleFetchTest extends JUnit3Suite {
   val partitionId = 0
 
   /**
-   * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has
+   * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has
    * one partition with one follower replica on broker "1". The leader replica on "0"
    * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica
    * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
@@ -62,6 +62,7 @@ class SimpleFetchTest extends JUnit3Suite {
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log)
     EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
     EasyMock.replay(log)
 
@@ -102,33 +103,6 @@ class SimpleFetchTest extends JUnit3Suite {
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
-
-    // Test offset request from non-replica
-    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
-    EasyMock.reset(logManager)
-    EasyMock.reset(replicaManager)
-
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
-    EasyMock.replay(replicaManager)
-    EasyMock.replay(logManager)
-
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
-    val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
-    EasyMock.verify(replicaManager)
-    EasyMock.verify(logManager)
-    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
-    assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   /**
@@ -203,34 +177,6 @@ class SimpleFetchTest extends JUnit3Suite {
      * an offset of 15
      */
     EasyMock.verify(log)
-
-    // Test offset request from replica
-    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
-      replicaId = followerReplicaId)
-    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
-    EasyMock.reset(logManager)
-    EasyMock.reset(replicaManager)
-
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
-    EasyMock.replay(replicaManager)
-    EasyMock.replay(logManager)
-
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
-    val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
-    EasyMock.verify(replicaManager)
-    EasyMock.verify(logManager)
-    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
-    assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 6763357..d883bde 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -33,7 +33,8 @@ class EmbeddedZookeeper(val connectString: String) {
   factory.startup(zookeeper)
 
   def shutdown() {
-    factory.shutdown()
+    Utils.swallow(zookeeper.shutdown())
+    Utils.swallow(factory.shutdown())
     Utils.rm(logDir)
     Utils.rm(snapshotDir)
   }
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 63e528f..4e25b92 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils}
+import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -36,8 +36,8 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
 
   override def tearDown() {
     super.tearDown
-    zkClient.close()
-    zookeeper.shutdown()
+    Utils.swallow(zkClient.close())
+    Utils.swallow(zookeeper.shutdown())
   }
 
 }
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 2b87560..a1d1ca2 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -25,7 +25,6 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
 
 public class Consumer extends Thread