diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 355948e..0fe6471 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -24,7 +24,6 @@ import utils.{Utils, Logging} object Kafka extends Logging { def main(args: Array[String]): Unit = { - if (args.length != 1) { println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName())) System.exit(1) diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 3cf57b8..531fcbe 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -17,6 +17,7 @@ package kafka.api +import java.util.Arrays import java.nio.ByteBuffer import java.nio.channels.GatheringByteChannel import kafka.common.ErrorMapping @@ -34,7 +35,7 @@ object PartitionData { val messageSetBuffer = buffer.slice() messageSetBuffer.limit(messageSetSize) buffer.position(buffer.position + messageSetSize) - new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset)) + new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer)) } } @@ -42,6 +43,33 @@ case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, in val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8 def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages) + + override def equals(other: Any): Boolean = { + other match { + case that: PartitionData => + partition == that.partition && + error == that.error && + initialOffset == that.initialOffset && + hw == that.hw && + messages == that.messages + case _ => false + } + } + + override def hashCode: Int = + Arrays.hashCode(Array(partition: java.lang.Integer, + error: java.lang.Short, + initialOffset: java.lang.Long, + hw: java.lang.Long, + messages)) + + override def toString(): String = + "PartitionData(partition = " + partition + + ", error = " + error + + ", initialOffset = " + initialOffset + + ", hw = " + hw + + ", messages = " + messages +")" + } // SENDS @@ -105,15 +133,18 @@ object TopicData { case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) { val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes) - // need to override equals due to brokern java-arrays equals functionality + // need to override equals due to broken java-arrays equals functionality override def equals(other: Any): Boolean = { other match { case that: TopicData => - ( topic == that.topic && - partitionDataArray.toSeq == that.partitionDataArray.toSeq ) + (topic == that.topic && + partitionDataArray.toSeq == that.partitionDataArray.toSeq) case _ => false } } + + override def toString(): String = + "TopicData(topic = " + topic + ", data = " + partitionDataArray.mkString(", ") + ")" } class TopicDataSend(val topicData: TopicData) extends Send { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index efbaf8b..c47e2af 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -18,6 +18,7 @@ package kafka.api import java.nio._ +import java.util.Arrays import kafka.message._ import kafka.utils._ @@ -53,12 +54,13 @@ object ProducerRequest { } } -case class ProducerRequest( versionId: Short, - correlationId: Int, 
- clientId: String, - requiredAcks: Short, - ackTimeoutMs: Int, - data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { + +case class ProducerRequest(versionId: Short, + correlationId: Int, + clientId: String, + requiredAcks: Short, + ackTimeoutMs: Int, + data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) @@ -75,10 +77,11 @@ case class ProducerRequest( versionId: Short, Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic buffer.putInt(topicData.partitionDataArray.size) //the number of partitions for(partitionData <- topicData.partitionDataArray) { + val bytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer buffer.putInt(partitionData.partition) - buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit) - buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer) - partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind + buffer.putInt(bytes.limit) + buffer.put(bytes) + bytes.rewind } } } @@ -98,16 +101,31 @@ case class ProducerRequest( versionId: Short, // need to override case-class equals due to broken java-array equals() override def equals(other: Any): Boolean = { - other match { + println("this request = " + this + ", that = " + other) + other match { case that: ProducerRequest => - ( correlationId == that.correlationId && - clientId == that.clientId && - requiredAcks == that.requiredAcks && - ackTimeoutMs == that.ackTimeoutMs && - data.toSeq == that.data.toSeq ) + (correlationId == that.correlationId && + clientId == that.clientId && + requiredAcks == that.requiredAcks && + ackTimeoutMs == that.ackTimeoutMs && + data.toSeq == that.data.toSeq ) case _ => false } } + + override def hashCode(): Int = + Arrays.hashCode(Array[Object](correlationId: java.lang.Integer, + clientId, + requiredAcks: java.lang.Short, + ackTimeoutMs: java.lang.Integer, + data.hashCode: java.lang.Integer)) + + override def toString(): String = + "ProducerRequest(correlationId = " + correlationId + + ", clientId = " + clientId + + ", requiredAcks = " + requiredAcks + + ", ackTimeout = " + ackTimeoutMs + + ", data = " + data.mkString(", ") + ")" def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 86ae2aa..37e9d4a 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -43,7 +43,7 @@ class Partition(val topic: String, private val leaderISRUpdateLock = new Object private var zkVersion: Int = LeaderAndIsr.initialZKVersion private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 - this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId) + this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) @@ -116,7 +116,7 @@ class Partition(val topic: String, def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = { leaderISRUpdateLock synchronized { if (leaderEpoch >= leaderAndISR.leaderEpoch){ - info("Current leaderEpoch 
[%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request" + info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request" .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower")) return false } @@ -183,7 +183,7 @@ class Partition(val topic: String, def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) { leaderISRUpdateLock synchronized { - debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partitionId)) + debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId)) val replica = getOrCreateReplica(replicaId) replica.logEndOffset = offset @@ -195,7 +195,7 @@ class Partition(val topic: String, if (replica.logEndOffset >= leaderHW) { // expand ISR val newInSyncReplicas = inSyncReplicas + replica - info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) + info("expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", "))) // update ISR in ZK and cache updateISR(newInSyncReplicas) replicaManager.isrExpandRate.mark() @@ -287,7 +287,7 @@ class Partition(val topic: String, } private def updateISR(newISR: Set[Replica]) { - info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) + info("updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", "))) val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) @@ -296,7 +296,7 @@ class Partition(val topic: String, zkVersion = newVersion trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion)) } else { - info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) + info("cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) } } diff --git a/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala b/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala deleted file mode 100644 index d6ce868..0000000 --- a/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.consumer.storage - -import java.util.concurrent._ -import java.util.concurrent.atomic._ -import java.util.concurrent.locks._ - -class MemoryOffsetStorage extends OffsetStorage { - - val offsetAndLock = new ConcurrentHashMap[(Int, String), (AtomicLong, Lock)] - - def reserve(node: Int, topic: String): Long = { - val key = (node, topic) - if(!offsetAndLock.containsKey(key)) - offsetAndLock.putIfAbsent(key, (new AtomicLong(0), new ReentrantLock)) - val (offset, lock) = offsetAndLock.get(key) - lock.lock - offset.get - } - - def commit(node: Int, topic: String, offset: Long) = { - val (highwater, lock) = offsetAndLock.get((node, topic)) - highwater.set(offset) - lock.unlock - offset - } - -} diff --git a/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala b/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala deleted file mode 100644 index f9c9467..0000000 --- a/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer.storage - - -/** - * A method for storing offsets for the consumer. - * This is used to track the progress of the consumer in the stream. - */ -trait OffsetStorage { - - /** - * Reserve a range of the length given by increment. - * @param increment The size to reserver - * @return The range reserved - */ - def reserve(node: Int, topic: String): Long - - /** - * Update the offset to the new offset - * @param offset The new offset - */ - def commit(node: Int, topic: String, offset: Long) - -} diff --git a/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala b/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala deleted file mode 100644 index a48e7a8..0000000 --- a/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.consumer.storage.sql - -import java.sql._ -import kafka.utils._ -import kafka.consumer.storage.OffsetStorage -import kafka.common.KafkaException - -/** - * An offset storage implementation that uses an oracle database to save offsets - */ -@nonthreadsafe -class OracleOffsetStorage(val connection: Connection) extends OffsetStorage with Logging { - - private val lock = new Object - connection.setAutoCommit(false) - - def reserve(node: Int, topic: String): Long = { - /* try to get and lock the offset, if it isn't there, make it */ - val maybeOffset = selectExistingOffset(connection, node, topic) - val offset = maybeOffset match { - case Some(offset) => offset - case None => { - maybeInsertZeroOffset(connection, node, topic) - selectExistingOffset(connection, node, topic).get - } - } - - debug("Reserved node " + node + " for topic '" + topic + " offset " + offset) - - offset - } - - def commit(node: Int, topic: String, offset: Long) { - var success = false - try { - updateOffset(connection, node, topic, offset) - success = true - } finally { - commitOrRollback(connection, success) - } - debug("Updated node " + node + " for topic '" + topic + "' to " + offset) - } - - def close() { - Utils.swallow(logger.error, connection.close()) - } - - /** - * Attempt to update an existing entry in the table if there isn't already one there - * @return true iff the row didn't already exist - */ - private def maybeInsertZeroOffset(connection: Connection, node: Int, topic: String): Boolean = { - val stmt = connection.prepareStatement( - """insert into kafka_offsets (node, topic, offset) - select ?, ?, 0 from dual where not exists - (select null from kafka_offsets where node = ? and topic = ?)""") - stmt.setInt(1, node) - stmt.setString(2, topic) - stmt.setInt(3, node) - stmt.setString(4, topic) - val updated = stmt.executeUpdate() - if(updated > 1) - throw new KafkaException("More than one key updated by primary key!") - else - updated == 1 - } - - /** - * Attempt to update an existing entry in the table - * @return true iff we updated an entry - */ - private def selectExistingOffset(connection: Connection, node: Int, topic: String): Option[Long] = { - val stmt = connection.prepareStatement( - """select offset from kafka_offsets - where node = ? and topic = ? - for update""") - var results: ResultSet = null - try { - stmt.setInt(1, node) - stmt.setString(2, topic) - results = stmt.executeQuery() - if(!results.next()) { - None - } else { - val offset = results.getLong("offset") - if(results.next()) - throw new KafkaException("More than one entry for primary key!") - Some(offset) - } - } finally { - close(stmt) - close(results) - } - } - - private def updateOffset(connection: Connection, - node: Int, - topic: String, - newOffset: Long): Unit = { - val stmt = connection.prepareStatement("update kafka_offsets set offset = ? where node = ? 
and topic = ?") - try { - stmt.setLong(1, newOffset) - stmt.setInt(2, node) - stmt.setString(3, topic) - val updated = stmt.executeUpdate() - if(updated != 1) - throw new KafkaException("Unexpected number of keys updated: " + updated) - } finally { - close(stmt) - } - } - - - private def commitOrRollback(connection: Connection, commit: Boolean) { - if(connection != null) { - if(commit) - Utils.swallow(logger.error, connection.commit()) - else - Utils.swallow(logger.error, connection.rollback()) - } - } - - private def close(rs: ResultSet) { - if(rs != null) - Utils.swallow(logger.error, rs.close()) - } - - private def close(stmt: PreparedStatement) { - if(stmt != null) - Utils.swallow(logger.error, stmt.close()) - } - - private def close(connection: Connection) { - if(connection != null) - Utils.swallow(logger.error, connection.close()) - } - -} diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala index f1c572f..16c8524 100644 --- a/core/src/main/scala/kafka/javaapi/Implicits.scala +++ b/core/src/main/scala/kafka/javaapi/Implicits.scala @@ -24,7 +24,7 @@ private[javaapi] object Implicits extends Logging { implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet): kafka.javaapi.message.ByteBufferMessageSet = { - new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset) + new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer) } implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse = diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index 0984bd3..27fc2c8 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -16,15 +16,15 @@ */ package kafka.javaapi.message +import scala.reflect.BeanProperty import java.nio.ByteBuffer import kafka.message._ -class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet { - val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset) - def this(buffer: ByteBuffer) = this(buffer, 0L) - +class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet { + val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer) + def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L) + this(new kafka.message.ByteBufferMessageSet(compressionCodec, 0L, scala.collection.JavaConversions.asBuffer(messages): _*).buffer) } def this(messages: java.util.List[Message]) { @@ -52,14 +52,10 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Lo override def equals(other: Any): Boolean = { other match { - case that: ByteBufferMessageSet => - (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset + case that: ByteBufferMessageSet => buffer.equals(that.buffer) case _ => false } } - def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet] - - override def hashCode: Int = underlying.hashCode - + override def hashCode: Int = buffer.hashCode } diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala index 6b61aa1..2ea5d0b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -269,6 +269,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag // they are valid, insert them in the log lock synchronized { + validMessages.reassignOffsets(logEndOffset) try { var segment = segments.view.last maybeRoll(segment) diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index f9b7b56..2fbae2e 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -17,12 +17,44 @@ package kafka.message +import scala.reflect.BeanProperty import kafka.utils.Logging import java.nio.ByteBuffer import java.nio.channels._ import kafka.utils.IteratorTemplate import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException} +object ByteBufferMessageSet { + + private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { + buffer.putLong(offset) + buffer.putInt(message.size) + buffer.put(message.buffer) + message.buffer.rewind() + } + + private def serialize(baseOffset: Long, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { + if(messages.size == 0) { + return MessageSet.Empty.buffer + } else if(compressionCodec == NoCompressionCodec) { + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + var offset = baseOffset + for(message <- messages) { + writeMessage(buffer, message, offset) + offset += 12 + message.size + } + buffer.rewind() + buffer + } else { + val m = CompressionUtils.compress(baseOffset, messages, compressionCodec) + val buffer = ByteBuffer.allocate(m.size + MessageSet.LogOverhead) + writeMessage(buffer, m, baseOffset) + buffer.rewind() + buffer + } + } +} + /** * A sequence of messages stored in a byte buffer * @@ -33,31 +65,33 @@ import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException} * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method. 
* */ -class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging { +class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with Logging { private var shallowValidByteCount = -1L if(sizeInBytes > Int.MaxValue) throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue) def this(compressionCodec: CompressionCodec, messages: Message*) { - this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L) + this(ByteBufferMessageSet.serialize(0, compressionCodec, messages:_*)) + } + + def this(compressionCodec: CompressionCodec, initialOffset: Long, messages: Message*) { + this(ByteBufferMessageSet.serialize(initialOffset, compressionCodec, messages:_*)) } def this(messages: Message*) { - this(NoCompressionCodec, messages: _*) + this(NoCompressionCodec, 0, messages: _*) } - def validBytes: Long = shallowValidBytes - private def shallowValidBytes: Long = { if(shallowValidByteCount < 0) { + this.shallowValidByteCount = 0 val iter = this.internalIterator(true) while(iter.hasNext) { val messageAndOffset = iter.next - shallowValidByteCount = messageAndOffset.offset + shallowValidByteCount += 12 + messageAndOffset.message.size } } - if(shallowValidByteCount < initialOffset) 0 - else (shallowValidByteCount - initialOffset) + shallowValidByteCount } /** Write the messages in this set to the given channel */ @@ -74,7 +108,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) /** iterator over compressed messages without decompressing */ def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true) - def verifyMessageSize(maxMessageSize: Int){ + def verifyMessageSize(maxMessageSize: Int) { var shallowIter = internalIterator(true) while(shallowIter.hasNext){ var messageAndOffset = shallowIter.next @@ -88,57 +122,42 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var topIter = buffer.slice() - var currValidBytes = initialOffset - var innerIter:Iterator[MessageAndOffset] = null - var lastMessageSize = 0L + var innerIter: Iterator[MessageAndOffset] = null - def innerDone():Boolean = (innerIter==null || !innerIter.hasNext) + def innerDone():Boolean = (innerIter == null || !innerIter.hasNext) def makeNextOuter: MessageAndOffset = { - if (topIter.remaining < 4) { + // if there isn't at least an offset and size, we are done + if (topIter.remaining < 12) return allDone() - } + val offset = topIter.getLong() val size = topIter.getInt() - lastMessageSize = size - - trace("Remaining bytes in iterator = " + topIter.remaining) - trace("size of data = " + size) - - if(size < 0 || topIter.remaining < size) { - if (currValidBytes == initialOffset || size < 0) - throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " + - topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " + - "the fetch size; (2) log corruption )") + if(size < 0) + throw new InvalidMessageException("Message found with corrupt size (" + size + ")") + + // we have an incomplete message + if(topIter.remaining < size) return allDone() - } + + // read the current message and check correctness val message = topIter.slice() message.limit(size) topIter.position(topIter.position + size) val newMessage = new Message(message) if(!newMessage.isValid) - throw 
new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec - + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset) + throw new InvalidMessageException("Message is corrupt (stored crc = " + newMessage.checksum + ", computed crc = " + newMessage.computeChecksum() + ")") - if(isShallow){ - currValidBytes += 4 + size - trace("shallow iterator currValidBytes = " + currValidBytes) - new MessageAndOffset(newMessage, currValidBytes) - } - else{ + if(isShallow) { + new MessageAndOffset(newMessage, offset) + } else { newMessage.compressionCodec match { case NoCompressionCodec => - debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) innerIter = null - currValidBytes += 4 + size - trace("currValidBytes = " + currValidBytes) - new MessageAndOffset(newMessage, currValidBytes) + new MessageAndOffset(newMessage, offset) case _ => - debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) innerIter = CompressionUtils.decompress(newMessage).internalIterator() - if (!innerIter.hasNext) { - currValidBytes += 4 + lastMessageSize + if(!innerIter.hasNext) innerIter = null - } makeNext() } } @@ -147,52 +166,49 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) override def makeNext(): MessageAndOffset = { if(isShallow){ makeNextOuter - } - else{ - val isInnerDone = innerDone() - debug("makeNext() in internalIterator: innerDone = " + isInnerDone) - isInnerDone match { - case true => makeNextOuter - case false => { - val messageAndOffset = innerIter.next - if (!innerIter.hasNext) - currValidBytes += 4 + lastMessageSize - new MessageAndOffset(messageAndOffset.message, currValidBytes) - } - } + } else { + if(innerDone()) + makeNextOuter + else + innerIter.next } } + } } - - def sizeInBytes: Long = buffer.limit - override def toString: String = { - val builder = new StringBuilder() - builder.append("ByteBufferMessageSet(") - for(message <- this) { - builder.append(message) - builder.append(", ") + /** + * Update the offsets for this message set + */ + def reassignOffsets(firstOffset: Long) { + var position = 0 + buffer.mark() + while(position < sizeInBytes - MessageSet.LogOverhead) { + buffer.position(position) + buffer.putLong(firstOffset + position) + position += MessageSet.LogOverhead + buffer.getInt() } - builder.append(")") - builder.toString + buffer.reset() } + /** + * The total number of bytes in this message set, including any partial trailing messages + */ + def sizeInBytes: Long = buffer.limit + + /** + * The total number of bytes in this message set not including any partial, trailing messages + */ + def validBytes: Long = shallowValidBytes + override def equals(other: Any): Boolean = { other match { - case that: ByteBufferMessageSet => - (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset + case that: ByteBufferMessageSet => + buffer.equals(that.buffer) case _ => false } } - override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet] - - override def hashCode: Int = { - var hash = 17 - hash = hash * 31 + buffer.hashCode - hash = hash * 31 + initialOffset.hashCode - hash - } + override def hashCode: Int = buffer.hashCode } diff --git a/core/src/main/scala/kafka/message/CompressionUtils.scala b/core/src/main/scala/kafka/message/CompressionUtils.scala index 607ca77..90856d5 100644 --- a/core/src/main/scala/kafka/message/CompressionUtils.scala +++ 
b/core/src/main/scala/kafka/message/CompressionUtils.scala @@ -96,27 +96,30 @@ object CompressionUtils extends Logging{ //specify the codec which is the default when DefaultCompressionCodec is used private var defaultCodec: CompressionCodec = GZIPCompressionCodec - def compress(messages: Iterable[Message], compressionCodec: CompressionCodec = DefaultCompressionCodec):Message = { + def compress(baseOffset: Long, messages: Iterable[Message], compressionCodec: CompressionCodec = DefaultCompressionCodec): Message = { val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream() debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) var cf: CompressionFacade = null - if (compressionCodec == DefaultCompressionCodec) - cf = CompressionFactory(defaultCodec,outputStream) + cf = CompressionFactory(defaultCodec, outputStream) else - cf = CompressionFactory(compressionCodec,outputStream) + cf = CompressionFactory(compressionCodec, outputStream) val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - messages.foreach(m => m.serializeTo(messageByteBuffer)) + for(message <- messages){ + messageByteBuffer.putLong(baseOffset) + messageByteBuffer.putInt(message.size) + messageByteBuffer.put(message.buffer) + message.buffer.rewind() + } messageByteBuffer.rewind try { cf.write(messageByteBuffer.array) } catch { case e: IOException => error("Error while writing to the GZIP output stream", e) - cf.close() throw e } finally { cf.close() @@ -133,11 +136,10 @@ object CompressionUtils extends Logging{ val intermediateBuffer = new Array[Byte](1024) var cf: CompressionFacade = null - if (message.compressionCodec == DefaultCompressionCodec) - cf = CompressionFactory(defaultCodec,inputStream) + cf = CompressionFactory(defaultCodec, inputStream) else - cf = CompressionFactory(message.compressionCodec,inputStream) + cf = CompressionFactory(message.compressionCodec, inputStream) try { Stream.continually(cf.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead => @@ -145,7 +147,6 @@ object CompressionUtils extends Logging{ } }catch { case e: IOException => error("Error while reading from the GZIP input stream", e) - cf.close() throw e } finally { cf.close() diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala index bbe3da6..6eaa306 100644 --- a/core/src/main/scala/kafka/message/FileMessageSet.scala +++ b/core/src/main/scala/kafka/message/FileMessageSet.scala @@ -109,26 +109,27 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, override def makeNext(): MessageAndOffset = { // read the size of the item - val sizeBuffer = ByteBuffer.allocate(4) - channel.read(sizeBuffer, location) - if(sizeBuffer.hasRemaining) + val sizeOffsetBuffer = ByteBuffer.allocate(12) + channel.read(sizeOffsetBuffer, location) + if(sizeOffsetBuffer.hasRemaining) return allDone() - sizeBuffer.rewind() - val size: Int = sizeBuffer.getInt() + sizeOffsetBuffer.rewind() + val offset = sizeOffsetBuffer.getLong() + val size = sizeOffsetBuffer.getInt() if (size < Message.MinHeaderSize) return allDone() // read the item itself val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + 4) + channel.read(buffer, location + 12) if(buffer.hasRemaining) return allDone() buffer.rewind() // increment the location and return the item - location += size + 4 - new MessageAndOffset(new Message(buffer), location) + location += size + 12 + new MessageAndOffset(new Message(buffer), 
offset) } } } @@ -179,7 +180,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, def recover(): Long = { checkMutable() val len = channel.size - val buffer = ByteBuffer.allocate(4) + val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) var validUpTo: Long = 0 var next = 0L do { @@ -210,21 +211,22 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, private def validateMessage(channel: FileChannel, start: Long, len: Long, buffer: ByteBuffer): Long = { buffer.rewind() var read = channel.read(buffer, start) - if(read < 4) + if(read < MessageSet.LogOverhead) return -1 // check that we have sufficient bytes left in the file - val size = buffer.getInt(0) - if (size < Message.MinHeaderSize) + val offset = buffer.getLong(0) + val size = buffer.getInt(8) + if(size < Message.MinHeaderSize) return -1 - val next = start + 4 + size + val next = start + MessageSet.LogOverhead + size if(next > len) return -1 // read the message val messageBuffer = ByteBuffer.allocate(size) - var curr = start + 4 + var curr = start + MessageSet.LogOverhead while(messageBuffer.hasRemaining) { read = channel.read(messageBuffer, curr) if(read < 0) diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index c7ba4f9..8dd6861 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -18,145 +18,190 @@ package kafka.message import java.nio._ +import scala.math._ import kafka.utils._ import kafka.common.UnknownMagicByteException /** - * Message byte offsets + * Constants related to messages */ object Message { - val MagicVersion: Byte = 1 - val CurrentMagicValue: Byte = 1 - val MagicOffset = 0 - val MagicLength = 1 - val AttributeOffset = MagicOffset + MagicLength - val AttributeLength = 1 + /** - * Specifies the mask for the compression code. 2 bits to hold the compression codec. - * 0 is reserved to indicate no compression + * The current offset and size for all the fixed-length fields */ - val CompressionCodeMask: Int = 0x03 // - - - val NoCompression:Int = 0 - + val CrcOffset = 0 + val CrcLength = 4 + val MagicOffset = CrcOffset + CrcLength + val MagicLength = 1 + val AttributesOffset = MagicOffset + MagicLength + val AttributesLength = 1 + val KeySizeOffset = AttributesOffset + AttributesLength + val KeySizeLength = 4 + val KeyOffset = KeySizeOffset + KeySizeLength + /** - * Computes the CRC value based on the magic byte - * @param magic Specifies the magic byte value. The only value allowed currently is 1. + * The minimum valid size for the message header */ - def crcOffset(magic: Byte): Int = magic match { - case MagicVersion => AttributeOffset + AttributeLength - case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic)) - } + val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength - val CrcLength = 4 - /** - * Computes the offset to the message payload based on the magic byte - * @param magic Specifies the magic byte value. The only value allowed currently is 1. + * The current "magic" value */ - def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength + val CurrentMagicValue: Byte = 2 /** - * Computes the size of the message header based on the magic byte - * @param magic Specifies the magic byte value. The only value allowed currently is 1. + * Specifies the mask for the compression code. 2 bits to hold the compression codec. 
+ * 0 is reserved to indicate no compression */ - def headerSize(magic: Byte): Int = payloadOffset(magic) + val CompressionCodeMask: Int = 0x03 /** - * Size of the header for magic byte 1. This is the minimum size of any message header. + * Compression code for uncompressed messages */ - val MinHeaderSize = headerSize(1); + val NoCompression: Int = 0 + } /** * A message. The format of an N byte message is the following: * - * 1. 1 byte "magic" identifier to allow format changes, whose value is 1 currently - * - * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) - * - * 3. 4 byte CRC32 of the payload - * - * 4. N - 6 byte payload - * + * 1. 4 byte CRC32 of the message + * 2. 1 byte "magic" identifier to allow format changes, value is 2 currently + * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) + * 4. 4 byte key length, containing length K + * 5. K byte key key + * 6. (N - K - 6) byte payload */ class Message(val buffer: ByteBuffer) { import kafka.message.Message._ - - - private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { - this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size)) - buffer.put(CurrentMagicValue) + + def this(bytes: Array[Byte], + compressionCodec: CompressionCodec = NoCompressionCodec, + key: Array[Byte] = null, + offset: Int = 0, + size: Int = -1) = { + this(ByteBuffer.allocate(Message.CrcLength + + Message.MagicLength + + Message.AttributesLength + + Message.KeySizeLength + + (if(key == null) 0 else key.length) + + (if(size >= 0) size else bytes.length - offset))) + // skip crc, we will fill that in at the end + buffer.put(MagicOffset, CurrentMagicValue) var attributes:Byte = 0 - if (compressionCodec.codec > 0) { - attributes = (attributes | (Message.CompressionCodeMask & compressionCodec.codec)).toByte + if (compressionCodec.codec > 0) + attributes = (attributes | (CompressionCodeMask & compressionCodec.codec)).toByte + buffer.put(AttributesOffset, attributes) + if(key == null) { + buffer.putInt(KeySizeOffset, -1) + buffer.position(KeyOffset) + } else { + buffer.putInt(KeySizeOffset, key.length) + buffer.position(KeyOffset) + buffer.put(key, 0, key.length) } - buffer.put(attributes) - Utils.putUnsignedInt(buffer, checksum) - buffer.put(bytes, offset, size) + buffer.put(bytes, offset, if(size >= 0) size else bytes.length - offset) buffer.rewind() + + // now compute the checksum and fill it in + Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum) } - - def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec) - - def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { - //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there - this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec) - } - - def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec) - - def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec) - - def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec) + def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = + this(bytes = bytes, compressionCodec = compressionCodec, key = null) + + def this(bytes: 
Array[Byte]) = + this(bytes = bytes, compressionCodec = NoCompressionCodec, key = null) + + /** + * Compute the checksum of the message from the message contents + */ + def computeChecksum(): Long = + Utils.crc32(buffer.array, buffer.arrayOffset + CrcOffset + CrcLength, buffer.limit - CrcOffset - CrcLength) + + /** + * Retrieve the previously computed CRC for this message + */ + def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset) + + /** + * The complete serialized size of this message in bytes (including crc, header attributes, etc) + */ def size: Int = buffer.limit - def payloadSize: Int = size - headerSize(magic) + /** + * The length of the key in bytes + */ + def keySize: Int = buffer.getInt(Message.KeySizeOffset) + + /** + * Does the message have a key? + */ + def hasKey: Boolean = buffer.getInt(Message.KeySizeOffset) >= 0 + + /** + * The length of the message value in bytes + */ + def payloadSize: Int = size - KeyOffset - max(0, keySize) + /** + * The magic version of this message + */ def magic: Byte = buffer.get(MagicOffset) - def attributes: Byte = buffer.get(AttributeOffset) + /** + * The attributes stored with this message + */ + def attributes: Byte = buffer.get(AttributesOffset) - def compressionCodec:CompressionCodec = { - magic match { - case 0 => NoCompressionCodec - case 1 => CompressionCodec.getCompressionCodec(buffer.get(AttributeOffset) & CompressionCodeMask) - case _ => throw new RuntimeException("Invalid magic byte " + magic) - } - - } - - def checksum: Long = Utils.getUnsignedInt(buffer, crcOffset(magic)) + /** + * The compression codec used with this message + */ + def compressionCodec: CompressionCodec = + CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask) + /** + * A ByteBuffer containing the content of the message + */ def payload: ByteBuffer = { var payload = buffer.duplicate - payload.position(headerSize(magic)) + payload.position(KeyOffset + max(keySize, 0)) payload = payload.slice() payload.limit(payloadSize) payload.rewind() payload } - def isValid: Boolean = - checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + payloadOffset(magic), payloadSize) - - def serializedSize: Int = 4 /* int size*/ + buffer.limit - - def serializeTo(serBuffer:ByteBuffer) = { - serBuffer.putInt(buffer.limit) - serBuffer.put(buffer.duplicate) + /** + * A ByteBuffer containing the message key + */ + def key: ByteBuffer = { + val s = keySize + if(s < 0) { + null + } else { + var key = buffer.duplicate + key.position(KeyOffset) + key = key.slice() + key.limit(s) + key.rewind() + key + } } + + /** + * Returns true if the crc stored with the message matches the crc computed off the message contents + */ + def isValid: Boolean = checksum == computeChecksum override def toString(): String = - "message(magic = %d, attributes = %d, crc = %d, payload = %s)".format(magic, attributes, checksum, payload) + "Message(magic = %d, attributes = %d, crc = %d, key = %s, payload = %s)".format(magic, attributes, checksum, key, payload) override def equals(any: Any): Boolean = { any match { - case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum && - payload == that.payload && magic == that.magic + case that: Message => this.buffer.equals(that.buffer) case _ => false } } diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala index d769fc6..dcf00bb 100644 --- a/core/src/main/scala/kafka/message/MessageAndOffset.scala 
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala @@ -18,5 +18,11 @@ package kafka.message -case class MessageAndOffset(message: Message, offset: Long) +case class MessageAndOffset(message: Message, offset: Long) { + + /** + * Compute the offset of the next message in the log + */ + def nextOffset: Long = offset + MessageSet.entrySize(message) +} diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index b53c60b..c47bf48 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -24,9 +24,11 @@ import java.nio.channels._ * Message set helper functions */ object MessageSet { - - val LogOverhead = 4 - val Empty: MessageSet = new ByteBufferMessageSet(ByteBuffer.allocate(0)) + + val MessageSizeLength = 4 + val OffsetLength = 8 + val LogOverhead = MessageSizeLength + OffsetLength + val Empty = new ByteBufferMessageSet(ByteBuffer.allocate(0)) /** * The size of a message set containing the given messages @@ -52,37 +54,15 @@ object MessageSet { */ def entrySize(message: Message): Int = LogOverhead + message.size - def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = - compressionCodec match { - case NoCompressionCodec => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for (message <- messages) { - message.serializeTo(buffer) - } - buffer.rewind - buffer - case _ => - messages.size match { - case 0 => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - buffer.rewind - buffer - case _ => - val message = CompressionUtils.compress(messages, compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer - } - } } /** - * A set of messages. A message set has a fixed serialized form, though the container + * A set of messages with offsets. A message set has a fixed serialized form, though the container * for the bytes could be either in-memory or on disk. 
A The format of each message is * as follows: + * 8 byte message offset number * 4 byte size containing an integer N - * N message bytes as described in the message class + * N message bytes as described in the Message class */ abstract class MessageSet extends Iterable[MessageAndOffset] { @@ -92,7 +72,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long /** - * Provides an iterator over the messages in this set + * Provides an iterator over the message/offset pairs in this set */ def iterator: Iterator[MessageAndOffset] @@ -110,5 +90,19 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { if(!messageAndOffset.message.isValid) throw new InvalidMessageException } + + /** + * Print this message set's contents + */ + override def toString: String = { + val builder = new StringBuilder() + builder.append(getClass.getSimpleName + "(") + for(message <- this) { + builder.append(message) + builder.append(", ") + } + builder.append(")") + builder.toString + } } diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 589923c..880f026 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -88,7 +88,8 @@ object ConsoleProducer { val props = new Properties() props.put("broker.list", brokerList) - props.put("compression.codec", DefaultCompressionCodec.codec.toString) + if(compress) + props.put("compression.codec", DefaultCompressionCodec.codec.toString) props.put("producer.type", if(sync) "sync" else "async") if(options.has(batchSizeOpt)) props.put("batch.size", batchSize.toString) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index ed25962..9fd460d 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) extends ShutdownableThread(name) { - private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map + private val fetchMap = new mutable.HashMap[(String, Int), Long] // a (topic, partitionId) -> offset map private val fetchMapLock = new Object val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) @@ -65,7 +65,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket minBytes(minBytes) fetchMapLock synchronized { - for ( ((topic, partitionId), offset) <- fetchMap ) + for(((topic, partitionId), offset) <- fetchMap) builder.addFetch(topic, partitionId, offset.longValue, fetchSize) } @@ -90,8 +90,8 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket if (response != null) { // process fetched data fetchMapLock synchronized { - for ( topicData <- response.data ) { - for ( partitionData <- topicData.partitionDataArray) { + for(topicData <- response.data) { + for(partitionData <- topicData.partitionDataArray) { val topic = topicData.topic val partitionId = partitionData.partition val key = (topic, 
partitionId) @@ -121,7 +121,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket } } } - if (partitionsWithError.size > 0) { + if(partitionsWithError.size > 0) { debug("handling partitions with error for %s".format(partitionsWithError)) handlePartitionsWithErrors(partitionsWithError) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index e9df147..8f7913a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -274,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel, totalBytes += math.min(offsetDetail.fetchSizes(i), available) } catch { case e: UnknownTopicOrPartitionException => - info("Invalid partition %d in fetch request from client %d." + info("Invalid partition %d in fetch request from client %s." .format(offsetDetail.partitions(i), fetchRequest.clientId)) case e => error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s" @@ -309,7 +309,7 @@ class KafkaApis(val requestChannel: RequestChannel, val info = new mutable.ArrayBuffer[PartitionData]() val topic = offsetDetail.topic val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) - for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { + for((partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_))) { val isFetchFromFollower = fetchRequest.isFromFollower() val partitionInfo = try { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 7c8ff4e..51bfe5e 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -34,7 +34,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] if (fetchOffset != replica.logEndOffset) - throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset)) + throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset)) trace("Follower %d has replica log end offset %d. 
Received %d messages and leader hw %d".format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw)) replica.log.get.append(messageSet) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 85ae0c5..d837499 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -43,7 +43,7 @@ object DumpLogSegments { " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec) if (!isNoPrint) println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - offset = messageAndOffset.offset + offset = messageAndOffset.nextOffset } println("tail of the log is at offset: " + (startOffset + offset)) } diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index dc04180..f5f9dde 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -108,7 +108,7 @@ object SimpleConsumerShell extends Logging { for(messageAndOffset <- messageSet) { if(printMessages) info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - offset = messageAndOffset.offset + offset = messageAndOffset.nextOffset if(printOffsets) info("next offset = " + offset) consumed += 1 diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 06d965a..6257ab9 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -295,11 +295,12 @@ object ZkUtils extends Logging { def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { val stat = client.writeData(path, data, expectVersion) - info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion)) + debug("Conditional update to the zookeeper path %s with expected version %d succeeded and returned the new version: %d".format(path, expectVersion, stat.getVersion)) (true, stat.getVersion) } catch { case e: Exception => - info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion)) + info("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion)) + error(e) (false, -1) } } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala new file mode 100644 index 0000000..3b29107 --- /dev/null +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network; + +import org.junit._ +import org.scalatest.junit.JUnitSuite +import junit.framework.Assert._ +import java.nio.ByteBuffer +import kafka.api._ +import kafka.message.{Message, ByteBufferMessageSet} +import kafka.cluster.Broker +import kafka.common.ErrorMapping +import collection.mutable._ + +object SerializationTestUtils{ + private val topic1 = "test1" + private val topic2 = "test2" + private val leader1 = 0; + private val isr1 = List(0, 1, 2) + private val leader2 = 0; + private val isr2 = List(0, 2, 3) + private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes))) + private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes))) + private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes))) + private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes))) + private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3) + private val topicData1 = new TopicData(topic1, partitionDataArray) + private val topicData2 = new TopicData(topic2, partitionDataArray) + private val topicDataArray = Array(topicData1, topicData2) + private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) + private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) + private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2) + private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) + private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) + private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) + + def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = { + val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1) + val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) + val map = Map(((topic1, 0), leaderAndISR1), + ((topic2, 0), leaderAndISR2)) + new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map) + } + + def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { + val responseMap = Map(((topic1, 0), ErrorMapping.NoError), + ((topic2, 0), ErrorMapping.NoError)) + new LeaderAndISRResponse(1, responseMap) + } + + def createTestStopReplicaRequest() : StopReplicaRequest = { + new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) + } + + def createTestStopReplicaResponse() : StopReplicaResponse = { + 
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))
+    new StopReplicaResponse(1, responseMap)
+  }
+
+  def createTestProducerRequest: ProducerRequest = {
+    new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
+  }
+
+  def createTestProducerResponse: ProducerResponse = {
+    new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000L, 2000L), 0)
+  }
+
+  def createTestFetchRequest: FetchRequest = {
+    new FetchRequest(offsetInfo = offsetDetailSeq)
+  }
+
+  def createTestFetchResponse: FetchResponse = {
+    new FetchResponse(1, 1, topicDataArray)
+  }
+
+  def createTestOffsetRequest: OffsetRequest = {
+    new OffsetRequest(topic1, 1, 1000, 200)
+  }
+
+  def createTestOffsetResponse: OffsetResponse = {
+    new OffsetResponse(1, Array(1000L, 2000L, 3000L, 4000L))
+  }
+
+  def createTestTopicMetadataRequest: TopicMetadataRequest = {
+    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
+  }
+
+  def createTestTopicMetadataResponse: TopicMetaDataResponse = {
+    new TopicMetaDataResponse(1, Seq(topicMetaData1, topicMetaData2))
+  }
+}
+
+class RequestResponseSerializationTest extends JUnitSuite {
+  private val leaderAndISRRequest = SerializationTestUtils.createTestLeaderAndISRRequest
+  private val leaderAndISRResponse = SerializationTestUtils.createTestLeaderAndISRResponse
+  private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
+  private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
+  private val producerRequest = SerializationTestUtils.createTestProducerRequest
+  private val producerResponse = SerializationTestUtils.createTestProducerResponse
+  private val fetchRequest = SerializationTestUtils.createTestFetchRequest
+  private val offsetRequest = SerializationTestUtils.createTestOffsetRequest
+  private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
+  private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
+  private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+
+  @Test
+  def testSerializationAndDeserialization() {
+    var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
+    leaderAndISRRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer)
+    assertEquals("The original and deserialized leaderAndISRRequest should be the same", leaderAndISRRequest,
+                 deserializedLeaderAndISRRequest)
+
+    buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes())
+    leaderAndISRResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer)
+    assertEquals("The original and deserialized leaderAndISRResponse should be the same", leaderAndISRResponse,
+                 deserializedLeaderAndISRResponse)
+
+    buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
+    stopReplicaRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer)
+    assertEquals("The original and deserialized stopReplicaRequest should be the same", stopReplicaRequest,
+                 deserializedStopReplicaRequest)
+
+    buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes())
+    stopReplicaResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer)
+    assertEquals("The original and deserialized stopReplicaResponse should be the same", stopReplicaResponse,
+                 deserializedStopReplicaResponse)
+
+    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes())
+    producerRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
+    assertEquals("The original and deserialized producerRequest should be the same", producerRequest,
+                 deserializedProducerRequest)
+
+    buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
+    producerResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedProducerResponse = ProducerResponse.readFrom(buffer)
+    assertEquals("The original and deserialized producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse,
+                 deserializedProducerResponse)
+
+    buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes)
+    fetchRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedFetchRequest = FetchRequest.readFrom(buffer)
+    assertEquals("The original and deserialized fetchRequest should be the same", fetchRequest,
+                 deserializedFetchRequest)
+
+    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes())
+    offsetRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
+    assertEquals("The original and deserialized offsetRequest should be the same", offsetRequest,
+                 deserializedOffsetRequest)
+
+    buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes)
+    offsetResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetResponse = OffsetResponse.readFrom(buffer)
+    assertEquals("The original and deserialized offsetResponse should be the same", offsetResponse,
+                 deserializedOffsetResponse)
+
+    buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes())
+    topicMetadataRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer)
+    assertEquals("The original and deserialized topicMetadataRequest should be the same", topicMetadataRequest,
+                 deserializedTopicMetadataRequest)
+
+    buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
+    topicMetadataResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
+    assertEquals("The original and deserialized topicMetadataResponse should be the same", topicMetadataResponse,
+                 deserializedTopicMetadataResponse)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 0cfa89c..b52b159 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -348,7 +348,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
     assertEquals(nMessages, receivedMessages1.size)
-    assertEquals(sentMessages1, receivedMessages1)
+    assertEquals(sentMessages1.sortWith((s,t) => s.checksum < t.checksum), receivedMessages1)
   }
 
   def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
@@ -395,10 +395,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for((topic, messageStreams) <-
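A hypothetical refactoring, illustrative only and not part of this patch: RequestResponseSerializationTest above repeats the same allocate / writeTo / rewind / readFrom / assertEquals sequence for every request and response type. Assuming no shared read/write interface is relied on, a helper taking the write and read steps explicitly could express the check once:

    import java.nio.ByteBuffer
    import junit.framework.Assert.assertEquals

    // Round-trips one object through a ByteBuffer and asserts it deserializes to an equal value.
    def assertRoundTrip[T](original: T, sizeInBytes: Int,
                           write: (T, ByteBuffer) => Unit,
                           read: ByteBuffer => T) {
      val buffer = ByteBuffer.allocate(sizeInBytes)
      write(original, buffer)
      buffer.rewind()
      assertEquals("The original and deserialized objects should be the same", original, read(buffer))
    }

    // Usage mirroring one case from the test above:
    //   assertRoundTrip(producerRequest, producerRequest.sizeInBytes(),
    //                   (r: ProducerRequest, b: ByteBuffer) => r.writeTo(b),
    //                   buf => ProducerRequest.readFrom(buf))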
topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { + for(i <- 0 until nMessagesPerThread) { assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index 950aeaa..4845cbb 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -104,7 +104,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness { // send some invalid offsets val builder = new FetchRequestBuilder() - for( (topic, offset) <- topicOffsets ) + for((topic, offset) <- topicOffsets) builder.addFetch(topic, offset, -1, 10000) val request = builder.build() diff --git a/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala index a9ff224..3f75f16 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala @@ -26,12 +26,12 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC override def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet = new ByteBufferMessageSet(compressed, getMessageList(messages: _*)) - + @Test def testEquals() { val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message("hello".getBytes()), - new Message("there".getBytes()))) + messages = getMessageList(new Message("hello".getBytes()), + new Message("there".getBytes()))) val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message("hello".getBytes()), new Message("there".getBytes()))) diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index e23fc8d..b825b27 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -92,16 +92,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) log.flush() - val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10) - val offsets = log.getOffsetsBefore(offsetRequest) - assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long])) + val offsets = log.getOffsetsBefore(offsetRequest).toList + assertEquals(List(480, 360, 240, 120, 0), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, - OffsetRequest.LatestTime, 10) - assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) + OffsetRequest.LatestTime, 10).toList + assertEquals(List(480, 360, 240, 120, 0), consumerOffsets) // try to fetch using latest offset val fetchResponse = simpleConsumer.fetch( @@ -154,13 +153,12 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val now = time.milliseconds val offsetRequest = new OffsetRequest(topic, part, now, 10) - val offsets = log.getOffsetsBefore(offsetRequest) - 
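Background for the LogOffsetTest assertions rewritten in this hunk (illustrative snippet, not part of the patch): Scala arrays are JVM arrays, so == on them is reference equality. Both the old WrappedArray views and the new toList conversions exist to get element-by-element comparison, and assertEquals on Lists also prints the expected and actual offsets when the test fails.

    val a = Array(480L, 360L, 240L, 120L, 0L)
    val b = Array(480L, 360L, 240L, 120L, 0L)
    println(a == b)               // false: plain arrays compare by reference
    println(a.toList == b.toList) // true: Lists compare element by element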
println("Offsets = " + offsets.mkString(",")) - assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long])) + val offsets = log.getOffsetsBefore(offsetRequest).toList + assertEquals(List(480L, 360L, 240L, 120L, 0L), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) - val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10) - assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) + val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10).toList + assertEquals(List(480L, 360L, 240L, 120L, 0L), consumerOffsets) } @Test @@ -181,14 +179,13 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.EarliestTime, 10) - val offsets = log.getOffsetsBefore(offsetRequest) - - assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + val offsets = log.getOffsetsBefore(offsetRequest).toList + assertEquals(List(0L), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, - OffsetRequest.EarliestTime, 10) - assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + OffsetRequest.EarliestTime, 10).toList + assertEquals(List(0L), consumerOffsets) } private def createBrokerConfig(nodeId: Int, port: Int): Properties = { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index bdb46d6..1bbd41f 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -166,7 +166,7 @@ class LogTest extends JUnitSuite { log.flush /* now do successive reads and iterate over the resulting message sets counting the messages - * we should find exact 100 messages. + * we should find exactly 100 messages. 
*/ var reads = 0 var current = 0 @@ -174,7 +174,7 @@ class LogTest extends JUnitSuite { var readOffset = 0L while(current < numMessages) { val messages = log.read(readOffset, 1024*1024) - readOffset += messages.last.offset + readOffset = messages.last.nextOffset current += messages.size if(reads > 2*numMessages) fail("Too many read attempts.") diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index b098b02..a3001da 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -27,24 +27,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, messages: _*) - - @Test - def testSmallFetchSize() { - // create a ByteBufferMessageSet that doesn't contain a full message - // iterating it should get an InvalidMessageSizeException - val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes())) - val buffer = messages.buffer.slice - buffer.limit(10) - val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000) - try { - for (message <- messageSetWithNoFullMessage) - fail("shouldn't see any message") - } - catch { - case e: InvalidMessageSizeException => //this is expected - case e2 => fail("shouldn't see any other exceptions") - } - } @Test def testValidBytes() { @@ -104,8 +86,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) - //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit) //make sure shallow iterator is the same as deep iterator TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator), @@ -118,9 +98,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) - //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit) - verifyShallowIterator(messageSet) } @@ -137,9 +114,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. 
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) - //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit) - //make sure shallow iterator is the same as deep iterator TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator), TestUtils.getMessageIterator(mixedMessageSet.iterator)) @@ -158,17 +132,35 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) - //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit) - verifyShallowIterator(mixedMessageSet) } } + + @Test + def testOffsetAssignment() { + val messages = new ByteBufferMessageSet(NoCompressionCodec, + new Message("hello".getBytes), + new Message("there".getBytes), + new Message("beautiful".getBytes)) + checkOffsets(messages, 0) + var offset = 1234567 + messages.reassignOffsets(offset) + checkOffsets(messages, offset) + } + + /* check that offsets are assigned based on byte offset from the given base offset */ + def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) { + var offset = baseOffset + for(entry <- messages) { + assertEquals("Unexpected offset in message set iterator", offset, entry.offset) + offset += MessageSet.entrySize(entry.message) + } + } def verifyShallowIterator(messageSet: ByteBufferMessageSet) { - //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator - val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet - val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet - assertTrue(shallowOffsets.subsetOf(deepOffsets)) + //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator + val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet + val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet + assertTrue(shallowOffsets.subsetOf(deepOffsets)) } } diff --git a/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala b/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala index df96603..fcbdd33 100644 --- a/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala @@ -17,59 +17,52 @@ package kafka.message +import java.io.ByteArrayOutputStream import kafka.utils.TestUtils import org.scalatest.junit.JUnitSuite import org.junit.Test import junit.framework.Assert._ class CompressionUtilTest extends JUnitSuite { - @Test def testSimpleCompressDecompress() { - val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - - val message = CompressionUtils.compress(messages) - + val message = CompressionUtils.compress(0, messages) val decompressedMessages = CompressionUtils.decompress(message) - TestUtils.checkLength(decompressedMessages.iterator,3) - TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) } @Test def 
testComplexCompressDecompress() { - val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - - val message = CompressionUtils.compress(messages.slice(0, 2)) - + val message = CompressionUtils.compress(0, messages.slice(0, 2)) val complexMessages = List[Message](message):::messages.slice(2,3) - - val complexMessage = CompressionUtils.compress(complexMessages) - + val complexMessage = CompressionUtils.compress(0, complexMessages) val decompressedMessages = CompressionUtils.decompress(complexMessage) - TestUtils.checkLength(TestUtils.getMessageIterator(decompressedMessages.iterator),3) - TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) } @Test def testSnappyCompressDecompressExplicit() { - + if(!isSnappyAvailable()) + return val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes)) - - val message = CompressionUtils.compress(messages,SnappyCompressionCodec) - + val message = CompressionUtils.compress(0L, messages, SnappyCompressionCodec) assertEquals(message.compressionCodec,SnappyCompressionCodec) - val decompressedMessages = CompressionUtils.decompress(message) - TestUtils.checkLength(decompressedMessages.iterator,3) - TestUtils.checkEquals(messages.iterator, TestUtils.getMessageIterator(decompressedMessages.iterator)) } + + def isSnappyAvailable(): Boolean = { + try { + val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream()) + true + } catch { + case e: UnsatisfiedLinkError => false + } + } } diff --git a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala index a683963..f9a277a 100644 --- a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala @@ -76,8 +76,8 @@ class FileMessageSetTest extends BaseMessageSetTestCases { val read = messageSet.read(0, messageSet.sizeInBytes) checkEquals(messageSet.iterator, read.iterator) val items = read.iterator.toList - val first = items.head - val read2 = messageSet.read(first.offset, messageSet.sizeInBytes) + val sec = items.tail.head + val read2 = messageSet.read(sec.offset, messageSet.sizeInBytes) checkEquals(items.tail.iterator, read2.iterator) } diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 4e3184c..2328721 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -22,32 +22,40 @@ import java.nio._ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{Before, Test} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters import kafka.utils.TestUtils +import kafka.utils.Utils -class MessageTest extends JUnitSuite { - - var message: Message = null - val payload = "some bytes".getBytes() +@RunWith(value = classOf[Parameterized]) +class MessageTest(val key: Array[Byte], val payload: Array[Byte], val codec: CompressionCodec) extends JUnitSuite { + var message: Message = null + @Before def setUp(): Unit = { - message = new Message(payload) + message = new Message(bytes = payload, key=key, compressionCodec = codec) } @Test def testFieldValues = { 
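A descriptive comment could be added here to record how the parameterized MessageTest is driven (the counts below come from the @Parameters factory added further down in this patch):

+    // The Parameterized runner is expected to build one MessageTest instance per Array returned
+    // by the @Parameters factory in the companion object below, passing the elements as the
+    // (key, payload, codec) constructor arguments. With 3 keys, 2 payloads and 2 codecs the
+    // cross product is 3 * 2 * 2 = 12 parameter rows, so each @Test in this class runs 12 times.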
TestUtils.checkEquals(ByteBuffer.wrap(payload), message.payload) assertEquals(Message.CurrentMagicValue, message.magic) - assertEquals(69L, new Message(69, "hello".getBytes()).checksum) + if(message.hasKey) + TestUtils.checkEquals(ByteBuffer.wrap(key), message.key) + else + assertEquals(null, message.key) + assertEquals(codec, message.compressionCodec) } @Test def testChecksum() { assertTrue("Auto-computed checksum should be valid", message.isValid) - val badChecksum = message.checksum + 1 % Int.MaxValue - val invalid = new Message(badChecksum, payload) - assertEquals("Message should return written checksum", badChecksum, invalid.checksum) - assertFalse("Message with invalid checksum should be invalid", invalid.isValid) + // garble checksum + val badChecksum: Int = (message.checksum + 1 % Int.MaxValue).toInt + Utils.putUnsignedInt(message.buffer, Message.CrcOffset, badChecksum) + assertFalse("Message with invalid checksum should be invalid", message.isValid) } @Test @@ -55,7 +63,7 @@ class MessageTest extends JUnitSuite { assertFalse("Should not equal null", message.equals(null)) assertFalse("Should not equal a random string", message.equals("asdf")) assertTrue("Should equal itself", message.equals(message)) - val copy = new Message(message.checksum, payload) + val copy = new Message(bytes = payload, key = key, compressionCodec = codec) assertTrue("Should equal another message with the same content.", message.equals(copy)) } @@ -68,3 +76,17 @@ class MessageTest extends JUnitSuite { } } + +object MessageTest { + @Parameters + def parameters: java.util.Collection[Array[AnyRef]] = { + val l = new java.util.ArrayList[Array[AnyRef]]() + val keys = Array(null, "key".getBytes, "".getBytes) + val vals = Array("value".getBytes, "".getBytes) + val codecs = Array(NoCompressionCodec, GZIPCompressionCodec) + for(k <- keys; v <- vals; codec <- codecs) + l.add(Array(k, v, codec)) + l + } +} + diff --git a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala deleted file mode 100644 index d0b187e..0000000 --- a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.network; - -import org.junit._ -import org.scalatest.junit.JUnitSuite -import junit.framework.Assert._ -import java.nio.ByteBuffer -import kafka.api._ -import kafka.message.{Message, ByteBufferMessageSet} -import kafka.cluster.Broker -import kafka.common.ErrorMapping -import collection.mutable._ - -object RpcDataSerializationTestUtils{ - private val topic1 = "test1" - private val topic2 = "test2" - private val leader1 = 0; - private val isr1 = List(0, 1, 2) - private val leader2 = 0; - private val isr2 = List(0, 2, 3) - private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes))) - private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes))) - private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes))) - private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes))) - private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3) - private val topicData1 = new TopicData(topic1, partitionDataArray) - private val topicData2 = new TopicData(topic2, partitionDataArray) - private val topicDataArray = Array(topicData1, topicData2) - private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) - private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) - private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2) - private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) - private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) - private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) - - def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = { - val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1) - val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) - val map = Map(((topic1, 0), leaderAndISR1), - ((topic2, 0), leaderAndISR2)) - new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map) - } - - def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { - val responseMap = Map(((topic1, 0), ErrorMapping.NoError), - ((topic2, 0), ErrorMapping.NoError)) - new LeaderAndISRResponse(1, responseMap) - } - - def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) - } - - def createTestStopReplicaResponse() : StopReplicaResponse = { - val responseMap = Map(((topic1, 0), ErrorMapping.NoError), - ((topic2, 0), ErrorMapping.NoError)) - new StopReplicaResponse(1, responseMap) - } - - def createTestProducerRequest: ProducerRequest = { - new ProducerRequest(1, "client 1", 0, 1000, topicDataArray) - } - - def createTestProducerResponse: ProducerResponse = { - new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0) - } - - 
def createTestFetchRequest: FetchRequest = { - new FetchRequest(offsetInfo = offsetDetailSeq) - } - - def createTestFetchResponse: FetchResponse = { - new FetchResponse(1, 1, topicDataArray) - } - - def createTestOffsetRequest: OffsetRequest = { - new OffsetRequest(topic1, 1, 1000, 200) - } - - def createTestOffsetResponse: OffsetResponse = { - new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l)) - } - - def createTestTopicMetadataRequest: TopicMetadataRequest = { - new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2)) - } - - def createTestTopicMetadataResponse: TopicMetaDataResponse = { - new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2)) - } -} - -class RpcDataSerializationTest extends JUnitSuite { - private val leaderAndISRRequest = RpcDataSerializationTestUtils.createTestLeaderAndISRRequest - private val leaderAndISRResponse = RpcDataSerializationTestUtils.createTestLeaderAndISRResponse - private val stopReplicaRequest = RpcDataSerializationTestUtils.createTestStopReplicaRequest - private val stopReplicaResponse = RpcDataSerializationTestUtils.createTestStopReplicaResponse - private val producerRequest = RpcDataSerializationTestUtils.createTestProducerRequest - private val producerResponse = RpcDataSerializationTestUtils.createTestProducerResponse - private val fetchRequest = RpcDataSerializationTestUtils.createTestFetchRequest - private val offsetRequest = RpcDataSerializationTestUtils.createTestOffsetRequest - private val offsetResponse = RpcDataSerializationTestUtils.createTestOffsetResponse - private val topicMetadataRequest = RpcDataSerializationTestUtils.createTestTopicMetadataRequest - private val topicMetadataResponse = RpcDataSerializationTestUtils.createTestTopicMetadataResponse - - - @Test - def testSerializationAndDeserialization() { - var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes()) - leaderAndISRRequest.writeTo(buffer) - buffer.rewind() - val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer) - assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest, - deserializedLeaderAndISRRequest) - - buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes()) - leaderAndISRResponse.writeTo(buffer) - buffer.rewind() - val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer) - assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse, - deserializedLeaderAndISRResponse) - - buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes()) - stopReplicaRequest.writeTo(buffer) - buffer.rewind() - val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer) - assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest, - deserializedStopReplicaRequest) - - buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes()) - stopReplicaResponse.writeTo(buffer) - buffer.rewind() - val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer) - assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse, - deserializedStopReplicaResponse) - - buffer = ByteBuffer.allocate(producerRequest.sizeInBytes()) - producerRequest.writeTo(buffer) - buffer.rewind() - val deserializedProducerRequest = ProducerRequest.readFrom(buffer) - assertEquals("The original and deserialzed producerRequest should be the same", producerRequest, - deserializedProducerRequest) - - buffer = 
ByteBuffer.allocate(producerResponse.sizeInBytes) - producerResponse.writeTo(buffer) - buffer.rewind() - val deserializedProducerResponse = ProducerResponse.readFrom(buffer) - assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse, - deserializedProducerResponse) - - buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes) - fetchRequest.writeTo(buffer) - buffer.rewind() - val deserializedFetchRequest = FetchRequest.readFrom(buffer) - assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest, - deserializedFetchRequest) - - buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes()) - offsetRequest.writeTo(buffer) - buffer.rewind() - val deserializedOffsetRequest = OffsetRequest.readFrom(buffer) - assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest, - deserializedOffsetRequest) - - buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes) - offsetResponse.writeTo(buffer) - buffer.rewind() - val deserializedOffsetResponse = OffsetResponse.readFrom(buffer) - assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse, - deserializedOffsetResponse) - - buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes()) - topicMetadataRequest.writeTo(buffer) - buffer.rewind() - val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer) - assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest, - deserializedTopicMetadataRequest) - - buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes) - topicMetadataResponse.writeTo(buffer) - buffer.rewind() - val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer) - assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse, - deserializedTopicMetadataResponse) - } -} diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index b21712d..c840c42 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -19,6 +19,7 @@ package kafka.producer import java.util.{LinkedList, Properties} import java.util.concurrent.LinkedBlockingQueue +import java.io.IOException import junit.framework.Assert._ import org.easymock.EasyMock import org.junit.Test @@ -149,7 +150,7 @@ class AsyncProducerTest extends JUnit3Suite { for (producerData <- producerDataList) queue.put(producerData) - Thread.sleep(queueExpirationTime + 10) + Thread.sleep(queueExpirationTime + 100) EasyMock.verify(mockHandler) producerSendThread.shutdown } @@ -401,7 +402,6 @@ class AsyncProducerTest extends JUnit3Suite { val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata] topicPartitionInfos.put("topic1", topic1Metadata) - val msgs = TestUtils.getMsgStrings(2) // produce request for topic1 and partitions 0 and 1. Let the first request fail
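The AsyncProducerTest change above widens a fixed Thread.sleep from queueExpirationTime + 10 to queueExpirationTime + 100, apparently to make the queue-time test less timing-sensitive. A hypothetical alternative, not part of this patch, is to poll for the expected outcome with a deadline, in the spirit of the waitUntilTrue calls used by LogOffsetTest:

    // Hypothetical polling helper: returns true as soon as the condition holds,
    // false if the timeout elapses first.
    def waitUntil(condition: () => Boolean, timeoutMs: Long, pollMs: Long = 10): Boolean = {
      val deadline = System.currentTimeMillis + timeoutMs
      var satisfied = condition()
      while (!satisfied && System.currentTimeMillis < deadline) {
        Thread.sleep(pollMs)
        satisfied = condition()
      }
      satisfied
    }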