diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..c08eab0 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,23 +16,22 @@
*/
package kafka.cluster
-import kafka.common._
+import scala.collection._
import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
-import kafka.utils.Utils.inLock
+import kafka.utils._
+import java.lang.Object
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.{OffsetManager, ReplicaManager}
+import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
+import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
-
+import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
import java.io.IOException
-import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.Some
-import scala.collection._
-
-import com.yammer.metrics.core.Gauge
+import kafka.common.TopicAndPartition
/**
@@ -49,7 +48,7 @@ class Partition(val topic: String,
var leaderReplicaIdOpt: Option[Int] = None
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
private val assignedReplicaMap = new Pool[Int,Replica]
- private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
+ private val leaderIsrUpdateLock = new Object
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
@@ -73,7 +72,7 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(_) =>
inSyncReplicas.size < assignedReplicas.size
@@ -115,7 +114,7 @@ class Partition(val topic: String,
}
def leaderReplicaIfLocal(): Option[Replica] = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
leaderReplicaIdOpt match {
case Some(leaderReplicaId) =>
if (leaderReplicaId == localBrokerId)
@@ -141,7 +140,7 @@ class Partition(val topic: String,
def delete() {
// need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
- inLock(leaderIsrUpdateLock.writeLock()) {
+ leaderIsrUpdateLock synchronized {
assignedReplicaMap.clear()
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
@@ -156,7 +155,7 @@ class Partition(val topic: String,
}
def getLeaderEpoch(): Int = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
return this.leaderEpoch
}
}
@@ -168,7 +167,7 @@ class Partition(val topic: String,
def makeLeader(controllerId: Int,
partitionStateInfo: PartitionStateInfo, correlationId: Int,
offsetManager: OffsetManager): Boolean = {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -201,7 +200,7 @@ class Partition(val topic: String,
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
correlationId: Int, offsetManager: OffsetManager): Boolean = {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -235,7 +234,7 @@ class Partition(val topic: String,
}
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ leaderIsrUpdateLock synchronized {
debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
val replicaOpt = getReplica(replicaId)
if(!replicaOpt.isDefined) {
@@ -271,7 +270,7 @@ class Partition(val topic: String,
}
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(_) =>
val numAcks = inSyncReplicas.count(r => {
@@ -315,7 +314,7 @@ class Partition(val topic: String,
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -357,7 +356,7 @@ class Partition(val topic: String,
}
def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
@@ -403,7 +402,7 @@ class Partition(val topic: String,
}
override def toString(): String = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ leaderIsrUpdateLock synchronized {
val partitionString = new StringBuilder
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f20c232..46df8d9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,18 +17,15 @@
package kafka.log
-import kafka.utils._
-import kafka.message._
-import kafka.common._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.server.BrokerTopicStats
-
import java.io.{IOException, File}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
-import java.text.NumberFormat
+import kafka.utils._
import scala.collection.JavaConversions
-
+import java.text.NumberFormat
+import kafka.message._
+import kafka.common._
+import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
@@ -238,7 +235,7 @@ class Log(val dir: File,
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
- var validMessages = trimInvalidBytes(messages, appendInfo)
+ var validMessages = trimInvalidBytes(messages)
try {
// they are valid, insert them in the log
@@ -249,7 +246,7 @@ class Log(val dir: File,
val segment = maybeRoll()
if(assignOffsets) {
- // assign offsets to the message set
+ // assign offsets to the messageset
val offset = new AtomicLong(nextOffset.get)
try {
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
@@ -263,16 +260,12 @@ class Log(val dir: File,
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
- // re-validate message sizes since after re-compression some may exceed the limit
+ // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
+ // happens with the new message size (after re-compression, if any)
for(messageAndOffset <- validMessages.shallowIterator) {
- if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
- // we record the original message set size instead of trimmed size
- // to be consistent with pre-compression bytesRejectedRate recording
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+ if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
- }
}
// now append to the log
@@ -294,22 +287,18 @@ class Log(val dir: File,
}
}
- /**
- * Struct to hold various quantities we compute about each message set before appending to the log
+ /** Struct to hold various quantities we compute about each message set before appending to the log
* @param firstOffset The first offset in the message set
* @param lastOffset The last offset in the message set
- * @param shallowCount The number of shallow messages
- * @param validBytes The number of valid bytes
* @param codec The codec used in the message set
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
- case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
/**
* Validate the following:
*
* - each message matches its CRC
- * - each message size is valid
*
*
* Also compute the following quantities:
@@ -317,14 +306,12 @@ class Log(val dir: File,
* First offset in the message set
* Last offset in the message set
* Number of messages
- * Number of valid bytes
* Whether the offsets are monotonically increasing
* Whether any compression codec is used (if many are used, then the last one is given)
*
*/
private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
- var shallowMessageCount = 0
- var validBytesCount = 0
+ var messageCount = 0
var firstOffset, lastOffset = -1L
var codec: CompressionCodec = NoCompressionCodec
var monotonic = true
@@ -338,38 +325,25 @@ class Log(val dir: File,
// update the last offset seen
lastOffset = messageAndOffset.offset
- val m = messageAndOffset.message
-
- // Check if the message sizes are valid.
- val messageSize = MessageSet.entrySize(m)
- if(messageSize > config.maxMessageSize) {
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
- .format(messageSize, config.maxMessageSize))
- }
-
// check the validity of the message by checking CRC
+ val m = messageAndOffset.message
m.ensureValid()
-
- shallowMessageCount += 1
- validBytesCount += messageSize
+ messageCount += 1;
val messageCodec = m.compressionCodec
if(messageCodec != NoCompressionCodec)
codec = messageCodec
}
- LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
}
/**
* Trim any invalid bytes from the end of this message set (if there are any)
* @param messages The message set to trim
- * @param info The general information of the message set
* @return A trimmed message set. This may be the same as what was passed in or it may not.
*/
- private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
- val messageSetValidBytes = info.validBytes
+ private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
+ val messageSetValidBytes = messages.validBytes
if(messageSetValidBytes < 0)
throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
if(messageSetValidBytes == messages.sizeInBytes) {
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index f1b8432..a1b5c63 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -80,7 +80,17 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
* Gives the total size of this message set in bytes
*/
def sizeInBytes: Int
-
+
+ /**
+ * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't
+ * match the payload for any message.
+ */
+ def validate(): Unit = {
+ for(messageAndOffset <- this)
+ if(!messageAndOffset.message.isValid)
+ throw new InvalidMessageException
+ }
+
/**
* Print this message set's contents. If the message set has more than 100 messages, just
* print the first 100.
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..bb0359d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,21 +17,19 @@
package kafka.server
+import kafka.admin.AdminUtils
import kafka.api._
-import kafka.common._
-import kafka.log._
import kafka.message._
import kafka.network._
-import kafka.admin.AdminUtils
+import kafka.log._
+import scala.collection._
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
+import kafka.common._
+import kafka.utils.{Pool, SystemTime, Logging}
import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
-import kafka.utils.{Pool, SystemTime, Logging}
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
-import scala.collection._
-
import org.I0Itec.zkclient.ZkClient
/**
@@ -286,6 +284,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) =>
+ // update stats for incoming bytes rate
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+
try {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info =
@@ -295,12 +297,11 @@ class KafkaApis(val requestChannel: RequestChannel,
.format(topicAndPartition, brokerId))
}
-
val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
- // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+ // update stats for successfully appended messages
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..f11f6e2 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -97,7 +97,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS)
- val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec", "bytes", TimeUnit.SECONDS)
+ val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec", "bytes", TimeUnit.SECONDS)
val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
}
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 7125ec9..95303e0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,22 +17,25 @@
package kafka.integration
-import junit.framework.Assert._
import kafka.utils.{ZKGroupTopicDirs, Logging}
import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
import kafka.server._
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
import kafka.serializer._
import kafka.producer.{Producer, KeyedMessage}
+import org.junit.Test
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+ val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
- val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1
@@ -51,69 +54,39 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
super.tearDown
}
- // fake test so that this test can pass
- def testResetToEarliestWhenOffsetTooHigh() =
- assertTrue(true)
-
- /* Temporarily disable those tests due to failures.
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64)
-
+ @Test
def testResetToEarliestWhenOffsetTooHigh() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
-
+
+ @Test
def testResetToEarliestWhenOffsetTooLow() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
+ @Test
def testResetToLatestWhenOffsetTooHigh() =
assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
+ @Test
def testResetToLatestWhenOffsetTooLow() =
assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
- */
/* Produce the given number of messages, create a consumer with the given offset policy,
* then reset the offset to the given value and consume until we get no new messages.
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+ TestUtils.createTopic(zkClient, topic, 1, 1, servers)
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(), new StringEncoder())
+ val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+ TestUtils.getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
for(i <- 0 until numMessages)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
- TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
- var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+ val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("auto.offset.reset", resetTo)
consumerProps.put("consumer.timeout.ms", "2000")
consumerProps.put("fetch.wait.max.ms", "0")
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 4075068..25845ab 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -82,9 +82,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(),
- new StringEncoder())
+ val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+ TestUtils.getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
messages += conf.brokerId -> ms
producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 731ee59..108c2e7 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,8 +19,10 @@ package kafka.integration
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite
-import kafka.producer.{ProducerConfig, Producer}
-import kafka.utils.TestUtils
+import kafka.producer.Producer
+import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.serializer.StringEncoder
+
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
@@ -29,8 +31,10 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
override def setUp() {
super.setUp
- val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
- producer = new Producer(new ProducerConfig(props))
+ producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName,
+ partitioner = classOf[StaticPartitioner].getName)
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
}
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d1d969e..f44568c 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,10 +251,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
}
private def produceMessage(topic: String, message: String) = {
- val props = new Properties()
- props.put("request.required.acks", String.valueOf(-1))
- val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(), new StringEncoder(), props)
+ val producer: Producer[String, Array[Byte]] = createProducer(
+ getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
producer.close()
}
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 16e7164..20e8efe 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,22 +17,24 @@
package kafka.javaapi.consumer
-import junit.framework.Assert._
-import kafka.integration.KafkaServerTestHarness
import kafka.server._
-import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions
-import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer._
+import kafka.integration.KafkaServerTestHarness
import kafka.producer.KeyedMessage
import kafka.javaapi.producer.Producer
import kafka.utils.IntEncoder
-import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.zk.ZooKeeperTestHarness
+import scala.collection.JavaConversions
+
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import junit.framework.Assert._
+
+
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
val zookeeperConnect = zkConnect
@@ -52,14 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
- var actualMessages: List[Message] = Nil
+
+ // create the topic
+ TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
-
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -79,7 +80,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
compressed: CompressionCodec): List[String] = {
var messages: List[String] = Nil
val producer: kafka.producer.Producer[Int, String] =
- TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+ TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a0ef6f..4c45648 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -16,15 +16,18 @@
*/
package kafka.server
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Assert._
-import java.io.File
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.common._
-import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+import kafka.producer.{KeyedMessage, Producer}
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -50,29 +53,33 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
- producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
- producerProps.put("request.required.acks", "-1")
-
- override def tearDown() {
- for(server <- servers) {
- server.shutdown()
- Utils.rm(server.config.logDirs(0))
- }
- super.tearDown()
- }
+ override def setUp() {
+ super.setUp()
- def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
server1 = TestUtils.createServer(configProps1)
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
// create topic with 1 partition, 2 replicas, one on each broker
createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+ // create the producer
+ producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
+ }
+
+ override def tearDown() {
+ producer.close()
+ for(server <- servers) {
+ server.shutdown()
+ Utils.rm(server.config.logDirs(0))
+ }
+ super.tearDown()
+ }
+
+ def testHWCheckpointNoFailuresSingleLogSegment {
val numMessages = 2L
sendMessages(numMessages.toInt)
@@ -82,7 +89,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 10000))
servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
- producer.close()
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -90,15 +96,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointWithFailuresSingleLogSegment {
- // start both servers
- server1 = TestUtils.createServer(configProps1)
- server2 = TestUtils.createServer(configProps2)
- servers ++= List(server1, server2)
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
@@ -139,33 +137,18 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
- // start both servers
- server1 = TestUtils.createServer(configs.head)
- server2 = TestUtils.createServer(configs.last)
- servers ++= List(server1, server2)
-
- hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
- hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
sendMessages(20)
- var hw = 20L
+ val hw = 20L
// give some time for follower 1 to record leader HW of 600
assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -173,19 +156,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointWithFailuresMultipleLogSegments {
- // start both servers
- server1 = TestUtils.createServer(configs.head)
- server2 = TestUtils.createServer(configs.last)
- servers ++= List(server1, server2)
-
- hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
- hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)),
- servers = servers)(0)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
sendMessages(2)
var hw = 2L
@@ -220,7 +191,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 5000))
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 481a400..eb0ade3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -54,9 +54,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// send test messages to leader
- val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
- new StringEncoder(),
- new StringEncoder())
+ val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName)
val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
producer.send(messages:_*)
producer.close()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index addd11a..014e964 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,18 +16,20 @@
*/
package kafka.server
-import java.io.File
-import kafka.consumer.SimpleConsumer
-import org.junit.Test
-import junit.framework.Assert._
-import kafka.message.ByteBufferMessageSet
-import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
import kafka.producer._
-import kafka.utils.IntEncoder
+import kafka.utils.{IntEncoder, TestUtils, Utils}
import kafka.utils.TestUtils._
import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.message.ByteBufferMessageSet
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
@@ -43,9 +45,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
- val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
- producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
- var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+ var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
// create topic
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
@@ -69,7 +71,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
// wait for the broker to receive the update metadata request after startup
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
- producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+ producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
var fetchedMessage: ByteBufferMessageSet = null
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 130b6be..2939953 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -337,27 +337,33 @@ object TestUtils extends Logging {
* Create a producer for the given host and port
*/
def createProducer[K, V](brokerList: String,
- encoder: Encoder[V] = new DefaultEncoder(),
- keyEncoder: Encoder[K] = new DefaultEncoder(),
- props: Properties = new Properties()): Producer[K, V] = {
- props.put("metadata.broker.list", brokerList)
- props.put("send.buffer.bytes", "65536")
- props.put("connect.timeout.ms", "100000")
- props.put("reconnect.interval", "10000")
- props.put("serializer.class", encoder.getClass.getCanonicalName)
- props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
+ encoder: String = classOf[DefaultEncoder].getName,
+ keyEncoder: String = classOf[DefaultEncoder].getName,
+ partitioner: String = classOf[DefaultPartitioner].getName,
+ producerProps: Properties = null): Producer[K, V] = {
+ val props: Properties =
+ if (producerProps == null) {
+ getProducerConfig(brokerList)
+ } else {
+ producerProps.put("metadata.broker.list", brokerList)
+ producerProps
+ }
+ props.put("serializer.class", encoder)
+ props.put("key.serializer.class", keyEncoder)
+ props.put("partitioner.class", partitioner)
new Producer[K, V](new ProducerConfig(props))
}
- def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
+ def getProducerConfig(brokerList: String): Properties = {
val props = new Properties()
props.put("metadata.broker.list", brokerList)
- props.put("partitioner.class", partitioner)
props.put("message.send.max.retries", "3")
props.put("retry.backoff.ms", "1000")
props.put("request.timeout.ms", "500")
props.put("request.required.acks", "-1")
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
+ props.put("send.buffer.bytes", "65536")
+ props.put("connect.timeout.ms", "100000")
+ props.put("reconnect.interval", "10000")
props
}
@@ -368,7 +374,7 @@ object TestUtils extends Logging {
props.put("port", port.toString)
props.put("request.timeout.ms", "500")
props.put("request.required.acks", "1")
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
+ props.put("serializer.class", classOf[StringEncoder].getName)
props
}