diff --cc config/log4j.properties
index 1891f38,b76bc94..0000000
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@@ -36,12 -36,12 +36,18 @@@ log4j.appender.requestAppender.File=kaf
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+ log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+ log4j.appender.controllerAppender.File=controller.log
+ log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@@ -56,13 -56,13 +62,16 @@@ log4j.additivity.kafka.network.RequestC
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
- log4j.logger.kafka.controller=TRACE, stateChangeAppender
+ log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false
++
+ log4j.logger.state.change.logger=TRACE, stateChangeAppender
+ log4j.additivity.state.change.logger=false
-
-
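The net effect of the appender changes above: log-cleaner activity and controller activity each get their own daily-rolling file, and the corresponding loggers have additivity turned off so those events are not duplicated into the broker's main log. A minimal sketch of how this behaves at runtime, assuming log4j 1.x and the properties above on the classpath (the object and message below are hypothetical, not part of the patch):

    import org.apache.log4j.Logger

    object CleanerLoggingSketch {
      def main(args: Array[String]): Unit = {
        // Events logged under the "kafka.log.LogCleaner" category are routed to
        // log-cleaner.log by cleanerAppender; additivity=false keeps them out of
        // the appenders attached to the root/kafka loggers.
        val cleanerLog = Logger.getLogger("kafka.log.LogCleaner")
        cleanerLog.info("illustrative cleaner message")
      }
    }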
diff --cc core/src/main/scala/kafka/log/Log.scala
index 631953f,7d71451..0000000
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -206,67 -255,79 +206,73 @@@ class Log(val dir: File
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
- * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set,
- * or (-1,-1) if the message set is empty
+ * @param messages The message set to append
+ * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+ *
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ *
- * @return Information about the appended messages including the first and last offset
++ * @return the first and the last offset in the appended messages
*/
- def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
+ def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
- val messageSetInfo = analyzeAndValidateMessageSet(messages)
-
+ val appendInfo = analyzeAndValidateMessageSet(messages)
+
// if we have any valid messages, append them to the log
- if(messageSetInfo.count == 0) {
- (-1L, -1L)
- } else {
- // trim any invalid bytes or partial messages before appending it to the on-disk log
- var validMessages = trimInvalidBytes(messages)
-
- try {
- // they are valid, insert them in the log
- val offsets = lock synchronized {
- val firstOffset = nextOffset.get
-
- // maybe roll the log if this segment is full
- val segment = maybeRoll(segments.view.last)
-
- // assign offsets to the messageset
- val lastOffset =
- if(assignOffsets) {
- val offsetCounter = new AtomicLong(nextOffset.get)
- try {
- validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
- } catch {
- case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
- }
- val assignedLastOffset = offsetCounter.get - 1
- val numMessages = assignedLastOffset - firstOffset + 1
- BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
- BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
- assignedLastOffset
- } else {
- require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
- require(messageSetInfo.firstOffset >= nextOffset.get,
- "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
- .format(messageSetInfo.firstOffset, nextOffset.get))
- messageSetInfo.lastOffset
- }
+ if(appendInfo.count == 0)
- return appendInfo
++ return (-1L, -1L)
+
+ // trim any invalid bytes or partial messages before appending it to the on-disk log
+ var validMessages = trimInvalidBytes(messages)
- // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
- // happens with the new message size (after re-compression, if any)
- for(messageAndOffset <- validMessages.shallowIterator) {
- if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
- .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
+ try {
+ // they are valid, insert them in the log
+ lock synchronized {
++ val firstOffset = nextOffset.get
++
+ // maybe roll the log if this segment is full
+ val segment = maybeRoll()
-
- if(assignOffsets) {
- // assign offsets to the messageset
- appendInfo.firstOffset = nextOffset.get
- val offset = new AtomicLong(nextOffset.get)
- try {
- validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
- } catch {
- case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
++
++ val lastOffset =
++ if(assignOffsets) {
++ // assign offsets to the messageset
++ val offset = new AtomicLong(nextOffset.get)
++ try {
++ validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
++ } catch {
++ case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
++ }
++ offset.get - 1
++ } else {
++ // we are taking the offsets we are given
++ if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
++ throw new IllegalArgumentException("Out of order offsets found in " + messages)
++ appendInfo.lastOffset
}
- appendInfo.lastOffset = offset.get - 1
- } else {
- // we are taking the offsets we are given
- if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
- throw new IllegalArgumentException("Out of order offsets found in " + messages)
- }
- // now append to the log
- segment.append(firstOffset, validMessages)
-
- // advance the log end offset
- nextOffset.set(lastOffset + 1)
+ // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
+ // happens with the new message size (after re-compression, if any)
+ for(messageAndOffset <- validMessages.shallowIterator) {
+ if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
+ throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+ .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+ }
- trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
- .format(this.name, firstOffset, nextOffset.get(), validMessages))
+ // now append to the log
- trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
- segment.append(appendInfo.firstOffset, validMessages)
-
++ segment.append(firstOffset, validMessages)
+
- // return the offset at which the messages were appended
- (firstOffset, lastOffset)
- }
-
- // maybe flush the log and index
- val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
+ // increment the log end offset
- nextOffset.set(appendInfo.lastOffset + 1)
-
- // maybe flush the log and index
- maybeFlush(appendInfo.count)
-
- appendInfo
++ nextOffset.set(lastOffset + 1)
++
++ trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
++ .format(this.name, firstOffset, nextOffset.get(), validMessages))
++
++ val numAppendedMessages = (lastOffset - firstOffset + 1).toInt
+ maybeFlush(numAppendedMessages)
-
- // return the first and last offset
- offsets
- } catch {
- case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
++
++ // return the offset of the first and the last message in the messageset appended
++ (firstOffset, lastOffset)
}
+ } catch {
+ case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
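With the merge resolved this way, the 0.8 append() contract is a plain (firstOffset, lastOffset) pair, with (-1L, -1L) standing in for an empty message set, instead of trunk's LogAppendInfo object. A minimal caller-side sketch of that contract (illustrative only, not part of the patch; it assumes an already-constructed kafka.log.Log and a ByteBufferMessageSet):

    import kafka.log.Log
    import kafka.message.ByteBufferMessageSet

    // Hypothetical helper, just to show how the new return value is consumed.
    def appendAndReport(log: Log, messages: ByteBufferMessageSet): Unit = {
      val (firstOffset, lastOffset) = log.append(messages, assignOffsets = true)
      if(firstOffset < 0)
        println("nothing appended: the message set contained no valid messages")
      else
        println("appended offsets %d to %d (%d messages)".format(firstOffset, lastOffset, lastOffset - firstOffset + 1))
    }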
diff --cc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29d0af7,d4f15c1..0000000
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@@ -63,21 -63,38 +63,38 @@@ class ReplicaFetcherThread(name:String
}
}
- // handle a partition whose offset is out of range and return a new fetch offset
+ /**
+ * Handle a partition whose offset is out of range and return a new fetch offset.
+ */
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
- // This means the local replica is out of date. Truncate the log and catch up from beginning.
- val request = OffsetRequest(
- replicaId = brokerConfig.brokerId,
- requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
- )
- val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
- val offset = partitionErrorAndOffset.error match {
- case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
- case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
- }
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
- replica.log.get.truncateFullyAndStartAt(offset)
- offset
+ val log = replica.log.get
+
+ /**
+ * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
+ * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
+ * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
+ * and it may discover that the current leader's end offset is behind its own end offset.
+ *
+ * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
+ *
+ * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
+ */
+ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
+ if (leaderEndOffset < log.logEndOffset) {
+ log.truncateTo(leaderEndOffset)
+ leaderEndOffset
+ } else {
+ /**
+ * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+ * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+ *
+ * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+ */
+ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
- log.truncateAndStartWithNewOffset(leaderStartOffset)
++ log.truncateFullyAndStartAt(leaderStartOffset)
+ leaderStartOffset
+ }
}
// any logic for partitions whose leader has changed
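To make the two branches in handleOffsetOutOfRange concrete, here is a rough walk-through with made-up offsets (illustrative only, not part of the patch; the helper below mirrors only the offset arithmetic, not the truncation side effects):

    // Case 1 (unclean leader election): follower log end offset = 120, leader's latest offset = 100
    //   => truncateTo(100) and resume fetching at offset 100.
    // Case 2 (follower down for a long time): follower log end offset = 5, leader's earliest offset = 50
    //   => truncateFullyAndStartAt(50) and resume fetching at offset 50.
    def newFetchOffset(followerLogEndOffset: Long, leaderLatestOffset: Long, leaderEarliestOffset: Long): Long =
      if (leaderLatestOffset < followerLogEndOffset) leaderLatestOffset else leaderEarliestOffset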
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 765d3cb,68e712c..0000000
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -48,10 -48,11 +50,11 @@@ class ReplicaManager(val config: KafkaC
private var leaderPartitions = new mutable.HashSet[Partition]()
private val leaderPartitionsLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
- this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+ val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
+ this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
newGauge(
"LeaderCount",
@@@ -174,8 -180,8 +182,8 @@@
partition.leaderReplicaIfLocal match {
case Some(leaderReplica) => leaderReplica
case None =>
- throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d"
- .format(topic, partitionId, config.brokerId))
+ throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
- .format(topic, partitionId, config.brokerId))
++ .format(topic, partitionId, config.brokerId))
}
}
}
diff --cc core/src/main/scala/kafka/tools/DumpLogSegments.scala
index d9546ca,06e6437..0000000
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@@ -119,32 -123,34 +123,35 @@@ object DumpLogSegments
val messageSet = new FileMessageSet(file)
var validBytes = 0L
var lastOffset = -1l
- for(messageAndOffset <- messageSet) {
- val msg = messageAndOffset.message
+ for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration
+ val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
+ for (messageAndOffset <- itr) {
+ val msg = messageAndOffset.message
- if(lastOffset == -1)
+ if(lastOffset == -1)
+ lastOffset = messageAndOffset.offset
+ // If we are iterating uncompressed messages, offsets must be consecutive
+ else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
+ var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+ nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
+ nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+ }
lastOffset = messageAndOffset.offset
- // If it's uncompressed message, its offset must be lastOffset + 1 no matter last message is compressed or uncompressed
- else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
- var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
- nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
- nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
- }
- lastOffset = messageAndOffset.offset
- print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
- " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
- " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
- validBytes += MessageSet.entrySize(msg)
- if(msg.hasKey)
- print(" keysize: " + msg.keySize)
- if(printContents) {
+ print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+ " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
+ " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
if(msg.hasKey)
- print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
- val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
- print(" payload: " + payload)
+ print(" keysize: " + msg.keySize)
+ if(printContents) {
+ if(msg.hasKey)
- print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
- print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
++ print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
++ val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
++ print(" payload: " + payload)
+ }
+ println()
}
- println()
+ validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
}
val trailingBytes = messageSet.sizeInBytes - validBytes
if(trailingBytes > 0)
diff --cc core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index aa5e661,3cfa384..0000000
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@@ -196,7 -214,8 +214,8 @@@ object SimpleConsumerShell extends Logg
System.out.println("next offset = " + offset)
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
- formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+ formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
+ numMessagesConsumed += 1
} catch {
case e =>
if (skipMessageOnError)
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index 5658ed4,4ed88e8..0000000
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -212,10 -151,10 +212,10 @@@ class LogTest extends JUnitSuite
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
- val offsets = messageSets.map(log.append(_).firstOffset)
- val offsets = messageSets.map(log.append(_)._1)
++ messageSets.foreach(log.append(_))
log.flush
/* do successive reads to ensure all our messages are there */
@@@ -250,46 -187,66 +250,46 @@@
assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
}
-
- @Test
- def testFindSegment() {
- assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
- assertEquals("Search in segment list just outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- None, Log.findRange(makeRanges(5, 9, 12), -1))
- assertContains(makeRanges(5, 9, 12), 11)
- assertContains(makeRanges(5), 4)
- assertContains(makeRanges(5,8), 5)
- assertContains(makeRanges(5,8), 6)
- }
+ /**
+ * Test garbage collecting old segments
+ */
@Test
- def testEdgeLogRollsStartingAtZero() {
- // first test a log segment starting at 0
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val curOffset = log.logEndOffset
- assertEquals(curOffset, 0)
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log; the starting offset of the new log should remain 0
- assertEquals(curOffset, log.logEndOffset)
- log.delete()
- }
-
- @Test
- def testEdgeLogRollsStartingAtNonZero() {
- // second test an empty log segment starting at non-zero
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val numMessages = 1
- for(i <- 0 until numMessages)
- log.append(TestUtils.singleMessageSet(i.toString.getBytes))
- val curOffset = log.logEndOffset
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log
- assertEquals(curOffset, log.logEndOffset)
-
- // time goes by; the log file (which is empty) is deleted again
- val deletedSegments = log.markDeletedWhile(_ => true)
-
- // we shouldn't delete the last empty log segment.
- assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0) - - // we now have a new log - assertEquals(curOffset, log.logEndOffset) + def testThatGarbageCollectingSegmentsDoesntChangeOffset() { + for(messagesToAppend <- List(0, 1, 25)) { + logDir.mkdirs() + // first test a log segment starting at 0 + val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time) + for(i <- 0 until messagesToAppend) + log.append(TestUtils.singleMessageSet(i.toString.getBytes)) + + var currOffset = log.logEndOffset + assertEquals(currOffset, messagesToAppend) + + // time goes by; the log file is deleted + log.deleteOldSegments(_ => true) + + assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset) + assertEquals("We should still have one segment left", 1, log.numberOfSegments) + assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true)) + assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) + assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", + currOffset, - log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset) ++ log.append(TestUtils.singleMessageSet("hello".toString.getBytes))._1) + + // cleanup the log + log.delete() + } } + /** + * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the + * setting and checking that an exception is thrown. + */ @Test def testMessageSizeCheck() { - val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes())) - val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes())) + val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) + val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes)) // append messages to log val maxMessageSize = second.sizeInBytes - 1 diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 55cb0cd..a2afd16 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -80,7 +80,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partition = p.get("partition").get.asInstanceOf[Int] TopicAndPartition(topic, partition) }.toSet - case None => throw new AdministrationException("Preferred replica election data is empty") + case None => throw new AdminOperationException("Preferred replica election data is empty") } case None => throw new AdminOperationException("Preferred replica election data is empty") } diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 1ca37e2..1cbe6e8 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -54,9 +54,9 @@ object OffsetCommitRequest extends Logging { case class OffsetCommitRequest(groupId: String, requestInfo: Map[TopicAndPartition, OffsetMetadataAndError], versionId: Short = OffsetCommitRequest.CurrentVersion, - correlationId: Int = 0, + override val correlationId: Int = 0, clientId: String = 
OffsetCommitRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { + extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 3b0d861..cbb5fa1 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -48,9 +48,9 @@ object OffsetCommitResponse extends Logging { } case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], - correlationId: Int = 0, + override val correlationId: Int = 0, clientId: String = OffsetCommitResponse.DefaultClientId) - extends RequestOrResponse { + extends RequestOrResponse(correlationId = correlationId) { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index fe94f17..a4c5623 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -52,9 +52,9 @@ object OffsetFetchRequest extends Logging { case class OffsetFetchRequest(groupId: String, requestInfo: Seq[TopicAndPartition], versionId: Short = OffsetFetchRequest.CurrentVersion, - correlationId: Int = 0, + override val correlationId: Int = 0, clientId: String = OffsetFetchRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) { + extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic) diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala index 3d4ce2a..71c2efb 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -50,9 +50,9 @@ object OffsetFetchResponse extends Logging { } case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError], - correlationId: Int = 0, + override val correlationId: Int = 0, clientId: String = OffsetFetchResponse.DefaultClientId) - extends RequestOrResponse { + extends RequestOrResponse(correlationId = correlationId) { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f27e198..8d48022 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -196,17 +196,18 @@ class KafkaApis(val requestChannel: RequestChannel, try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val log = localReplica.log.get - val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) - + val (firstOffset, lastOffset) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) + val numAppendedMessages = lastOffset - firstOffset + 1 + // update stats - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(info.count) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) + 
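The four request/response changes above are the same mechanical move: correlationId becomes a constructor parameter of the RequestOrResponse base class, so each case class marks its own correlationId field as an override and forwards it to the superclass. A stripped-down illustration of that Scala pattern (hypothetical class names, not part of the patch):

    // Hypothetical, simplified stand-in for RequestOrResponse and one request type.
    abstract class RequestOrResponseLike(val correlationId: Int)

    case class ExampleRequest(payload: String, override val correlationId: Int = 0)
      extends RequestOrResponseLike(correlationId)

    object CorrelationIdPatternSketch {
      def main(args: Array[String]): Unit = {
        val r = ExampleRequest("hello", correlationId = 42)
        val asBase: RequestOrResponseLike = r
        println(asBase.correlationId) // 42: the value set on the case class is visible through the base type
      }
    }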
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f27e198..8d48022 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -196,17 +196,18 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
val log = localReplica.log.get
- val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
-
+ val (firstOffset, lastOffset) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+ val numAppendedMessages = lastOffset - firstOffset + 1
+
// update stats
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
- BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(info.count)
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+ BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
// we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
- .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
- ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
+ .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, firstOffset, lastOffset))
+ ProduceResult(topicAndPartition, firstOffset, lastOffset)
} catch {
// NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
// since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 081f188..8692abc 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -24,6 +24,7 @@ import java.nio.channels._
import java.lang.management._
import javax.management._
import scala.collection._
+import mutable.ListBuffer
import scala.collection.mutable
import java.util.Properties
import kafka.common.KafkaException
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index c6e7a57..2ffcd65 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -79,8 +79,8 @@ object StressTestLog {
class WriterThread(val log: Log) extends WorkerThread {
@volatile var offset = 0
override def work() {
- val logAppendInfo = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
- require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
+ val (firstOffset, lastOffset) = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
+ require(firstOffset == offset && lastOffset == offset)
offset += 1
if(offset % 1000 == 0)
Thread.sleep(500)
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 4619d86..d861388 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -208,7 +208,7 @@ class CleanerTest extends JUnitSuite {
def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
for((key, value) <- seq)
- yield log.append(message(key, value)).firstOffset
+ yield log.append(message(key, value))._1
}
def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5a489f9..15e9b60 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -77,7 +77,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
- val appendInfo = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+ log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
counter += 1
(key, count)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 6916df4..6e9c1ec 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -84,8 +84,8 @@ class LogManagerTest extends JUnit3Suite {
var offset = 0L
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
- val info = log.append(set)
- offset = info.lastOffset
+ val (firstOffset, lastOffset) = log.append(set)
+ offset = lastOffset
}
assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
@@ -127,8 +127,8 @@ class LogManagerTest extends JUnit3Suite {
val numMessages = 200
for(i <- 0 until numMessages) {
val set = TestUtils.singleMessageSet("test".getBytes())
- val info = log.append(set)
- offset = info.firstOffset
+ val (firstOffset, lastOffset) = log.append(set)
+ offset = lastOffset
}
assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.segmentSize, log.numberOfSegments)