diff --git a/config/log4j.properties b/config/log4j.properties index 3b13181..140dccd 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -30,4 +30,5 @@ log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG log4j.logger.kafka.perf=DEBUG -log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +#log4j.logger.kafka.log=DEBUG diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index c47e2af..0546e1e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -60,7 +60,7 @@ case class ProducerRequest(versionId: Short, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { + data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) diff --git a/core/src/main/scala/kafka/common/InvalidTopicException.scala b/core/src/main/scala/kafka/common/InvalidTopicException.scala index 4907080..59f8874 100644 --- a/core/src/main/scala/kafka/common/InvalidTopicException.scala +++ b/core/src/main/scala/kafka/common/InvalidTopicException.scala @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package kafka.common class InvalidTopicException(message: String) extends RuntimeException(message) { diff --git a/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala b/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala index edb90c1..2d18324 100644 --- a/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala +++ b/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package kafka.common class MessageSizeTooLargeException(message: String) extends RuntimeException(message) { diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index f13f449..f48cb1b 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -26,16 +26,20 @@ class ConsumerFetcherThread(name: String, val config: ConsumerConfig, sourceBroker: Broker, val consumerFetcherManager: ConsumerFetcherManager) - extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs, - socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize, - fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs, - minBytes = config.minFetchBytes) { + extends AbstractFetcherThread(name = name, + sourceBroker = sourceBroker, + socketTimeout = config.socketTimeoutMs, + socketBufferSize = config.socketBufferSize, + fetchSize = config.fetchSize, + fetcherBrokerId = FetchRequest.NonFollowerId, + maxWait = config.maxFetchWaitMs, + minBytes = config.minFetchBytes) { // process fetched data def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) { val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition)) if (pti.getFetchOffset != fetchOffset) - throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch ofset: %d" + throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d" .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset)) pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) } diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 14f18aa..010f1c5 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -85,6 +85,8 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], } val item = localCurrent.next() consumedOffset = item.nextOffset + + item.message.ensureValid() // validate checksum of message to ensure it is valid new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic) } diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 3281ca9..cdbd2ea 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -53,10 +53,10 @@ private[consumer] class PartitionTopicInfo(val topic: String, def enqueue(messages: ByteBufferMessageSet) { val size = messages.validBytes if(size > 0) { - // update fetched offset to the compressed data chunk size, not the decompressed message set size + val next = messages.last.nextOffset // this is a bit inefficient trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size) chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) - val newOffset = fetchedOffset.addAndGet(size) - debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) + fetchedOffset.set(next) + debug("updated fetch offset of ( %s ) to %d".format(this, next))
ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size) ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 1da614a..63e2cd6 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -28,10 +28,10 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} * A consumer of kafka messages */ @threadsafe -class SimpleConsumer( val host: String, - val port: Int, - val soTimeout: Int, - val bufferSize: Int ) extends Logging { +class SimpleConsumer(val host: String, + val port: Int, + val soTimeout: Int, + val bufferSize: Int) extends Logging { private val lock = new Object() private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index 27fc2c8..4d0d751 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -16,6 +16,7 @@ */ package kafka.javaapi.message +import java.util.concurrent.atomic.AtomicLong import scala.reflect.BeanProperty import java.nio.ByteBuffer import kafka.message._ @@ -24,7 +25,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(new kafka.message.ByteBufferMessageSet(compressionCodec, 0L, scala.collection.JavaConversions.asBuffer(messages): _*).buffer) + this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer) } def this(messages: java.util.List[Message]) { diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala new file mode 100644 index 0000000..6caf56c --- /dev/null +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io._ +import java.nio._ +import java.nio.channels._ +import java.util.concurrent.atomic._ + +import kafka.utils._ +import kafka.message._ +import kafka.common.KafkaException +import java.util.concurrent.TimeUnit +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} + +/** + * An on-disk message set. The set can be opened either mutably or immutably. 
Mutation attempts + * will fail on an immutable message set. An optional limit and start position can be applied to the message set + * which will control the position in the file at which the set begins + */ +@nonthreadsafe +class FileMessageSet private[kafka](val file: File, + private[log] val channel: FileChannel, + private[log] val start: Long, // the starting position in the file + private[log] val limit: Long, // the length (may be less than the file length) + val mutable: Boolean) extends MessageSet with Logging { + + private val setSize = new AtomicLong() + + if(mutable) { + if(limit < Long.MaxValue || start > 0) + throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.") + + setSize.set(channel.size()) + channel.position(channel.size) + } else { + setSize.set(scala.math.min(channel.size(), limit) - start) + } + + /** + * Create a file message set with no limit or offset + */ + def this(file: File, channel: FileChannel, mutable: Boolean) = + this(file, channel, 0, Long.MaxValue, mutable) + + /** + * Create a file message set with no limit or offset + */ + def this(file: File, mutable: Boolean) = + this(file, Utils.openChannel(file, mutable), mutable) + + /** + * Return a message set which is a view into this set starting from the given position and with the given size limit. + */ + def read(position: Long, size: Long): FileMessageSet = { + new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()), + false) + } + + /** + * Search forward for the file position of the first offset that is greater than or equal to the target offset + * and return its physical position. If no such offsets are found, return null. + */ + private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { + var position = startingPosition + val buffer = ByteBuffer.allocate(12) + val size = setSize.get() + while(position + 8 < size) { + buffer.rewind() + channel.read(buffer, position) + if(buffer.hasRemaining) + throw new IllegalStateException("Failed to read complete buffer.") + buffer.rewind() + val offset = buffer.getLong() + if(offset >= targetOffset) + return OffsetPosition(offset, position) + val messageSize = buffer.getInt() + position += MessageSet.LogOverhead + messageSize + } + null + } + + /** + * Write some of this set to the given channel, return the amount written + */ + def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Long): Long = + channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel) + + /** + * Get an iterator over the messages in the set + */ + override def iterator: Iterator[MessageAndOffset] = { + new IteratorTemplate[MessageAndOffset] { + var location = start + + override def makeNext(): MessageAndOffset = { + // read the size of the item + val sizeOffsetBuffer = ByteBuffer.allocate(12) + channel.read(sizeOffsetBuffer, location) + if(sizeOffsetBuffer.hasRemaining) + return allDone() + + sizeOffsetBuffer.rewind() + val offset = sizeOffsetBuffer.getLong() + val size = sizeOffsetBuffer.getInt() + if (size < Message.MinHeaderSize) + return allDone() + + // read the item itself + val buffer = ByteBuffer.allocate(size) + channel.read(buffer, location + 12) + if(buffer.hasRemaining) + return allDone() + buffer.rewind() + + // increment the location and return the item + location += size + 12 + new MessageAndOffset(new Message(buffer), offset) + } + } + } + + /** + * The number of bytes taken up by this
file set + */ + def sizeInBytes(): Long = setSize.get() + + def checkMutable(): Unit = { + if(!mutable) + throw new KafkaException("Attempt to invoke mutation on immutable message set.") + } + + /** + * Append this message to the message set + */ + def append(messages: MessageSet): Unit = { + checkMutable() + var written = 0L + while(written < messages.sizeInBytes) + written += messages.writeTo(channel, 0, messages.sizeInBytes) + setSize.getAndAdd(written) + } + + /** + * Commit all written data to the physical disk + */ + def flush() = { + checkMutable() + LogFlushStats.logFlushTimer.time { + channel.force(true) + } + } + + /** + * Close this message set + */ + def close() { + if(mutable) + flush() + channel.close() + } + + /** + * Delete this message set from the filesystem + */ + def delete(): Boolean = { + Utils.swallow(channel.close()) + file.delete() + } + + /** + * Truncate this file message set to the given size. Note that this API does no checking that the + * given size falls on a valid byte offset. + */ + def truncateTo(targetSize: Long) = { + checkMutable() + if(targetSize > sizeInBytes()) + throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + + " size of this log segment is only %d bytes".format(sizeInBytes())) + channel.truncate(targetSize) + channel.position(targetSize) + setSize.set(targetSize) + } + +} + +object LogFlushStats extends KafkaMetricsGroup { + val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5c2f775..8a48e48 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -22,6 +22,7 @@ import java.io.{IOException, File} import java.util.{Comparator, Collections, ArrayList} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger} import kafka.utils._ +import scala.math._ import java.text.NumberFormat import kafka.server.BrokerTopicStat import kafka.message._ @@ -30,40 +31,35 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge object Log { - val FileSuffix = ".kafka" + val LogFileSuffix = ".log" + val IndexFileSuffix = ".index" /** * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges * but instead of checking for equality looks within the range. 
Takes the array size as an option in case * the array grows while searching happens - * - * TODO: This should move into SegmentList.scala */ def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = { if(ranges.size < 1) return None // check out of bounds - if(value < ranges(0).start || value > ranges(arraySize - 1).start + ranges(arraySize - 1).size) - throw new OffsetOutOfRangeException("offset " + value + " is out of range") - - // check at the end - if (value == ranges(arraySize - 1).start + ranges(arraySize - 1).size) + if(value < ranges(0).start) return None var low = 0 var high = arraySize - 1 - while(low <= high) { - val mid = (high + low) / 2 + while(low < high) { + val mid = ceil((high + low) / 2.0).toInt val found = ranges(mid) - if(found.contains(value)) + if(found.start == value) return Some(found) else if (value < found.start) high = mid - 1 else - low = mid + 1 + low = mid } - None + Some(ranges(low)) } def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] = @@ -73,13 +69,17 @@ object Log { * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros * so that ls sorts the files numerically */ - def nameFromOffset(offset: Long): String = { + def filenamePrefixFromOffset(offset: Long): String = { val nf = NumberFormat.getInstance() nf.setMinimumIntegerDigits(20) nf.setMaximumFractionDigits(0) nf.setGroupingUsed(false) - nf.format(offset) + FileSuffix + nf.format(offset) } + + def logFilename(offset: Long) = filenamePrefixFromOffset(offset) + LogFileSuffix + + def indexFilename(offset: Long) = filenamePrefixFromOffset(offset) + IndexFileSuffix def getEmptyOffsets(request: OffsetRequest): Array[Long] = { if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime) @@ -91,48 +91,24 @@ object Log { /** - * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size - */ -class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range { - var firstAppendTime: Option[Long] = None - @volatile var deleted = false - /* Return the size in bytes of this log segment */ - def size: Long = messageSet.sizeInBytes() - /* Return the absolute end offset of this log segment */ - def absoluteEndOffset: Long = start + messageSet.sizeInBytes() - - def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - - def append(messages: ByteBufferMessageSet) { - if (messages.sizeInBytes > 0) { - messageSet.append(messages) - updateFirstAppendTime() - } - } - - override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")" - - /** - * Truncate this log segment upto absolute offset value. Since the offset specified is absolute, to compute the amount - * of data to be deleted, we have to compute the offset relative to start of the log segment - * @param offset Absolute offset for this partition - */ - def truncateTo(offset: Long) = { - messageSet.truncateTo(offset - start) - } -} - - -/** - * An append-only log for storing messages. + * An append-only log for storing messages. + * + * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment. + * + * New log segments are created according to a configurable policy that controls the size in bytes or time interval + * for a given segment. 
+ * */ @threadsafe -private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int, - val rollIntervalMs: Long, val needRecovery: Boolean, time: Time, - brokerId: Int = 0) extends Logging with KafkaMetricsGroup { +private[kafka] class Log(val dir: File, + val maxLogFileSize: Long, + val maxMessageSize: Int, + val flushInterval: Int, + val rollIntervalMs: Long, + val needsRecovery: Boolean, + val indexIntervalBytes: Int = 4096, + time: Time = SystemTime, + brokerId: Int = 0) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Log on Broker " + brokerId + "], " import kafka.log.Log._ @@ -145,23 +121,24 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag /* last time it was flushed */ private val lastflushedTime = new AtomicLong(System.currentTimeMillis) + + /* the number of index entries to allow in the index */ + private val maxIndexEntries = (10*1024*1024)/8 - /* The actual segments of the log */ + /* the actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments() + + /* the number of bytes since we last added an entry in the offset index */ + private var bytesSinceLastIndexEntry = 0 + + /* Calculate the offset of the next message */ + private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset()) - newGauge( - name + "-" + "NumLogSegments", - new Gauge[Int] { - def value() = numberOfSegments - } - ) + newGauge(name + "-" + "NumLogSegments", + new Gauge[Int] { def value() = numberOfSegments }) - newGauge( - name + "-" + "LogEndOffset", - new Gauge[Long] { - def value() = logEndOffset - } - ) + newGauge(name + "-" + "LogEndOffset", + new Gauge[Long] { def value() = logEndOffset }) /* The name of this log */ def name = dir.getName() @@ -172,21 +149,30 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag val logSegments = new ArrayList[LogSegment] val ls = dir.listFiles() if(ls != null) { - for(file <- ls if file.isFile && file.toString.endsWith(FileSuffix)) { + for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) { if(!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName() - val start = filename.substring(0, filename.length - FileSuffix.length).toLong - val messageSet = new FileMessageSet(file, false) - logSegments.add(new LogSegment(file, messageSet, start, time)) + val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong + val messageSet = new FileMessageSet(file, mutable = false) + val indexFile = new File(dir, Log.indexFilename(start)) + // TODO: we should ideally rebuild any missing index files, instead of erroring out + if(!indexFile.exists()) + throw new IllegalStateException("Found log file with no corresponding index file.") + val index = new OffsetIndex(file = indexFile, baseOffset = start, mutable = false, maxIndexEntries) + logSegments.add(new LogSegment(messageSet, index, start, indexIntervalBytes, time)) } } if(logSegments.size == 0) { // no existing segments, create a new mutable segment - val newFile = new File(dir, nameFromOffset(0)) - val set = new FileMessageSet(newFile, true) - logSegments.add(new LogSegment(newFile, set, 0, time)) + val log = new FileMessageSet(file = new File(dir, logFilename(0)), + mutable = true) + val index = new OffsetIndex(file = new File(dir, indexFilename(0)), + mutable = true, + baseOffset = 0, + maxEntries = maxIndexEntries) + logSegments.add(new LogSegment(log, index, 0, 
indexIntervalBytes, time)) } else { // there is at least one existing segment, validate and recover them/it // sort segments into ascending order for fast searching @@ -197,30 +183,45 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag else 1 } }) - validateSegments(logSegments) //make the final section mutable and run recovery on it if necessary val last = logSegments.remove(logSegments.size - 1) last.messageSet.close() - info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery) - val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start, time) - logSegments.add(mutable) + val logFile = new File(dir, logFilename(last.start)) + val log = new FileMessageSet(logFile, mutable = true) + val index = new OffsetIndex(last.index.file, last.index.baseOffset, mutable = true, maxIndexEntries) + val mutableSegment = new LogSegment(log, index, last.start, indexIntervalBytes, time) + if(needsRecovery) + recoverSegment(mutableSegment) + logSegments.add(mutableSegment) } new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size))) } - + /** - * Check that the ranges and sizes add up, otherwise we have lost some data somewhere + * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. */ - private def validateSegments(segments: ArrayList[LogSegment]) { - lock synchronized { - for(i <- 0 until segments.size - 1) { - val curr = segments.get(i) - val next = segments.get(i+1) - if(curr.start + curr.size != next.start) - throw new KafkaException("The following segments don't validate: " + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath()) + private def recoverSegment(segment: LogSegment) { + segment.index.truncateTo(0) + var validBytes = 0 + var lastIndexEntry = 0 + val iter = segment.messageSet.iterator + try { + while(iter.hasNext) { + val entry = iter.next + entry.message.ensureValid() + if(validBytes - lastIndexEntry > indexIntervalBytes) + segment.index.append(entry.offset, validBytes) + validBytes += MessageSet.entrySize(entry.message) } + } catch { + case e: InvalidMessageException => + logger.warn("Found invalid messages in log " + name) } + val truncated = segment.messageSet.sizeInBytes - validBytes + if(truncated > 0) + warn("Truncated " + truncated + " invalid bytes from the log " + name + ".") + segment.messageSet.truncateTo(validBytes) } /** @@ -235,22 +236,20 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag debug("Closing log " + name) lock synchronized { for(seg <- segments.view) { - info("Closing log segment " + seg.file.getAbsolutePath) - seg.messageSet.close() - } + debug("Closing log segment " + seg.start) + seg.messageSet.close() } } } /** * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. - * Returns the offset at which the messages are written. 
+ * Returns the offset at which the messages are written, or -1 if the message set is empty */ - def append(messages: ByteBufferMessageSet): Unit = { - + def append(messages: ByteBufferMessageSet): Long = { // check that all messages are valid and see if we have any compressed messages var messageCount = 0 var codec: CompressionCodec = NoCompressionCodec - for(messageAndOffset <- messages) { + for(messageAndOffset <- messages.shallowIterator) { val m = messageAndOffset.message m.ensureValid() if(MessageSet.entrySize(m) > maxMessageSize) @@ -262,7 +261,9 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag } // if we have any valid messages, append them to the log - if(messageCount > 0) { + if(messageCount == 0) { + -1L + } else { BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageCount) BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageCount) @@ -271,18 +272,31 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag // they are valid, insert them in the log lock synchronized { - validMessages = validMessages.assignOffsets(logEndOffset, codec) try { - var segment = segments.view.last - maybeRoll(segment) - segment = segments.view.last - segment.append(validMessages) + val offset = nextOffset.get + + // maybe roll the log + val segment = maybeRoll(segments.view.last) + + // assign offsets to the messages + validMessages = validMessages.assignOffsets(nextOffset, codec) + + trace("Appending message set to " + this.name + ": " + validMessages) + + // now append to the log + segment.append(offset, validMessages) + this.bytesSinceLastIndexEntry += validMessages.sizeInBytes.toInt + + // maybe flush the log and index maybeFlush(messageCount) + + // return the offset at which the messages were appended + offset } catch { case e: IOException => fatal("Halting due to unrecoverable I/O error while handling producer request", e) Runtime.getRuntime.halt(1) - case e2 => throw e2 + -1L } } } @@ -295,7 +309,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag val messageSetValidBytes = messages.validBytes if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0) throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") - if(messageSetValidBytes == messages.size) { + if(messageSetValidBytes == messages.sizeInBytes) { messages } else { // trim invalid bytes @@ -306,18 +320,28 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag } /** - * Read from the log file at the given offset + * Read a message set from the log. 
+ * startOffset - The logical offset to begin reading at + * maxLength - The maximum number of bytes to read + * maxOffset - The maximum logical offset to include in the resulting message set */ - def read(offset: Long, length: Int): MessageSet = { - trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(length, offset, name, size)) + def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = { + trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(maxLength, startOffset, name, size)) val view = segments.view - Log.findRange(view, offset, view.length) match { - case Some(segment) => - if(length <= 0) - MessageSet.Empty - else - segment.messageSet.read((offset - segment.start), length) - case _ => MessageSet.Empty + + // check if the offset is valid and in range + val first = view.head.start + val next = nextOffset.get + if(startOffset == next) + return MessageSet.Empty + else if(startOffset > next || startOffset < first) + throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next)) + + // Do the read on the segment with a base offset less than the target offset + // TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything + Log.findRange(view, startOffset, view.length) match { + case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset") + case Some(segment) => segment.read(startOffset, maxLength, maxOffset) } } @@ -338,7 +362,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag else { // If the last segment to be deleted is empty and we roll the log, the new segment will have the same // file name. So simply reuse the last segment and reset the modified time. 
- view(numToDelete - 1).file.setLastModified(time.milliseconds) + view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds) numToDelete -=1 } } @@ -352,33 +376,42 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag def size: Long = segments.view.foldLeft(0L)(_ + _.size) /** - * Get the absolute offset of the last message in the log + * Get the offset of the next message that will be appended */ - def logEndOffset: Long = segments.view.last.start + segments.view.last.size + def logEndOffset: Long = nextOffset.get /** * Roll the log over if necessary */ - private def maybeRoll(segment: LogSegment) { + private def maybeRoll(segment: LogSegment): LogSegment = { if ((segment.messageSet.sizeInBytes > maxLogFileSize) || - ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs))) + ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) || + segment.index.isFull) roll() + else + segment } /** - * Create a new segment and make it active + * Create a new segment and make it active, and return it */ - def roll() { + def roll(): LogSegment = { lock synchronized { flush val newOffset = logEndOffset - val newFile = new File(dir, nameFromOffset(newOffset)) - if (newFile.exists) { - warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first") - newFile.delete() + val logFile = new File(dir, logFilename(newOffset)) + val indexFile = new File(dir, indexFilename(newOffset)) + for(file <- List(logFile, indexFile); if file.exists) { + warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") + file.delete() } - debug("Rolling log '" + name + "' to " + newFile.getName()) - segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time)) + debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) + segments.view.last.index.makeReadOnly() + val log = new FileMessageSet(file = logFile, mutable = true) + val index = new OffsetIndex(file = indexFile, baseOffset = newOffset, mutable = true, maxEntries = maxIndexEntries) + val segment = new LogSegment(log, index, newOffset, indexIntervalBytes, time) + segments.append(segment) + segment } } @@ -395,12 +428,15 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag * Flush this log file to the physical disk */ def flush() : Unit = { - if (unflushed.get == 0) return + if (unflushed.get == 0) + return lock synchronized { debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + time.milliseconds) - segments.view.last.messageSet.flush() + val last = segments.view.last + last.messageSet.flush() + last.index.flush() unflushed.set(0) lastflushedTime.set(time.milliseconds) } @@ -409,15 +445,15 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag def getOffsetsBefore(request: OffsetRequest): Array[Long] = { val segsArray = segments.view var offsetTimeArray: Array[(Long, Long)] = null - if (segsArray.last.size > 0) + if(segsArray.last.size > 0) offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) else offsetTimeArray = new Array[(Long, Long)](segsArray.length) - for (i <- 0 until segsArray.length) - offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified) - if (segsArray.last.size > 0) - offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), 
time.milliseconds) + for(i <- 0 until segsArray.length) + offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) + if(segsArray.last.size > 0) + offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) var startIndex = -1 request.time match { @@ -439,7 +475,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag val retSize = request.maxNumOffsets.min(startIndex + 1) val ret = new Array[Long](retSize) - for (j <- 0 until retSize) { + for(j <- 0 until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 } @@ -452,15 +488,18 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag def truncateAndStartWithNewOffset(newOffset: Long) { lock synchronized { val deletedSegments = segments.trunc(segments.view.size) - val newFile = new File(dir, Log.nameFromOffset(newOffset)) - debug("Truncate and start log '" + name + "' to " + newFile.getName()) - segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time)) + val logFile = new File(dir, logFilename(newOffset)) + val indexFile = new File(dir, indexFilename(newOffset)) + debug("Truncate and start log '" + name + "' to " + newOffset) + val log = new FileMessageSet(file=logFile, mutable=true) + val index = new OffsetIndex(file=indexFile, baseOffset=newOffset, mutable = true, maxEntries=maxIndexEntries) + segments.append(new LogSegment(log, index, newOffset, indexIntervalBytes, time)) deleteSegments(deletedSegments) } } - def deleteWholeLog():Unit = { + def delete(): Unit = { deleteSegments(segments.contents.get()) Utils.rm(dir) } @@ -469,10 +508,9 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag def deleteSegments(segments: Seq[LogSegment]): Int = { var total = 0 for(segment <- segments) { - info("Deleting log segment " + segment.file.getName() + " from " + name) - swallow(segment.messageSet.close()) - if(!segment.file.delete()) { - warn("Delete failed.") + info("Deleting log segment " + segment.start + " from " + name) + if(!segment.messageSet.delete() || !segment.index.delete()) { + warn("Delete of log segment " + segment.start + " failed.") } else { total += 1 } @@ -482,18 +520,19 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag def truncateTo(targetOffset: Long) { // find the log segment that has this hw - val segmentToBeTruncated = segments.view.find( - segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset) + val view = segments.view + val startOffset = view.map(_.start).filter(_ <= targetOffset).max + val segmentToBeTruncated = view.find(_.start == startOffset) segmentToBeTruncated match { case Some(segment) => val truncatedSegmentIndex = segments.view.indexOf(segment) segments.truncLast(truncatedSegmentIndex) segment.truncateTo(targetOffset) - info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset)) + info("Truncated log segment %s to highwatermark %d".format(segment.messageSet.file.getAbsolutePath, targetOffset)) case None => - if(targetOffset > segments.view.last.absoluteEndOffset) - error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath)) + if(targetOffset > logEndOffset) + error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, logEndOffset, 
segments.view.last.messageSet.file.getAbsolutePath)) } val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 45680e4..91c44f0 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -46,7 +46,7 @@ private[kafka] class LogManager(val config: KafkaConfig, private val logRetentionSizeMap = config.logRetentionSizeMap private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) - this.logIdent = "[Log Manager on Broker " + config.brokerId + "], " + this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " /* Initialize a log for each subdirectory of the main log directory */ private val logs = new Pool[String, Pool[Int, Log]]() @@ -66,7 +66,15 @@ private[kafka] class LogManager(val config: KafkaConfig, val topic = Utils.getTopicPartition(dir.getName)._1 val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - val log = new Log(dir, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId) + val log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + flushInterval, + rollIntervalMs, + needRecovery, + config.logIndexIntervalBytes, + time, + config.brokerId) val topicPartition = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]()) val parts = logs.get(topicPartition._1) @@ -85,7 +93,7 @@ private[kafka] class LogManager(val config: KafkaConfig, scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false) info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals) - scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-", + scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-", config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false) } } @@ -100,7 +108,7 @@ private[kafka] class LogManager(val config: KafkaConfig, d.mkdirs() val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false, time, config.brokerId) + new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexIntervalBytes, time, config.brokerId) } } @@ -159,7 +167,7 @@ private[kafka] class LogManager(val config: KafkaConfig, val startMs = time.milliseconds val topic = Utils.getTopicPartition(log.name)._1 val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs) - val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs) + val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs) val total = log.deleteSegments(toBeDeleted) total } @@ -205,9 +213,9 @@ private[kafka] class LogManager(val config: KafkaConfig, * Close all the logs */ def shutdown() { - info("shut down") + debug("Log manager is shutting down.") allLogs.foreach(_.close()) - info("shutted down completedly") + 
debug("Log manager shutdown complete.") } /** @@ -215,21 +223,22 @@ private[kafka] class LogManager(val config: KafkaConfig, */ def allLogs() = logs.values.flatMap(_.values) - private def flushAllLogs() = { - debug("Flushing the high watermark of all logs") - for (log <- allLogs) - { - try{ + /** + * Flush any log which has exceeded its flush interval and has unwritten messages. + */ + private def flushDirtyLogs() = { + debug("Checking for dirty logs to flush...") + for (log <- allLogs) { + try { val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime var logFlushInterval = config.defaultFlushIntervalMs if(logFlushIntervals.contains(log.topicName)) logFlushInterval = logFlushIntervals(log.topicName) debug(log.topicName + " flush interval " + logFlushInterval + - " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush) + " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush) if(timeSinceLastFlush >= logFlushInterval) log.flush - } - catch { + } catch { case e => error("Error flushing topic " + log.topicName, e) e match { diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala new file mode 100644 index 0000000..891402a --- /dev/null +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -0,0 +1,126 @@ +package kafka.log + +import scala.math._ +import kafka.common._ +import kafka.message._ +import kafka.utils.Range +import kafka.utils.Time +import kafka.utils.nonthreadsafe + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + */ +@nonthreadsafe +class LogSegment(val messageSet: FileMessageSet, + val index: OffsetIndex, + val start: Long, + val indexIntervalBytes: Int, + time: Time) extends Range { + + var firstAppendTime: Option[Long] = None + + var bytesSinceLastIndexEntry = 0 + + @volatile var deleted = false + + /* Return the size in bytes of this log segment */ + def size: Long = messageSet.sizeInBytes() + + def updateFirstAppendTime() { + if (firstAppendTime.isEmpty) + firstAppendTime = Some(time.milliseconds) + } + + /** + * Append the given messages starting with the given offset. Add + * an entry to the index if needed. + * + * It is assumed this method is being called from within a lock + */ + def append(offset: Long, messages: ByteBufferMessageSet) { + if (messages.sizeInBytes > 0) { + // append an entry to the index (if needed) + if(bytesSinceLastIndexEntry > indexIntervalBytes) { + index.append(offset, messageSet.sizeInBytes().toInt) + this.bytesSinceLastIndexEntry = 0 + } + // append the messages + messageSet.append(messages) + updateFirstAppendTime() + } + } + + /** + * Find the physical file position for the least offset >= the given offset. If no offset is found + * that meets this criteria before the end of the log, return null. 
+ */ + def translateOffset(offset: Long): OffsetPosition = { + val mapping = index.lookup(offset) + messageSet.searchFor(offset, mapping.position) + } + + /** + * Read a message set from this segment beginning with the first offset + * greater than or equal to the startOffset. The message set will include + * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. + */ + def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = { + if(maxSize <= 0) + return MessageSet.Empty + + val startPosition = translateOffset(startOffset) + + // if the start position is already off the end of the log, return MessageSet.Empty + if(startPosition == null) + return MessageSet.Empty + + // calculate the length of the message set to read based on whether or not they gave us a maxOffset + val length = + maxOffset match { + case None => + // no max offset, just use the max size they gave unmolested + maxSize + case Some(offset) => { + // there is a max offset, translate it to a file position and use that to calculate the max read size + val mapping = translateOffset(offset) + val endPosition = + if(mapping == null) + messageSet.sizeInBytes().toInt // the max offset is off the end of the log, use the end of the file + else + mapping.position + min(endPosition - startPosition.position, maxSize) + } + } + messageSet.read(startPosition.position, length) + } + + override def toString() = "LogSegment(start=" + start + ", size=" + size + ")" + + /** + * Truncate off all index and log entries with offsets greater than or equal to the given offset. + */ + def truncateTo(offset: Long) { + val mapping = translateOffset(offset) + if(mapping == null) + return + index.truncateTo(offset) + messageSet.truncateTo(mapping.position) + } + + /** + * Calculate the offset that would be used for the next message to be appended to this segment. + * Note that this is expensive. + */ + def nextOffset(): Long = { + val ms = read(index.lastOffset, messageSet.sizeInBytes.toInt, None) + ms.lastOption match { + case None => start + case Some(last) => last.nextOffset + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala new file mode 100644 index 0000000..6f4786b --- /dev/null +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import scala.math._ +import java.io._ +import java.nio._ +import java.nio.channels._ +import java.util.concurrent.atomic._ +import kafka.utils._ + +/** + * An index that maps logical offsets to physical file locations for a particular log segment.
This index may be sparse: + * that is it may not hold an entry for all messages in the log. + * + * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries. + * + * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant + * to locate the offset/location pair for the greatest offset less than or equal to the target offset. + * + * Index files can be opened in two ways: either an empty, mutable index can be created that will allow appends or else + * an immutable read-only index file. The makeReadOnly method will turn a mutable file into an immutable one and truncate off + * any extra bytes. This is done when the index file is rolled over. + * + * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. + * + * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the + * message with that offset. The offset stored is relative to the base offset of the index file. So, for example, + * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use + * only 4 bytes for the offset. + * + * The frequency of entries is up to the user of this class. + * + * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal + * storage format. + */ +class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, val maxEntries: Int = -1) extends Logging { + private var size = new AtomicInteger(0) + var lastOffset = -1L + private val mmap: MappedByteBuffer = + { + val raf = new RandomAccessFile(file, "rw") + try { + if(mutable) { + file.createNewFile() + raf.setLength(maxEntries * 8) + raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()) + } else { + val len = raf.length() + if(len < 0 || len % 8 != 0) + throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 8.") + val idx = raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len) + idx.position(idx.limit).asInstanceOf[MappedByteBuffer] + } + } finally { + Utils.swallow(raf.close()) + } + } + + if(mutable) { + this.lastOffset = baseOffset + } else { + this.size.set(mmap.limit / 8) + this.lastOffset = readLastOffset + } + + /** + * The last logical offset written to the index + */ + private def readLastOffset(): Long = { + val offset = + size.get match { + case 0 => 0 + case s => logical(this.mmap, s-1) + } + baseOffset + offset + } + + /** + * Find the largest logical offset less than or equal to the given targetOffset + * and return a pair holding this logical offset and its corresponding physical file position. + * If the target offset is smaller than the least entry in the index (or the index is empty), + * the pair (baseOffset, 0) is returned.
+ */ + def lookup(targetOffset: Long): OffsetPosition = { + if(entries == 0) + return OffsetPosition(baseOffset, 0) + val idx = mmap.duplicate + val slot = indexSlotFor(idx, targetOffset) + if(slot == -1) + OffsetPosition(baseOffset, 0) + else + OffsetPosition(baseOffset + logical(idx, slot), physical(idx, slot)) + } + + private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = { + // we only store the difference from the baseoffset so calculate that + val relativeOffset = targetOffset - baseOffset + + // check if the target offset is smaller than the least offset + if(logical(idx, 0) > relativeOffset) + return -1 + + // binary search for the entry + var lo = 0 + var hi = entries-1 + while(lo < hi) { + val mid = ceil((hi + lo) / 2.0).toInt + val found = logical(idx, mid) + if(found == relativeOffset) + return mid + else if(found < relativeOffset) + lo = mid + else + hi = mid - 1 + } + return lo + } + + /* return the nth logical offset relative to the base offset */ + private def logical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) + + /* return the nth physical offset */ + private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) + + /** + * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. + */ + def append(logicalOffset: Long, position: Int) { + this synchronized { + if(!mutable) + throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName) + if(isFull) + throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").") + if(size.get > 0 && logicalOffset <= lastOffset) + throw new IllegalArgumentException("Attempt to append an offset (" + logicalOffset + ") no larger than the last offset appended (" + lastOffset + ").") + debug("Adding index entry %d => %d to %s.".format(logicalOffset, position, file.getName)) + this.mmap.putInt((logicalOffset - baseOffset).toInt) + this.mmap.putInt(position) + this.size.incrementAndGet() + this.lastOffset = logicalOffset + } + } + + /** + * True iff there are no more slots available in this index + */ + def isFull = entries >= this.maxEntries + + /** + * Remove all entries from the index which have an offset greater than or equal to the given offset + */ + def truncateTo(offset: Long) { + this synchronized { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, offset) + if(slot > 0) { + val found = logical(idx, slot) + val newEntries = if(found == offset) slot else slot + 1 + this.size.set(newEntries) + mmap.position(this.size.get * 8) + this.lastOffset = readLastOffset + } + } + } + + /** + * Make this segment read-only, flush any unsaved changes, and truncate any excess bytes + */ + def makeReadOnly() { + this synchronized { + mutable = false + flush() + val raf = new RandomAccessFile(file, "rw") + try { + raf.setLength(entries * 8) + } finally { + raf.close() + } + } + } + + /** + * Flush the data in the index to disk + */ + def flush() { + this synchronized { + mmap.force() + } + } + + /** + * Delete this index file + */ + def delete(): Boolean = { + this.file.delete() + } + + /** The number of entries in this index */ + def entries() = size.get + + /** Close the index */ + def close() { + if(mutable) + makeReadOnly() + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala new file mode 100644 index 0000000..6cefde4 --- /dev/null +++ 
b/core/src/main/scala/kafka/log/OffsetPosition.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +/** + * The mapping between a logical log offset and the physical position + * in some log file of the beginning of the message set entry with the + * given offset. + */ +case class OffsetPosition(val offset: Long, val position: Int) \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/SegmentList.scala b/core/src/main/scala/kafka/log/SegmentList.scala index 9790889..c8e913e 100644 --- a/core/src/main/scala/kafka/log/SegmentList.scala +++ b/core/src/main/scala/kafka/log/SegmentList.scala @@ -78,8 +78,8 @@ private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) { * Delete the items from position newEnd until end of list */ def truncLast(newEnd: Int): Seq[T] = { - if(newEnd >= contents.get().size-1) - throw new KafkaException("End index must be segment list size - 1"); + if(newEnd >= contents.get().size) + throw new KafkaException("Attempt to truncate segment list of length %d to %d.".format(contents.get().size, newEnd)); var deleted: Array[T] = null var done = false while(!done) { diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 99ac008..1f61922 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -21,38 +21,66 @@ import scala.reflect.BeanProperty import kafka.utils.Logging import java.nio.ByteBuffer import java.nio.channels._ +import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, IOException} +import java.util.zip._ +import java.util.concurrent.atomic.AtomicLong import kafka.utils.IteratorTemplate import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException} object ByteBufferMessageSet { - private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { - buffer.putLong(offset) - buffer.putInt(message.size) - buffer.put(message.buffer) - message.buffer.rewind() - } - - private def serialize(baseOffset: Long, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { + private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { if(messages.size == 0) { return MessageSet.Empty.buffer } else if(compressionCodec == NoCompressionCodec) { val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - var offset = baseOffset - for(message <- messages) { - writeMessage(buffer, message, offset) - offset += 12 + message.size - } + for(message <- messages) + writeMessage(buffer, message, offsetCounter.getAndIncrement) buffer.rewind() buffer - } 
else { - val m = CompressionUtils.compress(baseOffset, messages, compressionCodec) - val buffer = ByteBuffer.allocate(m.size + MessageSet.LogOverhead) - writeMessage(buffer, m, baseOffset) + } else { + val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages)) + val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream)) + var offset = -1L + for(message <- messages) { + offset = offsetCounter.getAndIncrement + output.writeLong(offset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + } + output.close() + + val bytes = byteArrayStream.toByteArray + val message = new Message(bytes, compressionCodec) + val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead) + writeMessage(buffer, message, offset) buffer.rewind() buffer } } + + def decompress(message: Message): ByteBufferMessageSet = { + val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream + val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload) + val intermediateBuffer = new Array[Byte](1024) + val compressed = CompressionFactory(message.compressionCodec, inputStream) + Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead => + outputStream.write(intermediateBuffer, 0, dataRead) + } + compressed.close() + + val outputBuffer = ByteBuffer.allocate(outputStream.size) + outputBuffer.put(outputStream.toByteArray) + outputBuffer.rewind + new ByteBufferMessageSet(outputBuffer) + } + + private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { + buffer.putLong(offset) + buffer.putInt(message.size) + buffer.put(message.buffer) + message.buffer.rewind() + } } /** @@ -71,15 +99,15 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue) def this(compressionCodec: CompressionCodec, messages: Message*) { - this(ByteBufferMessageSet.serialize(0, compressionCodec, messages:_*)) + this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*)) } - def this(compressionCodec: CompressionCodec, initialOffset: Long, messages: Message*) { - this(ByteBufferMessageSet.serialize(initialOffset, compressionCodec, messages:_*)) + def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) { + this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*)) } def this(messages: Message*) { - this(NoCompressionCodec, 0, messages: _*) + this(NoCompressionCodec, new AtomicLong(0), messages: _*) } private def shallowValidBytes: Long = { @@ -88,7 +116,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message val iter = this.internalIterator(true) while(iter.hasNext) { val messageAndOffset = iter.next - shallowValidByteCount += 12 + messageAndOffset.message.size + shallowValidByteCount += MessageSet.entrySize(messageAndOffset.message) } } shallowValidByteCount @@ -143,7 +171,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message innerIter = null new MessageAndOffset(newMessage, offset) case _ => - innerIter = CompressionUtils.decompress(newMessage).internalIterator() + innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator() if(!innerIter.hasNext) innerIter = null makeNext() @@ -169,14 +197,14 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message * 
Update the offsets for this message set. This method attempts to do an in-place conversion * if there is no compression, but otherwise recopies the messages */ - private[kafka] def assignOffsets(firstOffset: Long, codec: CompressionCodec): ByteBufferMessageSet = { + private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec): ByteBufferMessageSet = { if(codec == NoCompressionCodec) { // do an in-place conversion var position = 0 buffer.mark() while(position < sizeInBytes - MessageSet.LogOverhead) { buffer.position(position) - buffer.putLong(firstOffset + position) + buffer.putLong(offsetCounter.getAndIncrement()) position += MessageSet.LogOverhead + buffer.getInt() } buffer.reset() @@ -184,7 +212,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message } else { // messages are compressed, crack open the messageset and recompress with correct offset val messages = this.internalIterator(isShallow = false).map(_.message) - new ByteBufferMessageSet(compressionCodec = codec, initialOffset = firstOffset, messages = messages.toBuffer:_*) + new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*) } } diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala new file mode 100644 index 0000000..53cfdcf --- /dev/null +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.message + +import java.io.OutputStream +import java.io.ByteArrayOutputStream +import java.util.zip.GZIPOutputStream +import java.util.zip.GZIPInputStream +import java.io.IOException +import java.io.InputStream +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong +import kafka.utils._ + +object CompressionFactory { + + def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = { + compressionCodec match { + case DefaultCompressionCodec => new GZIPOutputStream(stream) + case GZIPCompressionCodec => new GZIPOutputStream(stream) + case SnappyCompressionCodec => + import org.xerial.snappy.SnappyOutputStream + new SnappyOutputStream(stream) + case _ => + throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) + } + } + + def apply(compressionCodec: CompressionCodec, stream: InputStream): InputStream = { + compressionCodec match { + case DefaultCompressionCodec => new GZIPInputStream(stream) + case GZIPCompressionCodec => new GZIPInputStream(stream) + case SnappyCompressionCodec => + import org.xerial.snappy.SnappyInputStream + new SnappyInputStream(stream) + case _ => + throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) + } + } +} diff --git a/core/src/main/scala/kafka/message/CompressionUtils.scala b/core/src/main/scala/kafka/message/CompressionUtils.scala deleted file mode 100644 index 90856d5..0000000 --- a/core/src/main/scala/kafka/message/CompressionUtils.scala +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
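The new CompressionFactory returns plain java.io stream wrappers, so the compressed branch of create() (and the recompression path of assignOffsets) and decompress() can simply stream [offset][size][bytes] entries through the selected codec; DefaultCompressionCodec is treated as GZIP, and Snappy relies on the xerial dependency. A small round-trip sketch using only the two apply overloads defined here (values are arbitrary):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import kafka.message.{CompressionFactory, GZIPCompressionCodec}

    val raw = "hello kafka".getBytes("UTF-8")

    // Compress through the OutputStream wrapper.
    val sink = new ByteArrayOutputStream()
    val out = CompressionFactory(GZIPCompressionCodec, sink)
    out.write(raw)
    out.close()                                            // close() flushes the GZIP trailer

    // Decompress through the InputStream wrapper, mirroring decompress() above.
    val back = new ByteArrayOutputStream()
    val in = CompressionFactory(GZIPCompressionCodec, new ByteArrayInputStream(sink.toByteArray))
    val chunk = new Array[Byte](1024)
    Stream.continually(in.read(chunk)).takeWhile(_ > 0).foreach(n => back.write(chunk, 0, n))
    in.close()
    // back.toByteArray should now equal raw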
- */ - -package kafka.message - -import java.io.ByteArrayOutputStream -import java.io.IOException -import java.io.InputStream -import java.nio.ByteBuffer -import kafka.utils._ - -abstract sealed class CompressionFacade(inputStream: InputStream, outputStream: ByteArrayOutputStream) { - def close() = { - if (inputStream != null) inputStream.close() - if (outputStream != null) outputStream.close() - } - def read(a: Array[Byte]): Int - def write(a: Array[Byte]) -} - -class GZIPCompression(inputStream: InputStream, outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) { - import java.util.zip.GZIPInputStream - import java.util.zip.GZIPOutputStream - val gzipIn:GZIPInputStream = if (inputStream == null) null else new GZIPInputStream(inputStream) - val gzipOut:GZIPOutputStream = if (outputStream == null) null else new GZIPOutputStream(outputStream) - - override def close() { - if (gzipIn != null) gzipIn.close() - if (gzipOut != null) gzipOut.close() - super.close() - } - - override def write(a: Array[Byte]) = { - gzipOut.write(a) - } - - override def read(a: Array[Byte]): Int = { - gzipIn.read(a) - } -} - -class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) { - import org.xerial.snappy.SnappyInputStream - import org.xerial.snappy.SnappyOutputStream - - val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream) - val snappyOut:SnappyOutputStream = if (outputStream == null) null else new SnappyOutputStream(outputStream) - - override def close() = { - if (snappyIn != null) snappyIn.close() - if (snappyOut != null) snappyOut.close() - super.close() - } - - override def write(a: Array[Byte]) = { - snappyOut.write(a) - } - - override def read(a: Array[Byte]): Int = { - snappyIn.read(a) - } - -} - -object CompressionFactory { - def apply(compressionCodec: CompressionCodec, stream: ByteArrayOutputStream): CompressionFacade = compressionCodec match { - case GZIPCompressionCodec => new GZIPCompression(null,stream) - case SnappyCompressionCodec => new SnappyCompression(null,stream) - case _ => - throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) - } - def apply(compressionCodec: CompressionCodec, stream: InputStream): CompressionFacade = compressionCodec match { - case GZIPCompressionCodec => new GZIPCompression(stream,null) - case SnappyCompressionCodec => new SnappyCompression(stream,null) - case _ => - throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) - } -} - -object CompressionUtils extends Logging{ - - //specify the codec which is the default when DefaultCompressionCodec is used - private var defaultCodec: CompressionCodec = GZIPCompressionCodec - - def compress(baseOffset: Long, messages: Iterable[Message], compressionCodec: CompressionCodec = DefaultCompressionCodec): Message = { - val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream() - - debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) - - var cf: CompressionFacade = null - if (compressionCodec == DefaultCompressionCodec) - cf = CompressionFactory(defaultCodec, outputStream) - else - cf = CompressionFactory(compressionCodec, outputStream) - - val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for(message <- messages){ - messageByteBuffer.putLong(baseOffset) - messageByteBuffer.putInt(message.size) - messageByteBuffer.put(message.buffer) 
- message.buffer.rewind() - } - messageByteBuffer.rewind - - try { - cf.write(messageByteBuffer.array) - } catch { - case e: IOException => error("Error while writing to the GZIP output stream", e) - throw e - } finally { - cf.close() - } - - val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec) - oneCompressedMessage - } - - def decompress(message: Message): ByteBufferMessageSet = { - val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream - val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload) - - val intermediateBuffer = new Array[Byte](1024) - - var cf: CompressionFacade = null - if (message.compressionCodec == DefaultCompressionCodec) - cf = CompressionFactory(defaultCodec, inputStream) - else - cf = CompressionFactory(message.compressionCodec, inputStream) - - try { - Stream.continually(cf.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead => - outputStream.write(intermediateBuffer, 0, dataRead) - } - }catch { - case e: IOException => error("Error while reading from the GZIP input stream", e) - throw e - } finally { - cf.close() - } - - val outputBuffer = ByteBuffer.allocate(outputStream.size) - outputBuffer.put(outputStream.toByteArray) - outputBuffer.rewind - val outputByteArray = outputStream.toByteArray - new ByteBufferMessageSet(outputBuffer) - } -} diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala deleted file mode 100644 index 6eaa306..0000000 --- a/core/src/main/scala/kafka/message/FileMessageSet.scala +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.message - -import java.io._ -import java.nio._ -import java.nio.channels._ -import java.util.concurrent.atomic._ - -import kafka.utils._ -import kafka.common.KafkaException -import java.util.concurrent.TimeUnit -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} - -/** - * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts - * will fail on an immutable message set. 
An optional limit and offset can be applied to the message set - * which will control the offset into the file and the effective length into the file from which - * messages will be read - */ -@nonthreadsafe -class FileMessageSet private[kafka](private[message] val channel: FileChannel, - private[message] val offset: Long, - private[message] val limit: Long, - val mutable: Boolean, - val needRecover: AtomicBoolean) extends MessageSet with Logging { - - private val setSize = new AtomicLong() - - if(mutable) { - if(limit < Long.MaxValue || offset > 0) - throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.") - - if (needRecover.get) { - // set the file position to the end of the file for appending messages - val startMs = System.currentTimeMillis - val truncated = recover() - info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 + - " seconds. " + truncated + " bytes truncated. New log size is " + sizeInBytes() + " bytes") - } - else { - setSize.set(channel.size()) - channel.position(channel.size) - } - } else { - setSize.set(scala.math.min(channel.size(), limit) - offset) - } - - /** - * Create a file message set with no limit or offset - */ - def this(channel: FileChannel, mutable: Boolean) = - this(channel, 0, Long.MaxValue, mutable, new AtomicBoolean(false)) - - /** - * Create a file message set with no limit or offset - */ - def this(file: File, mutable: Boolean) = - this(Utils.openChannel(file, mutable), mutable) - - /** - * Create a file message set with no limit or offset - */ - def this(channel: FileChannel, mutable: Boolean, needRecover: AtomicBoolean) = - this(channel, 0, Long.MaxValue, mutable, needRecover) - - /** - * Create a file message set with no limit or offset - */ - def this(file: File, mutable: Boolean, needRecover: AtomicBoolean) = - this(Utils.openChannel(file, mutable), mutable, needRecover) - - - /** - * Return a message set which is a view into this set starting from the given offset and with the given size limit. 
- */ - def read(readOffset: Long, size: Long): FileMessageSet = { - new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, sizeInBytes()), - false, new AtomicBoolean(false)) - } - - /** - * Write some of this set to the given channel, return the ammount written - */ - def writeTo(destChannel: GatheringByteChannel, writeOffset: Long, size: Long): Long = - channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel) - - /** - * Get an iterator over the messages in the set - */ - override def iterator: Iterator[MessageAndOffset] = { - new IteratorTemplate[MessageAndOffset] { - var location = offset - - override def makeNext(): MessageAndOffset = { - // read the size of the item - val sizeOffsetBuffer = ByteBuffer.allocate(12) - channel.read(sizeOffsetBuffer, location) - if(sizeOffsetBuffer.hasRemaining) - return allDone() - - sizeOffsetBuffer.rewind() - val offset = sizeOffsetBuffer.getLong() - val size = sizeOffsetBuffer.getInt() - if (size < Message.MinHeaderSize) - return allDone() - - // read the item itself - val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + 12) - if(buffer.hasRemaining) - return allDone() - buffer.rewind() - - // increment the location and return the item - location += size + 12 - new MessageAndOffset(new Message(buffer), offset) - } - } - } - - /** - * The number of bytes taken up by this file set - */ - def sizeInBytes(): Long = setSize.get() - - def checkMutable(): Unit = { - if(!mutable) - throw new KafkaException("Attempt to invoke mutation on immutable message set.") - } - - /** - * Append this message to the message set - */ - def append(messages: MessageSet): Unit = { - checkMutable() - var written = 0L - while(written < messages.sizeInBytes) - written += messages.writeTo(channel, 0, messages.sizeInBytes) - setSize.getAndAdd(written) - } - - /** - * Commit all written data to the physical disk - */ - def flush() = { - checkMutable() - LogFlushStats.logFlushTimer.time { - channel.force(true) - } - } - - /** - * Close this message set - */ - def close() = { - if(mutable) - flush() - channel.close() - } - - /** - * Recover log up to the last complete entry. Truncate off any bytes from any incomplete messages written - */ - def recover(): Long = { - checkMutable() - val len = channel.size - val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) - var validUpTo: Long = 0 - var next = 0L - do { - next = validateMessage(channel, validUpTo, len, buffer) - if(next >= 0) - validUpTo = next - } while(next >= 0) - channel.truncate(validUpTo) - setSize.set(validUpTo) - /* This should not be necessary, but fixes bug 6191269 on some OSs. 
*/ - channel.position(validUpTo) - needRecover.set(false) - len - validUpTo - } - - def truncateTo(targetSize: Long) = { - if(targetSize >= sizeInBytes()) - throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + - " size of this log segment is only %d bytes".format(sizeInBytes())) - channel.truncate(targetSize) - setSize.set(targetSize) - } - - /** - * Read, validate, and discard a single message, returning the next valid offset, and - * the message being validated - */ - private def validateMessage(channel: FileChannel, start: Long, len: Long, buffer: ByteBuffer): Long = { - buffer.rewind() - var read = channel.read(buffer, start) - if(read < MessageSet.LogOverhead) - return -1 - - // check that we have sufficient bytes left in the file - val offset = buffer.getLong(0) - val size = buffer.getInt(8) - if(size < Message.MinHeaderSize) - return -1 - - val next = start + MessageSet.LogOverhead + size - if(next > len) - return -1 - - // read the message - val messageBuffer = ByteBuffer.allocate(size) - var curr = start + MessageSet.LogOverhead - while(messageBuffer.hasRemaining) { - read = channel.read(messageBuffer, curr) - if(read < 0) - throw new KafkaException("File size changed during recovery!") - else - curr += read - } - messageBuffer.rewind() - val message = new Message(messageBuffer) - if(!message.isValid) - return -1 - else - next - } -} - -object LogFlushStats extends KafkaMetricsGroup { - val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) -} diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index c04d475..e559bdb 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -20,7 +20,6 @@ package kafka.message import java.nio._ import scala.math._ import kafka.utils._ -import kafka.common.UnknownMagicByteException /** * Constants related to messages @@ -71,24 +70,34 @@ object Message { * 2. 1 byte "magic" identifier to allow format changes, value is 2 currently * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) * 4. 4 byte key length, containing length K - * 5. K byte key key - * 6. (N - K - 6) byte payload + * 5. K byte key + * 6. (N - K - 10) byte payload + * + * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents. 
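The renamed payloadOffset/payloadSize parameters and the optional key make the intent of this constructor easier to read. A minimal construction sketch (values are arbitrary; the CRC is filled in by the constructor itself):

    import kafka.message.{Message, NoCompressionCodec}

    // Build an uncompressed keyed message; payloadOffset/payloadSize default to the whole array.
    val msg = new Message("value".getBytes("UTF-8"), NoCompressionCodec, "key".getBytes("UTF-8"))
    assert(msg.hasKey && msg.isValid)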
*/ class Message(val buffer: ByteBuffer) { import kafka.message.Message._ + /** + * A constructor to create a Message + * @param bytes The payload of the message + * @param compressionCodec The compression codec used on the contents of the message (if any) + * @param key The key of the message (null, if none) + * @param payloadOffset The offset into the payload array used to extract payload + * @param payloadSize The size of the payload to use + */ def this(bytes: Array[Byte], compressionCodec: CompressionCodec = NoCompressionCodec, key: Array[Byte] = null, - offset: Int = 0, - size: Int = -1) = { + payloadOffset: Int = 0, + payloadSize: Int = -1) = { this(ByteBuffer.allocate(Message.CrcLength + Message.MagicLength + Message.AttributesLength + Message.KeySizeLength + (if(key == null) 0 else key.length) + - (if(size >= 0) size else bytes.length - offset))) + (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset))) // skip crc, we will fill that in at the end buffer.put(MagicOffset, CurrentMagicValue) var attributes:Byte = 0 @@ -103,7 +112,7 @@ class Message(val buffer: ByteBuffer) { buffer.position(KeyOffset) buffer.put(key, 0, key.length) } - buffer.put(bytes, offset, if(size >= 0) size else bytes.length - offset) + buffer.put(bytes, payloadOffset, if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset) buffer.rewind() // now compute the checksum and fill it in @@ -120,7 +129,7 @@ class Message(val buffer: ByteBuffer) { * Compute the checksum of the message from the message contents */ def computeChecksum(): Long = - Utils.crc32(buffer.array, buffer.arrayOffset + CrcOffset + CrcLength, buffer.limit - CrcOffset - CrcLength) + Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) /** * Retrieve the previously computed CRC for this message @@ -153,7 +162,7 @@ class Message(val buffer: ByteBuffer) { /** * Does the message have a key? */ - def hasKey: Boolean = buffer.getInt(Message.KeySizeOffset) >= 0 + def hasKey: Boolean = keySize >= 0 /** * The length of the message value in bytes diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala index dcf00bb..51edf9f 100644 --- a/core/src/main/scala/kafka/message/MessageAndOffset.scala +++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala @@ -23,6 +23,6 @@ case class MessageAndOffset(message: Message, offset: Long) { /** * Compute the offset of the next message in the log */ - def nextOffset: Long = offset + MessageSet.entrySize(message) + def nextOffset: Long = offset + 1 } diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index c47bf48..8c75bfd 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -58,7 +58,7 @@ object MessageSet { /** * A set of messages with offsets. A message set has a fixed serialized form, though the container - * for the bytes could be either in-memory or on disk. A The format of each message is + * for the bytes could be either in-memory or on disk. 
The format of each message is * as follows: * 8 byte message offset number * 4 byte size containing an integer N diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 9fd460d..4b45e37 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -22,6 +22,7 @@ import kafka.consumer.SimpleConsumer import kafka.common.ErrorMapping import collection.mutable import kafka.message.ByteBufferMessageSet +import kafka.message.MessageAndOffset import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge @@ -100,8 +101,12 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket partitionData.error match { case ErrorMapping.NoError => processPartitionData(topic, currentOffset.get, partitionData) - val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes - val newOffset = currentOffset.get + validBytes + val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + val validBytes = messages.validBytes + val newOffset = messages.lastOption match { + case Some(m: MessageAndOffset) => m.nextOffset + case None => currentOffset.get + } fetchMap.put(key, newOffset) FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset fetcherMetrics.byteRate.mark(validBytes) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8f7913a..79025e3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -30,6 +30,7 @@ import mutable.HashMap import scala.math._ import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup import org.I0Itec.zkclient.ZkClient @@ -99,7 +100,7 @@ class KafkaApis(val requestChannel: RequestChannel, def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) { var satisfied = new mutable.ArrayBuffer[DelayedFetch] for(partitionData <- partitionDatas) - satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null) + satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), partitionData) trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size)) // send any newly unblocked responses for(fetchReq <- satisfied) { @@ -138,10 +139,12 @@ class KafkaApis(val requestChannel: RequestChannel, }) }) - val delayedProduce = new DelayedProduce( - producerRequestKeys, request, - response.errors, response.offsets, - produceRequest, produceRequest.ackTimeoutMs.toLong) + val delayedProduce = new DelayedProduce(producerRequestKeys, + request, + response.errors, + response.offsets, + produceRequest, + produceRequest.ackTimeoutMs.toLong) producerRequestPurgatory.watch(delayedProduce) /* @@ -177,10 +180,10 @@ class KafkaApis(val requestChannel: RequestChannel, try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val log = localReplica.log.get - log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) + val offset = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) // we may need to increment high watermark since ISR could be down to 1 
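With logical offsets, MessageAndOffset.nextOffset is simply offset + 1, so a fetcher no longer advances its position by validBytes; it takes the offset after the last message it actually received. The lastOption match added to AbstractFetcherThread boils down to this pattern (a sketch, not the patch's own code):

    import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

    // Next fetch offset: one past the last fetched message, or unchanged if the
    // response carried no messages.
    def nextFetchOffset(messages: ByteBufferMessageSet, currentOffset: Long): Long =
      messages.lastOption match {
        case Some(m: MessageAndOffset) => m.nextOffset   // last offset + 1
        case None                      => currentOffset
      }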
localReplica.partition.maybeIncrementLeaderHW(localReplica) - offsets(msgIndex) = log.logEndOffset + offsets(msgIndex) = offset errors(msgIndex) = ErrorMapping.NoError.toShort trace("%d bytes written to logs, nextAppendOffset = %d" .format(partitionData.messages.sizeInBytes, offsets(msgIndex))) @@ -223,7 +226,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if(fetchRequest.isFromFollower) { - maybeUpdatePartitionHW(fetchRequest) + maybeUpdatePartitionHw(fetchRequest) // after updating HW, some delayed produce requests may be unblocked var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] fetchRequest.offsetInfo.foreach(topicOffsetInfo => { @@ -236,57 +239,25 @@ class KafkaApis(val requestChannel: RequestChannel, satisfiedProduceRequests.foreach(_.respond()) } - // if there are enough bytes available right now we can answer the request, otherwise we have to punt - val availableBytes = availableFetchBytes(fetchRequest) + val topicDatas = readMessageSets(fetchRequest) + val bytesReadable = topicDatas.flatMap(_.partitionDataArray.map(_.messages.sizeInBytes)).sum if(fetchRequest.maxWait <= 0 || - availableBytes >= fetchRequest.minBytes || + bytesReadable >= fetchRequest.minBytes || fetchRequest.numPartitions <= 0) { - val topicData = readMessageSets(fetchRequest) debug("Returning fetch response %s for fetch request with correlation id %d".format( - topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) - val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) + topicDatas.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) + val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicDatas) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request into purgatory") // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _))) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait) + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable) fetchRequestPurgatory.watch(delayedFetch) } } - /** - * Calculate the number of available bytes for the given fetch request - */ - private def availableFetchBytes(fetchRequest: FetchRequest): Long = { - var totalBytes = 0L - for(offsetDetail <- fetchRequest.offsetInfo) { - for(i <- 0 until offsetDetail.partitions.size) { - debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) - try { - val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) - val end = if (!fetchRequest.isFromFollower) { - leader.highWatermark - } else { - leader.logEndOffset - } - val available = max(0, end - offsetDetail.offsets(i)) - totalBytes += math.min(offsetDetail.fetchSizes(i), available) - } catch { - case e: UnknownTopicOrPartitionException => - info("Invalid partition %d in fetch request from client %s." 
- .format(offsetDetail.partitions(i), fetchRequest.clientId)) - case e => - error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s" - .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e) - } - } - } - trace(totalBytes + " available bytes for fetch request.") - totalBytes - } - - private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) { + private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) { val offsets = fetchRequest.offsetInfo debug("Act on update partition HW, check offset detail: %s ".format(offsets)) for(offsetDetail <- offsets) { @@ -344,19 +315,20 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Read from a single topic/partition at the given offset upto maxSize bytes */ - private def readMessageSet(topic: String, partition: Int, offset: Long, - maxSize: Int, fromFollower: Boolean): (MessageSet, Long) = { + private def readMessageSet(topic: String, + partition: Int, + offset: Long, + maxSize: Int, + fromFollower: Boolean): (MessageSet, Long) = { // check if the current broker is the leader for the partitions val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val actualSize = if (!fromFollower) { - min(leader.highWatermark - offset, maxSize).toInt - } else { - maxSize - } val messages = leader.log match { case Some(log) => - log.read(offset, actualSize) + if(fromFollower) + log.read(startOffset = offset, maxLength = maxSize, maxOffset = None) + else + log.read(startOffset = offset, maxLength = maxSize, maxOffset = Some(leader.highWatermark)) case None => error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId)) MessageSet.Empty @@ -459,21 +431,24 @@ class KafkaApis(val requestChannel: RequestChannel, /** * A delayed fetch request */ - class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long) - extends DelayedRequest(keys, request, delayMs) + class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) + extends DelayedRequest(keys, request, delayMs) { + val bytesAccumulated = new AtomicLong(initialSize) + } /** * A holding pen for fetch requests waiting to be satisfied */ - class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { - + class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData](brokerId) { this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field */ - def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean = - availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes + def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = { + val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(partitionData.messages.sizeInBytes) + accumulatedSize >= delayedFetch.fetch.minBytes + } /** * When a request expires just answer it with whatever data is present diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5978d3b..8d6a53f 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala 
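Instead of rescanning the logs (the deleted availableFetchBytes), a parked fetch now starts from the bytes that were readable when it was queued and is credited with the size of every message set subsequently produced to a watched partition; checkSatisfied releases it once the running total reaches min_bytes. Likewise, readMessageSet now bounds non-follower reads with maxOffset = Some(leader.highWatermark) instead of shrinking the fetch size. A standalone sketch of the accumulation idea (PendingFetch and onProduce are hypothetical names, not the patch's classes):

    import java.util.concurrent.atomic.AtomicLong

    // Start from the bytes readable when the fetch was parked, add each produced
    // message set's size, and report satisfaction once min.bytes has accumulated.
    class PendingFetch(initialBytes: Long, minBytes: Int) {
      private val accumulated = new AtomicLong(initialBytes)
      def onProduce(messageSetSizeInBytes: Long): Boolean =
        accumulated.addAndGet(messageSetSizeInBytes) >= minBytes
    }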
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -96,6 +96,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) + + /* the interval with which we add an entry to the offset index */ + val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 04eef88..8e46e2c 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -113,11 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if(requestHandlerPool != null) requestHandlerPool.shutdown() kafkaScheduler.shutdown() - apis.close() - kafkaZookeeper.shutdown() + if(apis != null) + apis.close() + if(kafkaZookeeper != null) + kafkaZookeeper.shutdown() if(replicaManager != null) replicaManager.shutdown() - if (socketServer != null) + if(socketServer != null) socketServer.shutdown() if(logManager != null) logManager.shutdown() diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 51bfe5e..7f32421 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -38,7 +38,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk trace("Follower %d has replica log end offset %d. 
Received %d messages and leader hw %d".format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw)) replica.log.get.append(messageSet) - trace("Follower %d has replica log end offset %d after appending %d messages" + trace("Follower %d has replica log end offset %d after appending %d bytes of messages" .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes)) val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) replica.highWatermark = followerHighWatermark diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index d837499..a59f780 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -19,6 +19,7 @@ package kafka.tools import java.io._ import kafka.message._ +import kafka.log._ import kafka.utils._ object DumpLogSegments { @@ -39,7 +40,7 @@ object DumpLogSegments { val messageSet = new FileMessageSet(file, false) for(messageAndOffset <- messageSet) { val msg = messageAndOffset.message - println("offset: " + (startOffset + offset) + " isvalid: " + msg.isValid + + println("offset: " + offset + " isvalid: " + msg.isValid + " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec) if (!isNoPrint) println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8")) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 6257ab9..c6b1eae 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -299,8 +299,7 @@ object ZkUtils extends Logging { (true, stat.getVersion) } catch { case e: Exception => - info("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion)) - error(e) + debug("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion), e) (false, -1) } } diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala index 625f03f..75c33e0 100644 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala @@ -33,7 +33,7 @@ object TestLogPerformance { val props = TestUtils.createBrokerConfig(0, -1) val config = new KafkaConfig(props) val dir = TestUtils.tempDir() - val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, false, SystemTime) + val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime) val bytes = new Array[Byte](messageSize) new java.util.Random().nextBytes(bytes) val message = new Message(bytes) diff --git a/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala b/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala index d1cd847..f0c07c5 100644 --- a/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala @@ -51,7 +51,7 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit producer.send(producerData) // corrupt the file on disk - val logFile = new File(config.logDir + File.separator + topic + "-" + partition, Log.nameFromOffset(0)) + val logFile = new File(config.logDir + File.separator + topic + "-" + partition, 
Log.logFilename(0)) val byteBuffer = ByteBuffer.allocate(4) byteBuffer.putInt(1000) // wrong message size byteBuffer.rewind() diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala new file mode 100644 index 0000000..f06e537 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.nio._ +import java.util.concurrent.atomic._ +import junit.framework.Assert._ +import kafka.utils.TestUtils._ +import kafka.message._ +import org.junit.Test + +class FileMessageSetTest extends BaseMessageSetTestCases { + + val messageSet = createMessageSet(messages) + + def createMessageSet(messages: Seq[Message]): FileMessageSet = { + val set = new FileMessageSet(tempFile(), true) + set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) + set.flush() + set + } + + @Test + def testFileSize() { + assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + messageSet.append(singleMessageSet("abcd".getBytes())) + assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + } + + @Test + def testIterationOverPartialAndTruncation() { + testPartialWrite(0, messageSet) + testPartialWrite(2, messageSet) + testPartialWrite(4, messageSet) + testPartialWrite(5, messageSet) + testPartialWrite(6, messageSet) + } + + def testPartialWrite(size: Int, messageSet: FileMessageSet) { + val buffer = ByteBuffer.allocate(size) + val originalPosition = messageSet.channel.position + for(i <- 0 until size) + buffer.put(0.asInstanceOf[Byte]) + buffer.rewind() + messageSet.channel.write(buffer) + // appending those bytes should not change the contents + checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) + } + + @Test + def testIterationDoesntChangePosition() { + val position = messageSet.channel.position + checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) + assertEquals(position, messageSet.channel.position) + } + + @Test + def testRead() { + val read = messageSet.read(0, messageSet.sizeInBytes) + checkEquals(messageSet.iterator, read.iterator) + val items = read.iterator.toList + val sec = items.tail.head + val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes) + checkEquals(items.tail.iterator, read2.iterator) + } + + @Test + def testSearch() { + // append a new message with a high offset + val lastMessage = new Message("test".getBytes) + messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage)) + var physicalOffset = 0 + assertEquals("Should be able to find the first message by its offset", + OffsetPosition(0L, physicalOffset), + messageSet.searchFor(0, 
0)) + physicalOffset += MessageSet.entrySize(messageSet.head.message) + assertEquals("Should be able to find second message when starting from 0", + OffsetPosition(1L, physicalOffset), + messageSet.searchFor(1, 0)) + assertEquals("Should be able to find second message starting from its offset", + OffsetPosition(1L, physicalOffset), + messageSet.searchFor(1, physicalOffset)) + physicalOffset += MessageSet.entrySize(messageSet.tail.head.message) + assertEquals("Should be able to find third message from a non-existant offset", + OffsetPosition(50L, physicalOffset), + messageSet.searchFor(3, physicalOffset)) + assertEquals("Should be able to find third message by correct offset", + OffsetPosition(50L, physicalOffset), + messageSet.searchFor(50, physicalOffset)) + } + +} diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 6bf7221..8679009 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -27,7 +27,7 @@ import kafka.admin.CreateTopicCommand import kafka.server.KafkaConfig import kafka.utils._ -class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { +class LogManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val maxRollInterval = 100 @@ -35,15 +35,13 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { var logDir: File = null var logManager: LogManager = null var config:KafkaConfig = null - val zookeeperConnect = TestZKUtils.zookeeperConnect val name = "kafka" val veryLargeLogFlushInterval = 10000000L val scheduler = new KafkaScheduler(2) override def setUp() { super.setUp() - val props = TestUtils.createBrokerConfig(0, -1) - config = new KafkaConfig(props) { + config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) { override val logFileSize = 1024 override val flushInterval = 100 } @@ -51,11 +49,6 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup logDir = logManager.logDir - - TestUtils.createBrokersInZk(zkClient, List(config.brokerId)) - - // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0") } override def tearDown() { @@ -87,8 +80,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { var offset = 0L for(i <- 0 until 1000) { var set = TestUtils.singleMessageSet("test".getBytes()) - log.append(set) - offset += set.sizeInBytes + offset = log.append(set) } log.flush @@ -96,12 +88,12 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { // update the last modified time of all log segments val logSegments = log.segments.view - logSegments.foreach(s => s.file.setLastModified(time.currentMs)) + logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs)) time.currentMs += maxLogAge + 3000 logManager.cleanupLogs() assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) - assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0L, log.read(offset+1, 1024).sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") @@ -135,8 +127,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { // add a bunch of messages that should be larger than 
the retentionSize for(i <- 0 until 1000) { val set = TestUtils.singleMessageSet("test".getBytes()) - log.append(set) - offset += set.sizeInBytes + offset = log.append(set) } // flush to make sure it's written to disk log.flush @@ -147,7 +138,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { // this cleanup shouldn't find any expired segments but should delete some to reduce size logManager.cleanupLogs() assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) - assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0L, log.read(offset + 1, 1024).sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") @@ -175,8 +166,8 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { var set = TestUtils.singleMessageSet("test".getBytes()) log.append(set) } - + println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime) assertTrue("The last flush time has to be within defaultflushInterval of current time ", - (System.currentTimeMillis - log.getLastFlushedTime) < 100) + (System.currentTimeMillis - log.getLastFlushedTime) < 150) } } diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index b825b27..1458f1d 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -95,12 +95,12 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10) val offsets = log.getOffsetsBefore(offsetRequest).toList - assertEquals(List(480, 360, 240, 120, 0), offsets) + assertEquals(List(20, 15, 10, 5, 0), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, OffsetRequest.LatestTime, 10).toList - assertEquals(List(480, 360, 240, 120, 0), consumerOffsets) + assertEquals(List(20, 15, 10, 5, 0), consumerOffsets) // try to fetch using latest offset val fetchResponse = simpleConsumer.fetch( @@ -154,11 +154,11 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val offsetRequest = new OffsetRequest(topic, part, now, 10) val offsets = log.getOffsetsBefore(offsetRequest).toList - assertEquals(List(480L, 360L, 240L, 120L, 0L), offsets) + assertEquals(List(20, 15, 10, 5, 0), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10).toList - assertEquals(List(480L, 360L, 240L, 120L, 0L), consumerOffsets) + assertEquals(List(20, 15, 10, 5, 0), consumerOffsets) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala new file mode 100644 index 0000000..6383a90 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -0,0 +1,104 @@ +package kafka.log + +import junit.framework.Assert._ +import java.util.concurrent.atomic._ +import org.junit.{Test, Before, After} +import org.scalatest.junit.JUnit3Suite +import kafka.utils.TestUtils +import kafka.message._ +import kafka.utils.SystemTime +import scala.collection._ + +class LogSegmentTest extends JUnit3Suite { + + val segments = mutable.ArrayBuffer[LogSegment]() + + def createSegment(offset: Long): LogSegment = { + val msFile = TestUtils.tempFile() 
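The new LogOffsetTest expectations reflect that getOffsetsBefore now reports logical message offsets at segment boundaries rather than byte positions. Comparing the old and new lists suggests the test log rolls roughly every 5 messages (byte deltas of 120 become offset deltas of 5), so after about 20 appends the boundaries are 0, 5, 10 and 15 plus the log end offset 20, returned newest first. A tiny sketch of that arithmetic (the per-segment message count is inferred, not stated in the patch):

    // Hypothetical numbers inferred from the old/new expected values.
    val segmentStarts = (0 until 20 by 5).toList      // List(0, 5, 10, 15)
    val expected = (segmentStarts :+ 20).reverse      // List(20, 15, 10, 5, 0)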
+ val ms = new FileMessageSet(msFile, true) + val idxFile = TestUtils.tempFile() + val idx = new OffsetIndex(idxFile, offset, true, 100) + val seg = new LogSegment(ms, idx, offset, 10, SystemTime) + segments += seg + seg + } + + def messages(offset: Long, messages: String*): ByteBufferMessageSet = { + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, + offsetCounter = new AtomicLong(offset), + messages = messages.map(s => new Message(s.getBytes)):_*) + } + + @After + def teardown() { + for(seg <- segments) { + seg.index.delete() + seg.messageSet.delete() + } + } + + @Test + def testReadOnEmptySegment() { + val seg = createSegment(40) + val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None) + assertEquals(0, read.size) + } + + @Test + def testReadBeforeFirstOffset() { + val seg = createSegment(40) + val ms = messages(50, "hello", "there", "little", "bee") + seg.append(50, ms) + val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None) + assertEquals(ms.toList, read.toList) + } + + @Test + def testReadSingleMessage() { + val seg = createSegment(40) + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50)) + assertEquals(new Message("hello".getBytes), read.head.message) + } + + @Test + def testReadAfterLast() { + val seg = createSegment(40) + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) + assertEquals(0, read.size) + } + + @Test + def testReadFromGap() { + val seg = createSegment(40) + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val ms2 = messages(60, "alpha", "beta") + seg.append(60, ms2) + val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, read.toList) + } + + @Test + def testTruncate() { + val seg = createSegment(40) + val ms = messages(50, "hello", "there", "you") + seg.append(50, ms) + seg.truncateTo(51) + val read = seg.read(50, maxSize = 1000, None) + assertEquals(1, read.size) + assertEquals(ms.head, read.head) + } + + @Test + def testNextOffsetCalculation() { + val seg = createSegment(40) + assertEquals(40, seg.nextOffset) + seg.append(50, messages(50, "hello", "there", "you")) + assertEquals(53, seg.nextOffset()) + } + +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 24eb5e3..7f160f3 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -46,9 +46,11 @@ class LogTest extends JUnitSuite { Utils.rm(logDir) } - def createEmptyLogs(dir: File, offsets: Int*) = { - for(offset <- offsets) - new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile() + def createEmptyLogs(dir: File, offsets: Int*) { + for(offset <- offsets) { + new File(dir, Log.logFilename(offset)).createNewFile() + new File(dir, Log.indexFilename(offset)).createNewFile() + } } /** Test that the size and time based log segment rollout works. 
*/ @@ -59,7 +61,7 @@ class LogTest extends JUnitSuite { val time: MockTime = new MockTime() // create a log - val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, false, time) + val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time) time.currentMs += rollMs + 1 // segment age is less than its limit @@ -93,7 +95,7 @@ class LogTest extends JUnitSuite { val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages // create a log - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time) + val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -106,23 +108,12 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time) - } - - @Test - def testLoadInvalidLogsFails() { - createEmptyLogs(logDir, 0, 15) - try { - new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time) - fail("Allowed load of corrupt logs without complaint.") - } catch { - case e: KafkaException => "This is good" - } + new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) } @Test def testAppendAndRead() { - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time) + val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 10) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) @@ -139,7 +130,7 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time) + val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -159,50 +150,33 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time) + val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 100 - for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes())) + val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(Integer.toString(i).getBytes())) + val offsets = messageSets.map(log.append(_)) log.flush /* now do successive reads and iterate over the resulting message sets counting the messages * we should find exactly 100 messages. 
*/
var reads = 0
- var current = 0
var offset = 0L
var readOffset = 0L
- while(current < numMessages) {
- val messages = log.read(readOffset, 1024*1024)
- readOffset = messages.last.nextOffset
- current += messages.size
- if(reads > 2*numMessages)
- fail("Too many read attempts.")
- reads += 1
+ for((messageSet, offset) <- messageSets.zip(offsets)) {
+ val messages = log.read(offset, 1024*1024)
+ assertEquals("Messages not equal at offset " + offset, messageSet.head.message, messages.head.message)
}
- assertEquals("We did not find all the messages we put in", numMessages, current)
}
@Test
def testFindSegment() {
assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
- assertEquals("Search in segment list just outside the range of the last segment should find nothing",
- None, Log.findRange(makeRanges(5, 9, 12), 12))
- try {
- Log.findRange(makeRanges(35), 36)
- fail("expect exception")
- }
- catch {
- case e: OffsetOutOfRangeException => "this is good"
- }
-
- try {
- Log.findRange(makeRanges(35,35), 36)
- }
- catch {
- case e: OffsetOutOfRangeException => "this is good"
- }
-
+ assertEquals("Search in segment list just outside the range of the last segment should find last segment",
+ 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
+ assertEquals("Search in segment list far outside the range of the last segment should find last segment",
+ 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
+ assertEquals("Search in segment list below the range of all segments should find nothing",
+ None, Log.findRange(makeRanges(5, 9, 12), -1))
assertContains(makeRanges(5, 9, 12), 11)
assertContains(makeRanges(5), 4)
assertContains(makeRanges(5,8), 5)
@@ -214,7 +188,7 @@ class LogTest extends JUnitSuite {
def testEdgeLogRolls() {
{
// first test a log segment starting at 0
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val curOffset = log.logEndOffset
assertEquals(curOffset, 0)
@@ -227,10 +201,10 @@ class LogTest extends JUnitSuite {
{
// second test an empty log segment starting at non-zero
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 1
for(i <- 0 until numMessages)
- log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+ log.append(TestUtils.singleMessageSet(i.toString.getBytes))
val curOffset = log.logEndOffset
// time goes by; the log file is deleted
@@ -257,7 +231,7 @@ class LogTest extends JUnitSuite {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
- val log = new Log(logDir, 100, maxMessageSize.toInt, 1000, config.logRollHours*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, maxMessageSize.toInt, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
// should be able to append the small message
log.append(first)
@@ -269,6 +243,26 @@ class LogTest extends JUnitSuite {
case e:MessageSizeTooLargeException => // this is good
}
}
+
+ @Test
+ def testLogRecoversToCorrectOffset() {
+ val numMessages = 100
+ var log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ for(i <- 0 until numMessages)
+ log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+ assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
+ log.close()
+
+ // test non-recovery case
+ log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
+ log.close()
+
+ // test recovery case
+ log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, time = time)
+ assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
+ log.close()
+ }
def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match {
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
new file mode 100644
index 0000000..c023816
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import junit.framework.Assert._
+import java.util.{Collections, Arrays}
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import scala.collection._
+import scala.util.Random
+import kafka.utils._
+
+ /*
+ * Test cases:
+ * - empty index
+ * - first value
+ * - last value
+ * - non-present value
+ * - present value
+ * - random values
+ * - test immutability
+ * - test truncate
+ * - test lookup outside bounds
+ * - Extreme values in append
+ * - what value do we return if it falls off the end?
+ */ +class OffsetIndexTest extends JUnitSuite { + + var idx: OffsetIndex = null + val maxEntries = 30 + + @Before + def setup() { + this.idx = new OffsetIndex(file = TestUtils.tempFile(), baseOffset = 45L, mutable = true, maxEntries = 30) + } + + @After + def teardown() { + if(this.idx != null) + this.idx.file.delete() + } + + @Test + def randomLookupTest() { + assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L)) + + // append some random values + val base = idx.baseOffset.toInt + 1 + val size = idx.maxEntries + val vals: Seq[(Long, Int)] = monotonicSeq(base, size).map(_.toLong).zip(monotonicSeq(0, size)) + vals.foreach{x => idx.append(x._1, x._2)} + + // should be able to find all those values + for((logical, physical) <- vals) + assertEquals("Should be able to find values that are present.", OffsetPosition(logical, physical), idx.lookup(logical)) + + // for non-present values we should find the offset of the largest value less than or equal to this + val valMap = new immutable.TreeMap[Long, (Long, Int)]() ++ vals.map(p => (p._1, p)) + val offsets = (idx.baseOffset until vals.last._1.toInt).toArray + Collections.shuffle(Arrays.asList(offsets)) + for(offset <- offsets.take(30)) { + val rightAnswer = + if(offset < valMap.firstKey) + OffsetPosition(idx.baseOffset, 0) + else + OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2) + assertEquals("The index should give the same answer as the sorted map", rightAnswer, idx.lookup(offset)) + } + } + + @Test + def lookupExtremeCases() { + assertEquals("Lookup on empty file", OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset)) + for(i <- 0 until idx.maxEntries) + idx.append(idx.baseOffset + i + 1, i) + // check first and last entry + assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset)) + assertEquals(OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries)) + } + + @Test + def appendTooMany() { + for(i <- 0 until idx.maxEntries) { + val offset = idx.baseOffset + i + 1 + idx.append(offset, i) + } + assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException]) + } + + + @Test + def testReadOnly() { + /* add some random values */ + val vals = List((49, 1), (52, 2), (55, 3)) + for((logical, physical) <- vals) + idx.append(logical, physical) + + idx.makeReadOnly() + + assertEquals("File length should just contain added entries.", vals.size * 8L, idx.file.length()) + assertEquals("Last offset field should be initialized", vals.last._1, idx.lastOffset) + + for((logical, physical) <- vals) + assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical)) + + assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException]) + } + + @Test(expected = classOf[IllegalArgumentException]) + def appendOutOfOrder() { + idx.append(51, 0) + idx.append(50, 1) + } + + @Test + def reopenAsReadonly() { + val first = OffsetPosition(51, 0) + val sec = OffsetPosition(52, 1) + idx.append(first.offset, first.position) + idx.append(sec.offset, sec.position) + idx.close() + val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false, maxEntries = -1) + assertEquals(first, idxRo.lookup(first.offset)) + assertEquals(sec, idxRo.lookup(sec.offset)) + assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException]) + 
} + + @Test + def truncate() { + val idx = new OffsetIndex(file = TestUtils.tempFile(), baseOffset = 0L, mutable = true, maxEntries = 10) + for(i <- 1 until 10) + idx.append(i, i) + idx.truncateTo(5) + assertEquals("4 should be the last entry in the index", OffsetPosition(4, 4), idx.lookup(10)) + assertEquals("4 should be the last entry in the index", 4, idx.lastOffset) + } + + def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) { + try { + idx.append(offset, 1) + fail(message) + } catch { + case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass) + } + } + + def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = { + val idx = new OffsetIndex(file = TestUtils.tempFile, baseOffset = baseOffset, mutable = mutable, maxEntries = 2 * vals.size) + for ((logical, physical) <- vals) + idx.append(logical, physical) + idx + } + + def monotonicSeq(base: Int, len: Int): Seq[Int] = { + val rand = new Random(1L) + val vals = new mutable.ArrayBuffer[Int](len) + var last = base + for (i <- 0 until len) { + last += rand.nextInt(15) + 1 + vals += last + } + vals + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index d1f76b3..c436f3d 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -17,8 +17,10 @@ package kafka.message +import java.io.RandomAccessFile import junit.framework.Assert._ import kafka.utils.TestUtils._ +import kafka.log.FileMessageSet import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -61,10 +63,11 @@ trait BaseMessageSetTestCases extends JUnitSuite { def testWriteToWithMessageSet(set: MessageSet) { // do the write twice to ensure the message set is restored to its orginal state for(i <- List(0,1)) { - val channel = tempChannel() + val file = tempFile() + val channel = new RandomAccessFile(file, "rw").getChannel() val written = set.writeTo(channel, 0, 1024) assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written) - val newSet = new FileMessageSet(channel, false) + val newSet = new FileMessageSet(file, channel, false) checkEquals(set.iterator, newSet.iterator) } } diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 75390f4..8ad1944 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -18,6 +18,7 @@ package kafka.message import java.nio._ +import java.util.concurrent.atomic.AtomicLong import junit.framework.Assert._ import org.junit.Test import kafka.utils.TestUtils @@ -147,13 +148,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { // check uncompressed offsets checkOffsets(messages, 0) var offset = 1234567 - checkOffsets(messages.assignOffsets(offset, NoCompressionCodec), offset) + checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec), offset) // check compressed messages - assertTrue("All compressed messages should start with offset = 0", - compressedMessages.map(_.offset).forall(_ == 0)) - assertTrue("After assignment, all compressed messages should have the base offset", - messages.assignOffsets(offset, DefaultCompressionCodec).map(_.offset).forall(_ 
== offset)) + checkOffsets(compressedMessages, 0) + checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec), offset) } /* check that offsets are assigned based on byte offset from the given base offset */ @@ -161,7 +160,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { var offset = baseOffset for(entry <- messages) { assertEquals("Unexpected offset in message set iterator", offset, entry.offset) - offset += MessageSet.entrySize(entry.message) + offset += 1 } } diff --git a/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala b/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala deleted file mode 100644 index fcbdd33..0000000 --- a/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.message - -import java.io.ByteArrayOutputStream -import kafka.utils.TestUtils -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import junit.framework.Assert._ - -class CompressionUtilTest extends JUnitSuite { - - @Test - def testSimpleCompressDecompress() { - val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - val message = CompressionUtils.compress(0, messages) - val decompressedMessages = CompressionUtils.decompress(message) - TestUtils.checkLength(decompressedMessages.iterator,3) - TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) - } - - @Test - def testComplexCompressDecompress() { - val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - val message = CompressionUtils.compress(0, messages.slice(0, 2)) - val complexMessages = List[Message](message):::messages.slice(2,3) - val complexMessage = CompressionUtils.compress(0, complexMessages) - val decompressedMessages = CompressionUtils.decompress(complexMessage) - TestUtils.checkLength(TestUtils.getMessageIterator(decompressedMessages.iterator),3) - TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) - } - - @Test - def testSnappyCompressDecompressExplicit() { - if(!isSnappyAvailable()) - return - val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - val message = CompressionUtils.compress(0L, messages, SnappyCompressionCodec) - assertEquals(message.compressionCodec,SnappyCompressionCodec) - val decompressedMessages = CompressionUtils.decompress(message) - TestUtils.checkLength(decompressedMessages.iterator,3) - 
TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) - } - - def isSnappyAvailable(): Boolean = { - try { - val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream()) - true - } catch { - case e: UnsatisfiedLinkError => false - } - } -} diff --git a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala deleted file mode 100644 index f9a277a..0000000 --- a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.message - -import java.nio._ -import junit.framework.Assert._ -import kafka.utils.TestUtils._ -import org.junit.Test - -class FileMessageSetTest extends BaseMessageSetTestCases { - - val messageSet = createMessageSet(messages) - - def createMessageSet(messages: Seq[Message]): FileMessageSet = { - val set = new FileMessageSet(tempFile(), true) - set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) - set.flush() - set - } - - @Test - def testFileSize() { - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - messageSet.append(singleMessageSet("abcd".getBytes())) - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - } - - @Test - def testIterationOverPartialAndTruncation() { - testPartialWrite(0, messageSet) - testPartialWrite(2, messageSet) - testPartialWrite(4, messageSet) - testPartialWrite(5, messageSet) - testPartialWrite(6, messageSet) - } - - def testPartialWrite(size: Int, messageSet: FileMessageSet) { - val buffer = ByteBuffer.allocate(size) - val originalPosition = messageSet.channel.position - for(i <- 0 until size) - buffer.put(0.asInstanceOf[Byte]) - buffer.rewind() - messageSet.channel.write(buffer) - // appending those bytes should not change the contents - checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) - assertEquals("Unexpected number of bytes truncated", size.longValue, messageSet.recover()) - assertEquals("File pointer should now be at the end of the file.", originalPosition, messageSet.channel.position) - // nor should recovery change the contents - checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) - } - - @Test - def testIterationDoesntChangePosition() { - val position = messageSet.channel.position - checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) - assertEquals(position, messageSet.channel.position) - } - - @Test - def testRead() { - val read = messageSet.read(0, messageSet.sizeInBytes) - checkEquals(messageSet.iterator, read.iterator) - val items = read.iterator.toList - val sec = items.tail.head - val read2 = messageSet.read(sec.offset, 
messageSet.sizeInBytes) - checkEquals(items.tail.iterator, read2.iterator) - } - -} diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala new file mode 100644 index 0000000..8b5f38a --- /dev/null +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.message + +import java.io.ByteArrayOutputStream +import java.util.concurrent.atomic.AtomicLong +import scala.collection._ +import kafka.utils.TestUtils +import org.scalatest.junit.JUnitSuite +import org.junit._ +import junit.framework.Assert._ + +class MessageCompressionTest extends JUnitSuite { + + @Test + def testSimpleCompressDecompress() { + val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec) + if(isSnappyAvailable) + codecs += SnappyCompressionCodec + for(codec <- codecs) + testSimpleCompressDecompress(codec) + } + + def testSimpleCompressDecompress(compressionCodec: CompressionCodec) { + val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) + val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*) + assertEquals(compressionCodec, messageSet.shallowIterator.next.message.compressionCodec) + val decompressed = messageSet.iterator.map(_.message).toList + assertEquals(messages, decompressed) + } + + @Test + def testComplexCompressDecompress() { + val messages = List(new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) + val message = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages.slice(0, 2):_*) + val complexMessages = List(message.shallowIterator.next.message):::messages.slice(2,3) + val complexMessage = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = complexMessages:_*) + val decompressedMessages = complexMessage.iterator.map(_.message).toList + assertEquals(messages, decompressedMessages) + } + + def isSnappyAvailable(): Boolean = { + try { + val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream()) + true + } catch { + case e: UnsatisfiedLinkError => false + } + } +} diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 1d434dd..3ef049a 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -431,16 +431,10 @@ class AsyncProducerTest extends JUnit3Suite { encoder = new StringEncoder, 
producerPool = producerPool, topicPartitionInfos) - try { - val data = List( - new ProducerData[Int,String](topic1, 0, msgs), - new ProducerData[Int,String](topic1, 1, msgs) - ) - handler.handle(data) - handler.close() - } catch { - case e: Exception => fail("Not expected", e) - } + val data = List(new ProducerData[Int,String](topic1, 0, msgs), + new ProducerData[Int,String](topic1, 1, msgs)) + handler.handle(data) + handler.close() EasyMock.verify(mockSyncProducer) EasyMock.verify(producerPool) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 8486a8b..2077ec1 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -170,19 +170,14 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val messageSet = if(leader == server1.config.brokerId) { val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) - response1.messageSet("new-topic", 0).iterator + response1.messageSet("new-topic", 0).iterator.toBuffer }else { val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) - response2.messageSet("new-topic", 0).iterator + response2.messageSet("new-topic", 0).iterator.toBuffer } - assertTrue("Message set should have 1 message", messageSet.hasNext) - - assertEquals(new Message("test1".getBytes), messageSet.next.message) - assertTrue("Message set should have 1 message", messageSet.hasNext) - assertEquals(new Message("test2".getBytes), messageSet.next.message) - if (messageSet.hasNext) - fail("Message set should not have any more messages, but received a message of %s" - .format(Utils.toString(messageSet.next.message.payload, "UTF-8"))) + assertEquals("Should have fetched 2 messages", 2, messageSet.size) + assertEquals(new Message("test1".getBytes), messageSet(0).message) + assertEquals(new Message("test2".getBytes), messageSet(1).message) producer1.close() try { diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 0500b0f..17015f9 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -21,11 +21,11 @@ import java.net.SocketTimeoutException import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand -import kafka.common.{ErrorMapping, MessageSizeTooLargeException} +import kafka.common.{ErrorMapping} import kafka.integration.KafkaServerTestHarness -import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet, MessageSet} +import kafka.message._ import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, SystemTime, TestUtils} +import kafka.utils._ import org.junit.Test import org.scalatest.junit.JUnit3Suite import kafka.api.TopicData @@ -120,7 +120,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertEquals(1, response2.errors.length) Assert.assertEquals(ErrorMapping.NoError, response2.errors(0)) - Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0)) + Assert.assertEquals(0, response2.offsets(0)) } @Test @@ -163,8 +163,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { // the first and last message should have been accepted by broker Assert.assertEquals(0, response2.errors(0)) 
Assert.assertEquals(0, response2.errors(2)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(0)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) + Assert.assertEquals(0, response2.offsets(0)) + Assert.assertEquals(0, response2.offsets(2)) // the middle message should have been rejected because broker doesn't lead partition Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1)) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 5a83a8e..4aa7ed1 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -53,19 +53,20 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // NOTE: this is to avoid transient test failures assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - var expectedHw = sendMessages(2) + val numMessages = 2L + sendMessages(numMessages.toInt) // give some time for the follower 1 to record leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == expectedHw, 10000)) + server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000)) servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) producer.close() val leaderHW = hwFile1.read(topic, 0) - assertEquals(expectedHw, leaderHW) + assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read(topic, 0) - assertEquals(expectedHw, followerHW) + assertEquals(numMessages, followerHW) servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)}) } @@ -91,7 +92,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(0L, hwFile1.read(topic, 0)) - var hw = sendMessages() + sendMessages(1) + var hw = 1L // kill the server hosting the preferred replica server1.shutdown() @@ -116,7 +118,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader) assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1)) - hw += sendMessages() + sendMessages(1) + hw += 1 + // give some time for follower 1 to record leader HW of 60 assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) @@ -158,7 +162,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Leader should get elected", leader.isDefined) // NOTE: this is to avoid transient test failures assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - var hw = sendMessages(20) + sendMessages(20) + var hw = 20L // give some time for follower 1 to record leader HW of 600 assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) @@ -203,7 +208,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // NOTE: this is to avoid transient test failures assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - var hw = sendMessages(2) + sendMessages(2) + var hw = 2L + // allow 
some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) @@ -226,7 +233,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - hw += sendMessages(2) + sendMessages(2) + hw += 2 + // allow some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) @@ -238,10 +247,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { servers.foreach(server => Utils.rm(server.config.logDir)) } - /* send n messages and return the number of bytes sent */ - private def sendMessages(n: Int = 1): Long = { + private def sendMessages(n: Int = 1) { for(i <- 0 until n) producer.send(new ProducerData[Int, Message](topic, 0, message)) - MessageSet.entrySize(message) * n } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 6a12a95..749c0ae 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -60,7 +60,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { new ProducerData[String, String](topic2, testMessageList2)) producer.close() - def condition(): Boolean = { + def logsMatch(): Boolean = { var result = true for (topic <- List(topic1, topic2)) { val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset @@ -69,6 +69,6 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } result } - assertTrue("broker logs should be identical", waitUntilTrue(condition, 6000)) + assertTrue("Broker logs should be identical", waitUntilTrue(logsMatch, 6000)) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 79fa5a3..d738010 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -80,7 +80,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { fetchedMessage = fetched.messageSet(topic, 0) } TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator) - val newOffset = fetchedMessage.validBytes + val newOffset = fetchedMessage.last.nextOffset // send some more messages producer.send(new ProducerData[Int, Message](topic, 0, sent2)) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index a2789c0..5cd9223 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -50,6 +50,7 @@ class SimpleFetchTest extends JUnit3Suite { val time = new MockTime val leo = 20 val hw = 5 + val fetchSize = 100 val messages = new Message("test-message".getBytes()) val zkClient = EasyMock.createMock(classOf[ZkClient]) @@ -57,7 +58,7 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - 
EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages)) EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) @@ -89,7 +90,7 @@ class SimpleFetchTest extends JUnit3Suite { // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log val goodFetch = new FetchRequestBuilder() .replicaId(FetchRequest.NonFollowerId) - .addFetch(topic, partitionId, 0, hw*2) + .addFetch(topic, partitionId, 0, fetchSize) .build() val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch) @@ -125,7 +126,7 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages)) EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])