Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala (revision 1351022)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala (working copy)
@@ -185,7 +185,7 @@
     val deletedSegments = log.markDeletedWhile(_ => true)
 
     // we shouldn't delete the last empty log segment.
-    assertTrue("We shouldn't delete the last empty log segment", log.segments.view.size == 1)
+    assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
 
     // we now have a new log
     assertEquals(curOffset, log.nextAppendOffset)
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala (revision 1351022)
+++ core/src/main/scala/kafka/log/Log.scala (working copy)
@@ -21,10 +21,11 @@
 import java.io.{IOException, RandomAccessFile, File}
 import java.util.{Comparator, Collections, ArrayList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
-import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
 import kafka.utils._
 import java.text.NumberFormat
-import kafka.common.OffsetOutOfRangeException
+import kafka.server.BrokerTopicStat
+import kafka.common.{InvalidMessageSizeException, OffsetOutOfRangeException}
+import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
 
 object Log {
   val FileSuffix = ".kafka"
@@ -214,7 +215,7 @@
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -223,13 +224,25 @@
       numberOfMessages += 1;
     }
 
+    BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
 
+    // truncate the message set's buffer to validBytes before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        ". Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }
@@ -262,10 +275,17 @@
       val deletable = view.takeWhile(predicate)
       for(seg <- deletable)
         seg.deleted = true
-      val numToDelete = deletable.size
+      var numToDelete = deletable.size
       // if we are deleting everything, create a new empty segment
       if(numToDelete == view.size) {
-        roll()
+        if(view(numToDelete - 1).size > 0)
+          roll()
+        else {
+          // If the last segment to be deleted is empty, rolling the log would create a new segment with the
+          // same file name. So simply reuse the last segment and reset its modified time.
+          view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+          numToDelete -= 1
+        }
       }
       segments.trunc(numToDelete)
     }
@@ -309,9 +329,12 @@
    */
   def roll() {
     lock synchronized {
-      val last = segments.view.last
       val newOffset = nextAppendOffset
       val newFile = new File(dir, nameFromOffset(newOffset))
+      if(newFile.exists) {
+        warn("Newly rolled log segment " + newFile.getName + " already exists; deleting it first")
+        newFile.delete()
+      }
       debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }
Index: core/src/main/scala/kafka/network/BlockingChannel.scala
===================================================================
--- core/src/main/scala/kafka/network/BlockingChannel.scala (revision 1351022)
+++ core/src/main/scala/kafka/network/BlockingChannel.scala (working copy)
@@ -53,6 +53,7 @@
     channel.configureBlocking(true)
     channel.socket.setSoTimeout(readTimeoutMs)
     channel.socket.setKeepAlive(true)
+    channel.socket.setTcpNoDelay(true)
     channel.connect(new InetSocketAddress(host, port))
 
     writeChannel = channel
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1351022)
+++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy)
@@ -126,6 +126,8 @@
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionData) {
         msgIndex += 1
+        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
+        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
@@ -136,6 +138,8 @@
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
         } catch {
           case e =>
+            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
+            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
             error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
             e match {
               case _: IOException =>
@@ -239,12 +243,16 @@
     for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
       val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
         case Left(err) =>
+          BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
+          BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
          fetchRequest.replicaId match {
             case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
             case _ => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
           }
         case Right(messages) =>
+          BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
+          BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
           val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
           assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
             " must exist on leader broker %d".format(logManager.config.brokerId))
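
Notes on the patch follow; the sketches below are illustrative reconstructions, not part of the diff.

The Log.append change appends only the leading valid prefix of the incoming ByteBufferMessageSet, so a corrupt tail from a bad produce request never reaches disk. A minimal sketch of the buffer-duplication technique; the helper name truncateToValidBytes and the demo values are hypothetical, not from the patch:

import java.nio.ByteBuffer

object ValidBytesSketch {
  // duplicate() shares the underlying bytes but carries an independent
  // position/limit, so limiting the view leaves the caller's buffer untouched
  def truncateToValidBytes(buffer: ByteBuffer, validBytes: Long): ByteBuffer = {
    if(validBytes > Int.MaxValue || validBytes < 0)
      throw new IllegalArgumentException("Illegal length of message set " + validBytes)
    val view = buffer.duplicate()
    view.limit(validBytes.asInstanceOf[Int])
    view
  }

  def main(args: Array[String]) {
    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4, 5, 6, 7, 8))
    // pretend a scan found only the first 5 bytes hold complete messages
    val valid = truncateToValidBytes(buf, 5L)
    println(valid.remaining) // prints 5; the partial tail is excluded
  }
}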
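
The markDeletedWhile and roll() changes both guard against a file-name collision: segment files are named after their start offset, and an empty active segment has the same start offset a fresh roll would use. A toy version of the naming scheme, loosely modeled on Log.nameFromOffset (assumed here to zero-pad the offset to 20 digits), makes the collision easy to see:

import java.text.NumberFormat

object SegmentNameSketch {
  def nameFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset) + ".kafka"
  }

  def main(args: Array[String]) {
    // an empty segment starting at offset 42 and a roll at nextAppendOffset == 42
    // both map to the same file, hence the reuse/delete-first logic in the patch
    println(nameFromOffset(42)) // 00000000000000000042.kafka
  }
}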
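
The one-line BlockingChannel change disables Nagle's algorithm, so small produce and fetch requests hit the wire immediately instead of waiting to be coalesced with later writes. In isolation the connect path looks roughly like this (a sketch; the real class also stashes the read and write channels in fields):

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object NoDelayConnectSketch {
  def connect(host: String, port: Int, readTimeoutMs: Int): SocketChannel = {
    val channel = SocketChannel.open()
    channel.configureBlocking(true)
    channel.socket.setSoTimeout(readTimeoutMs)
    channel.socket.setKeepAlive(true)
    channel.socket.setTcpNoDelay(true) // the line this patch adds
    channel.connect(new InetSocketAddress(host, port))
    channel
  }
}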
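
The BrokerTopicStat calls added in Log.scala and KafkaApis.scala record per-topic counters plus an all-topics aggregate. A rough sketch of what such a registry might look like, assuming plain AtomicLong counters behind a ConcurrentHashMap (the real class lives in kafka.server and is exposed over JMX; every name below is a stand-in):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class TopicStatSketch {
  val messagesIn = new AtomicLong(0)
  val bytesIn = new AtomicLong(0)
  val bytesOut = new AtomicLong(0)
  val failedProduceRequests = new AtomicLong(0)
  val failedFetchRequests = new AtomicLong(0)
  def recordMessagesIn(n: Int) { messagesIn.addAndGet(n) }
  def recordBytesIn(n: Long) { bytesIn.addAndGet(n) }
  def recordBytesOut(n: Long) { bytesOut.addAndGet(n) }
  def recordFailedProduceRequest() { failedProduceRequests.incrementAndGet() }
  def recordFailedFetchRequest() { failedFetchRequests.incrementAndGet() }
}

object TopicStatRegistrySketch {
  private val stats = new ConcurrentHashMap[String, TopicStatSketch]()
  // the all-topics aggregate is updated alongside each per-topic counter
  val allTopics = new TopicStatSketch

  def forTopic(topic: String): TopicStatSketch = {
    val existing = stats.get(topic)
    if(existing != null) existing
    else {
      // putIfAbsent keeps topic creation race-free without external locking
      stats.putIfAbsent(topic, new TopicStatSketch)
      stats.get(topic)
    }
  }
}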