diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index be60c24..d18990e 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -68,9 +68,9 @@ class FileMessageSet private[kafka](val file: File, */ private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { var position = startingPosition - val buffer = ByteBuffer.allocate(12) + val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) val size = _size.get() - while(position + 12 < size) { + while(position + MessageSet.LogOverhead < size) { buffer.rewind() channel.read(buffer, position) if(buffer.hasRemaining) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2309333..a465c96 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -483,8 +483,10 @@ private[kafka] class Log(val dir: File, var total = 0 for(segment <- segments) { info("Deleting log segment " + segment.start + " from " + name) - if(!segment.messageSet.delete() || !segment.index.delete()) { - warn("Delete of log segment " + segment.start + " failed.") + val deletedLog = segment.messageSet.delete() + val deletedIndex = segment.index.delete() + if(!deletedIndex || !deletedLog) { + throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.") } else { total += 1 } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 78ce9b6..23f1c2d 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -38,7 +38,6 @@ private[kafka] class LogManager(val config: KafkaConfig, needRecovery: Boolean) extends Logging { val logDir: File = new File(config.logDir) - private val numPartitions = config.numPartitions private val logFileSizeMap = config.logFileSizeMap private val flushInterval = config.flushInterval private val logCreationLock = new Object diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4bf3939..577d510 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -67,7 +67,7 @@ class LogSegment(val messageSet: FileMessageSet, * Find the physical file position for the least offset >= the given offset. If no offset is found * that meets this criteria before the end of the log, return null. */ - def translateOffset(offset: Long): OffsetPosition = { + private def translateOffset(offset: Long): OffsetPosition = { val mapping = index.lookup(offset) messageSet.searchFor(offset, mapping.position) } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index e8eb554..f6db0fb 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -141,7 +141,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) var lo = 0 var hi = entries-1 while(lo < hi) { - val mid = ceil((hi + lo) / 2.0).toInt + val mid = ceil(hi/2.0 + lo/2.0).toInt val found = logical(idx, mid) if(found == relativeOffset) return mid @@ -150,7 +150,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) else hi = mid - 1 } - return lo + lo } /* return the nth logical offset relative to the base offset */ diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 34fa07a..a06f37a 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -31,7 +31,7 @@ object ByteBufferMessageSet { private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { if(messages.size == 0) { - return MessageSet.Empty.buffer + MessageSet.Empty.buffer } else if(compressionCodec == NoCompressionCodec) { val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) for(message <- messages) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 911d2f8..83a24ec 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,10 +70,15 @@ class KafkaApis(val requestChannel: RequestChannel, if(requestLogger.isTraceEnabled) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) - - val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) - val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) + try { + val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) + val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) + } catch { + case e: KafkaStorageException => + fatal("Disk error while becoming leader.", e) + Runtime.getRuntime.halt(1) + } }