diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index eea270a..d8f9ce6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -77,9 +77,6 @@ public class NetworkClient implements KafkaClient { /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ private boolean metadataFetchInProgress; - /* the last timestamp when no broker node is available to connect */ - private long lastNoNodeAvailableMs; - public NetworkClient(Selectable selector, Metadata metadata, String clientId, @@ -97,7 +94,6 @@ public class NetworkClient implements KafkaClient { this.correlation = 0; this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); this.metadataFetchInProgress = false; - this.lastNoNodeAvailableMs = 0; } /** @@ -166,10 +162,7 @@ public class NetworkClient implements KafkaClient { } // should we update our metadata? - long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now; - // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt); + long metadataTimeout = metadata.timeToNextUpdate(now); if (!this.metadataFetchInProgress && metadataTimeout == 0) maybeUpdateMetadata(sends, now); @@ -361,8 +354,6 @@ public class NetworkClient implements KafkaClient { Node node = this.leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); - // mark the timestamp for no node available to connect - this.lastNoNodeAvailableMs = now; return; } @@ -376,7 +367,7 @@ public class NetworkClient implements KafkaClient { this.inFlightRequests.add(metadataRequest); } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one - log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); + log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id()); initiateConnect(node, now); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 1d30f9e..4aa5b01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -139,11 +139,4 @@ public final class Metadata { public synchronized long lastUpdate() { return this.lastRefreshMs; } - - /** - * The metadata refresh backoff in ms - */ - public long refreshBackoff() { - return refreshBackoffMs; - } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 4dd2cdf..93f2f1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -13,7 +13,6 @@ package org.apache.kafka.common.network; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; @@ 
-263,11 +262,7 @@ public class Selector implements Selectable { if (!key.isValid()) close(key); } catch (IOException e) { - InetAddress remoteAddress = null; - Socket socket = channel.socket(); - if (socket != null) - remoteAddress = socket.getInetAddress(); - log.warn("Error in I/O with {}", remoteAddress , e); + log.error("Error in I/O: ", e); close(key); } } diff --git a/config/server.properties b/config/server.properties index 5c0905a..f16c84c 100644 --- a/config/server.properties +++ b/config/server.properties @@ -62,10 +62,6 @@ log.dirs=/tmp/kafka-logs # the brokers. num.partitions=1 -# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. -# This value is recommended to be increased for installations with data dirs located in RAID array. -num.recovery.threads.per.data.dir=1 - ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ce7ede3..d7e4956 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -33,7 +33,6 @@ import util.Random * Helper functions common to clients (producer, consumer, or admin) */ object ClientUtils extends Logging{ - /** * Used by the producer to send a metadata request since it has access to the ProducerConfig * @param topics The topics for which the metadata needs to be fetched diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 134aef9..f2ca856 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -18,7 +18,8 @@ package kafka.cluster import kafka.common._ import kafka.admin.AdminUtils -import kafka.utils._ +import kafka.utils.{ReplicationUtils, Pool, Time, Logging} +import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig import kafka.server.{OffsetManager, ReplicaManager} @@ -28,7 +29,7 @@ import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.utils.Utils.{inReadLock,inWriteLock} +import scala.Some import scala.collection._ import com.yammer.metrics.core.Gauge @@ -72,7 +73,7 @@ class Partition(val topic: String, ) def isUnderReplicated(): Boolean = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => inSyncReplicas.size < assignedReplicas.size @@ -114,7 +115,7 @@ class Partition(val topic: String, } def leaderReplicaIfLocal(): Option[Replica] = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIdOpt match { case Some(leaderReplicaId) => if (leaderReplicaId == localBrokerId) @@ -140,7 +141,7 @@ class Partition(val topic: String, def delete() { // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted - inWriteLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.writeLock()) { assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None @@ -155,7 +156,7 @@ class Partition(val topic: String, } def getLeaderEpoch(): Int = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { return this.leaderEpoch } } @@ 
-167,7 +168,7 @@ class Partition(val topic: String, def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - inWriteLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -200,7 +201,7 @@ class Partition(val topic: String, def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - inWriteLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -234,7 +235,7 @@ class Partition(val topic: String, } def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { - inWriteLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.writeLock()) { debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) val replicaOpt = getReplica(replicaId) if(!replicaOpt.isDefined) { @@ -270,7 +271,7 @@ class Partition(val topic: String, } def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => val numAcks = inSyncReplicas.count(r => { @@ -314,7 +315,7 @@ class Partition(val topic: String, } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { - inWriteLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.writeLock()) { leaderReplicaIfLocal() match { case Some(leaderReplica) => val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) @@ -356,7 +357,7 @@ class Partition(val topic: String, } def appendMessagesToLeader(messages: ByteBufferMessageSet) = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { case Some(leaderReplica) => @@ -399,7 +400,7 @@ class Partition(val topic: String, } override def toString(): String = { - inReadLock(leaderIsrUpdateLock) { + inLock(leaderIsrUpdateLock.readLock()) { val partitionString = new StringBuilder partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index b9e2bea..9d9b346 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -17,20 +17,20 @@ package kafka.consumer -import org.I0Itec.zkclient.ZkClient -import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager} -import kafka.cluster.{Cluster, Broker} -import scala.collection.immutable -import scala.collection.Map -import collection.mutable.HashMap -import scala.collection.mutable +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock + +import kafka.client.ClientUtils +import kafka.cluster.{Broker, Cluster} +import kafka.common.TopicAndPartition +import kafka.server.{AbstractFetcherManager, 
AbstractFetcherThread, BrokerAndInitialOffset} import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} -import kafka.common.TopicAndPartition -import kafka.client.ClientUtils -import java.util.concurrent.atomic.AtomicInteger +import org.I0Itec.zkclient.ZkClient + +import scala.collection.{immutable, mutable} +import scala.collection.mutable.HashMap /** * Usage: @@ -39,9 +39,10 @@ import java.util.concurrent.atomic.AtomicInteger */ class ConsumerFetcherManager(private val consumerIdString: String, private val config: ConsumerConfig, - private val zkClient : ZkClient) + private val zkClient : ZkClient, + private val metricOwner: Any = null) extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), - config.clientId, config.numConsumerFetchers) { + config.clientId, config.numConsumerFetchers, metricOwner) { private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null private var cluster: Cluster = null private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index ac491b4..f85677e 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -17,12 +17,13 @@ package kafka.consumer -import kafka.utils.{IteratorTemplate, Logging, Utils} -import java.util.concurrent.{TimeUnit, BlockingQueue} -import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference -import kafka.message.{MessageAndOffset, MessageAndMetadata} +import java.util.concurrent.{BlockingQueue, TimeUnit} + import kafka.common.{KafkaException, MessageSizeTooLargeException} +import kafka.message.{MessageAndMetadata, MessageAndOffset} +import kafka.serializer.Decoder +import kafka.utils.{IteratorTemplate, Logging} /** @@ -34,13 +35,14 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val clientId: String) + val clientId: String, + val metricOwner : Any = null) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L - private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) + private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId, metricOwner) override def next(): MessageAndMetadata[K, V] = { val item = super.next() @@ -49,7 +51,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk currentTopicInfo.resetConsumeOffset(consumedOffset) val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() + consumerTopicStats.getConsumerTopicStats(topic, metricOwner).messageRate.mark() consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() item } @@ -110,6 +112,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk current.set(null) } } + } class ConsumerTimeoutException() extends RuntimeException() diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala 
b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala index ff5f470..3a688a4 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala @@ -23,24 +23,24 @@ import kafka.metrics.KafkaMetricsGroup import kafka.common.ClientIdAndTopic @threadsafe -class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { - val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) +class ConsumerTopicMetrics(metricId: ClientIdAndTopic, metricOwner : Any) extends KafkaMetricsGroup { + val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS, metricOwner) + val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner) } /** * Tracks metrics for each topic the given consumer client has consumed data from. * @param clientId The clientId of the given consumer client. */ -class ConsumerTopicStats(clientId: String) extends Logging { - private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k) - private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory)) - private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics +class ConsumerTopicStats(clientId: String, metricOwner : Any) extends Logging { + private val valueFactory = ((k: ClientIdAndTopic, metricOwner: Any) => new ConsumerTopicMetrics(k, metricOwner)).tupled + private val stats = new Pool[(ClientIdAndTopic, Any), ConsumerTopicMetrics](Some(valueFactory)) + private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"), metricOwner) // to differentiate from a topic named AllTopics def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats - def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = { - stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) + def getConsumerTopicStats(topic: String, metricOwner: Any): ConsumerTopicMetrics = { + stats.getAndMaybePut((new ClientIdAndTopic(clientId, topic + "-"), metricOwner)) } } @@ -48,10 +48,14 @@ class ConsumerTopicStats(clientId: String) extends Logging { * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map. 
  */
 object ConsumerTopicStatsRegistry {
-  private val valueFactory = (k: String) => new ConsumerTopicStats(k)
-  private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new ConsumerTopicStats(k, metricOwner)).tupled
+  private val globalStats = new Pool[(String, Any), ConsumerTopicStats](Some(valueFactory))
 
-  def getConsumerTopicStat(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
+  def getConsumerTopicStat(clientId: String, metricOwner: Any) = {
+    globalStats.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeConsumerTopicStat(clientId: String, metricOwner: Any) = {
+    globalStats.remove((clientId, metricOwner))
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..4afb57f 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -22,24 +22,24 @@ import kafka.utils.Pool
 import java.util.concurrent.TimeUnit
 import kafka.common.ClientIdAndBroker
 
-class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, metricOwner))
+  val requestSizeHist = newHistogram(metricId + "FetchResponseSize", true, metricOwner)
 }
 
 /**
  * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
  * @param clientId ClientId of the given consumer
  */
-class FetchRequestAndResponseStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+class FetchRequestAndResponseStats(clientId: String, metricOwner: Any) {
+  private val valueFactory = ((k: ClientIdAndBroker, metricOwner: Any) => new FetchRequestAndResponseMetrics(k, metricOwner)).tupled
+  private val stats = new Pool[(ClientIdAndBroker, Any), FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"), metricOwner)
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(((new ClientIdAndBroker(clientId, brokerInfo + "-")), metricOwner))
   }
 }
 
@@ -47,11 +47,15 @@ class FetchRequestAndResponseStats(clientId: String) {
  * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
*/ object FetchRequestAndResponseStatsRegistry { - private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k) - private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory)) + private val valueFactory = ((k: String, metricOwner: Any) => new FetchRequestAndResponseStats(k, metricOwner)).tupled + private val globalStats = new Pool[(String, Any), FetchRequestAndResponseStats](Some(valueFactory)) - def getFetchRequestAndResponseStats(clientId: String) = { - globalStats.getAndMaybePut(clientId) + def getFetchRequestAndResponseStats(clientId: String, metricOwner: Any) = { + globalStats.getAndMaybePut((clientId, metricOwner)) + } + + def removeFetchRequestAndResponseStats(clientId: String, metricOwner: Any) = { + globalStats.remove((clientId, metricOwner)) } } diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index 805e916..ad919d4 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val clientId: String) + val clientId: String, + metricOwner: Any = null) extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { private val iter: ConsumerIterator[K,V] = - new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId) + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId, metricOwner) /** * Create an iterator over messages in the stream. @@ -48,4 +49,5 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], override def toString(): String = { "%s kafka stream".format(clientId) } + } diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 9c779ce..d7716e5 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -28,12 +28,13 @@ class PartitionTopicInfo(val topic: String, private val consumedOffset: AtomicLong, private val fetchedOffset: AtomicLong, private val fetchSize: AtomicInteger, - private val clientId: String) extends Logging { + private val clientId: String, + private val metricOwner: Any = null) extends Logging { debug("initial consumer offset of " + this + " is " + consumedOffset.get) debug("initial fetch offset of " + this + " is " + fetchedOffset.get) - private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) + private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId, metricOwner) def getConsumeOffset() = consumedOffset.get @@ -60,7 +61,7 @@ class PartitionTopicInfo(val topic: String, chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) fetchedOffset.set(next) debug("updated fetch offset of (%s) to %d".format(this, next)) - consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size) + consumerTopicStats.getConsumerTopicStats(topic, metricOwner).byteRate.mark(size) consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size) } else if(messages.sizeInBytes > 0) { chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 0e64632..bfbc27d 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -18,6 +18,7 @@ package kafka.consumer import kafka.api._ +import kafka.metrics.KafkaMetricsGroup import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} @@ -30,13 +31,13 @@ class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, val bufferSize: Int, - val clientId: String) extends Logging { + val clientId: String) extends Logging with KafkaMetricsGroup { ConsumerConfig.validateClientId(clientId) private val lock = new Object() private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout) val brokerInfo = "host_%s-port_%s".format(host, port) - private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId) + private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId, this) private var isClosed = false private def connect(): BlockingChannel = { @@ -57,6 +58,8 @@ class SimpleConsumer(val host: String, def close() { lock synchronized { + removeAllMetrics(this) + FetchRequestAndResponseStatsRegistry.removeFetchRequestAndResponseStats(clientId, this) disconnect() isClosed = true } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 65f518d..1e6569a 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -17,28 +17,28 @@ package kafka.consumer +import java.net.InetAddress +import java.util.UUID import java.util.concurrent._ import java.util.concurrent.atomic._ -import locks.ReentrantLock -import collection._ +import java.util.concurrent.locks.ReentrantLock + +import com.yammer.metrics.core.Gauge +import kafka.api._ +import kafka.client.ClientUtils import kafka.cluster._ -import kafka.utils._ -import org.I0Itec.zkclient.exception.ZkNodeExistsException -import java.net.InetAddress -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} -import org.apache.zookeeper.Watcher.Event.KeeperState -import java.util.UUID -import kafka.serializer._ -import kafka.utils.ZkUtils._ -import kafka.utils.Utils.inLock import kafka.common._ -import com.yammer.metrics.core.Gauge import kafka.metrics._ import kafka.network.BlockingChannel -import kafka.client.ClientUtils -import kafka.api._ -import scala.Some -import kafka.common.TopicAndPartition +import kafka.serializer._ +import kafka.utils.Utils.inLock +import kafka.utils.ZkUtils._ +import kafka.utils._ +import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState + +import scala.collection._ /** @@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null // useful for tracking migration of consumers to store offsets in kafka - private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) - private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", 
TimeUnit.SECONDS) - private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, this) + private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, this) + private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, this)) val consumerIdString = { var consumerUuid : String = null @@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def createFetcher() { if (enableFetcher) - fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient)) + fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient, this)) } private def connectZk() { @@ -181,6 +181,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } def shutdown() { + removeAllMetrics(this) + ConsumerTopicStatsRegistry.removeConsumerTopicStat(config.clientId, this) val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { info("ZKConsumerConnector shutting down") @@ -228,7 +230,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( - queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId, this) (queue, stream) }) ).flatten.toList @@ -511,7 +513,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } class ZKRebalancerListener(val group: String, val consumerIdString: String, - val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) + val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]], + val metricOwner: Any) extends IZkChildListener { private var isWatcherTriggered = false private val lock = new ReentrantLock @@ -819,7 +822,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, consumedOffset, fetchedOffset, new AtomicInteger(config.fetchMessageMaxBytes), - config.clientId) + config.clientId, + metricOwner) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) checkpointedOffsets.put(TopicAndPartition(topic, partition), offset) @@ -836,7 +840,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (loadBalancerListener == null) { val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]] loadBalancerListener = new ZKRebalancerListener( - config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) + config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]], this) } // create listener for session expired event if not exist yet @@ -884,7 +888,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", new Gauge[Int] { def value = q.size - } + }, this ) }) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index afbeffc..2faa196 100644 
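The consumer-side changes above all follow one metric lifecycle: every meter, timer, and gauge is registered with the instance that created it (passed as `this`), and that same instance unregisters its whole set via removeAllMetrics(this) during close()/shutdown(), so tearing down one connector or SimpleConsumer no longer removes or leaks another client's metrics. A minimal usage sketch, assuming only the owner-aware newMeter and removeAllMetrics introduced by this patch; the OwnedConsumer class and its names are illustrative, not part of the patch:

import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup

// Illustrative only: a component that owns its metrics and releases them on shutdown.
class OwnedConsumer(clientId: String) extends KafkaMetricsGroup {
  // registered under `this`, so removeAllMetrics(this) can find it later
  private val commitMeter = newMeter(clientId + "-CommitsPerSec", "commits", TimeUnit.SECONDS, this)

  def commit(): Unit = commitMeter.mark()

  def shutdown(): Unit = removeAllMetrics(this) // drops only this instance's metrics
}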
--- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -28,8 +28,6 @@ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.lang.IllegalStateException -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. @@ -175,8 +173,7 @@ class LogCleaner(val config: CleanerConfig, checkDone = checkDone) @volatile var lastStats: CleanerStats = new CleanerStats() - private val backOffWaitLatch = new CountDownLatch(1) - + private def checkDone(topicAndPartition: TopicAndPartition) { if (!isRunning.get()) throw new ThreadShutdownException @@ -190,13 +187,6 @@ class LogCleaner(val config: CleanerConfig, cleanOrSleep() } - - override def shutdown() = { - initiateShutdown() - backOffWaitLatch.countDown() - awaitShutdown() - } - /** * Clean a log if there is a dirty log available, otherwise sleep for a bit */ @@ -204,7 +194,7 @@ class LogCleaner(val config: CleanerConfig, cleanerManager.grabFilthiestLog() match { case None => // there are no cleanable logs, sleep a while - backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS) + time.sleep(config.backOffMs) case Some(cleanable) => // there's a log, clean it var endOffset = cleanable.firstDirtyOffset diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4d2924d..1946c94 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,7 +23,6 @@ import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint} -import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -40,7 +39,6 @@ class LogManager(val logDirs: Array[File], val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, - ioThreads: Int, val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, @@ -56,7 +54,7 @@ class LogManager(val logDirs: Array[File], createAndValidateLogDirs(logDirs) private val dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap - loadLogs() + loadLogs(logDirs) private val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) @@ -103,71 +101,36 @@ class LogManager(val logDirs: Array[File], /** * Recover and load all logs in the given data directories */ - private def loadLogs(): Unit = { - info("Loading logs.") - - val threadPools = mutable.ArrayBuffer.empty[ExecutorService] - val jobs = mutable.Map.empty[File, Seq[Future[_]]] - - for (dir <- this.logDirs) { - val pool = Executors.newFixedThreadPool(ioThreads) - threadPools.append(pool) - - val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) - - if (cleanShutdownFile.exists) { - debug( - "Found clean shutdown file. 
" + - "Skipping recovery for all logs in data directory: " + - dir.getAbsolutePath) - } else { - // log recovery itself is being performed by `Log` class during initialization - brokerState.newState(RecoveringFromUncleanShutdown) - } - + private def loadLogs(dirs: Seq[File]) { + for(dir <- dirs) { val recoveryPoints = this.recoveryPointCheckpoints(dir).read - - val jobsForDir = for { - dirContent <- Option(dir.listFiles).toList - logDir <- dirContent if logDir.isDirectory - } yield { - Utils.runnable { - debug("Loading log '" + logDir.getName + "'") - - val topicPartition = Log.parseTopicPartitionName(logDir.getName) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) - - val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) - val previous = this.logs.put(topicPartition, current) - - if (previous != null) { - throw new IllegalArgumentException( - "Duplicate log directories found: %s, %s!".format( - current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + /* load the logs */ + val subDirs = dir.listFiles() + if(subDirs != null) { + val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) + if(cleanShutDownFile.exists()) + info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + else + brokerState.newState(RecoveringFromUncleanShutdown) + + for(dir <- subDirs) { + if(dir.isDirectory) { + info("Loading log '" + dir.getName + "'") + val topicPartition = Log.parseTopicPartitionName(dir.getName) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val log = new Log(dir, + config, + recoveryPoints.getOrElse(topicPartition, 0L), + scheduler, + time) + val previous = this.logs.put(topicPartition, log) + if(previous != null) + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } + cleanShutDownFile.delete() } - - jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq - } - - - try { - for ((cleanShutdownFile, dirJobs) <- jobs) { - dirJobs.foreach(_.get) - cleanShutdownFile.delete() - } - } catch { - case e: ExecutionException => { - error("There was an error in one of the threads during logs loading: " + e.getCause) - throw e.getCause - } - } finally { - threadPools.foreach(_.shutdown()) } - - info("Logs loading complete.") } /** @@ -197,69 +160,31 @@ class LogManager(val logDirs: Array[File], if(cleanerConfig.enableCleaner) cleaner.startup() } - + /** * Close all the logs */ def shutdown() { info("Shutting down.") - - val threadPools = mutable.ArrayBuffer.empty[ExecutorService] - val jobs = mutable.Map.empty[File, Seq[Future[_]]] - - // stop the cleaner first - if (cleaner != null) { - Utils.swallow(cleaner.shutdown()) - } - - // close logs in each dir - for (dir <- this.logDirs) { - debug("Flushing and closing logs at " + dir) - - val pool = Executors.newFixedThreadPool(ioThreads) - threadPools.append(pool) - - val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values - - val jobsForDir = logsInDir map { log => - Utils.runnable { - // flush the log to ensure latest possible recovery point - log.flush() - log.close() - } - } - - jobs(dir) = jobsForDir.map(pool.submit).toSeq - } - - try { - for ((dir, dirJobs) <- jobs) { - dirJobs.foreach(_.get) - - // update the last flush point - debug("Updating recovery points at " + dir) - checkpointLogsInDir(dir) - - // mark that the shutdown was clean by 
creating marker file - debug("Writing clean shutdown marker at " + dir) - Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) - } - } catch { - case e: ExecutionException => { - error("There was an error in one of the threads during LogManager shutdown: " + e.getCause) - throw e.getCause - } + // stop the cleaner first + if(cleaner != null) + Utils.swallow(cleaner.shutdown()) + // flush the logs to ensure latest possible recovery point + allLogs.foreach(_.flush()) + // close the logs + allLogs.foreach(_.close()) + // update the last flush point + checkpointRecoveryPointOffsets() + // mark that the shutdown was clean by creating the clean shutdown marker file + logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) } finally { - threadPools.foreach(_.shutdown()) // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) } - info("Shutdown complete.") } - /** * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset * @@ -305,19 +230,14 @@ class LogManager(val logDirs: Array[File], * to avoid recovering the whole log on startup. */ def checkpointRecoveryPointOffsets() { - this.logDirs.foreach(checkpointLogsInDir) - } - - /** - * Make a checkpoint for all logs in provided directory. - */ - private def checkpointLogsInDir(dir: File): Unit = { - val recoveryPoints = this.logsByDir.get(dir.toString) - if (recoveryPoints.isDefined) { - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString) + for(dir <- logDirs) { + val recoveryPoints = recoveryPointsByDir.get(dir.toString) + if(recoveryPoints.isDefined) + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) } } - + /** * Get the log if it exists, otherwise return None */ @@ -446,22 +366,13 @@ class LogManager(val logDirs: Array[File], * Get all the partition logs */ def allLogs(): Iterable[Log] = logs.values - + /** * Get a map of TopicAndPartition => Log */ def logsByTopicPartition = logs.toMap /** - * Map of log dir to logs by topic and partitions in that dir - */ - private def logsByDir = { - this.logsByTopicPartition.groupBy { - case (_, log) => log.dir.getParent - } - } - - /** * Flush any log which has exceeded its flush interval and has unwritten messages. */ private def flushDirtyLogs() = { diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index a20ab90..e880a4a 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -18,14 +18,17 @@ package kafka.metrics -import com.yammer.metrics.core.{Gauge, MetricName} -import kafka.utils.Logging import java.util.concurrent.TimeUnit + import com.yammer.metrics.Metrics +import com.yammer.metrics.core.{Gauge, MetricName} +import kafka.utils.{Logging, Pool} +import scala.collection._ -trait KafkaMetricsGroup extends Logging { +trait KafkaMetricsGroup extends Logging { + val metricRegistry = KafkaMetricsGroup.kafkaMetricRegistry; /** * Creates a new MetricName object for gauges, meters, etc. created for this * metrics group. 
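The metricRegistry val added to the trait above, together with the companion object at the end of this file's diff, gives every KafkaMetricsGroup a shared owner-to-metric-names map: each newGauge/newMeter/newHistogram/newTimer call records the MetricName under its owner, and removeAllMetrics(owner) later unregisters exactly that set. A rough standalone sketch of the same bookkeeping, using a plain mutable map and the Yammer registry directly rather than Kafka's Pool (the names here are illustrative):

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName

// Illustrative only: remember which MetricNames each owner created so they
// can be unregistered together when that owner shuts down.
object OwnedMetricNames {
  private val byOwner = mutable.Map.empty[Any, mutable.Set[MetricName]]

  def newMeter(owner: Any, name: MetricName, eventType: String, unit: TimeUnit) = {
    val meter = Metrics.defaultRegistry().newMeter(name, eventType, unit)
    byOwner.synchronized {
      byOwner.getOrElseUpdate(owner, mutable.Set.empty[MetricName]) += name
    }
    meter
  }

  def removeAll(owner: Any): Unit = byOwner.synchronized {
    byOwner.remove(owner).foreach(_.foreach(n => Metrics.defaultRegistry().removeMetric(n)))
  }
}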
@@ -39,16 +42,62 @@ trait KafkaMetricsGroup extends Logging { new MetricName(pkg, simpleName, name) } - def newGauge[T](name: String, metric: Gauge[T]) = - Metrics.defaultRegistry().newGauge(metricName(name), metric) + def newGauge[T](name: String, metric: Gauge[T], metricOwner : Any = null) = { + val meter = Metrics.defaultRegistry().newGauge(metricName(name), metric) + metricRegistry.getAndMaybePut(metricOwner).synchronized { + metricRegistry.get(metricOwner) += metricName(name) + } + debug("Added histogram for " + metricOwner + " to metricMap, map size = " + metricRegistry.size) + meter + } + + def newMeter(name: String, eventType: String, timeUnit: TimeUnit, metricOwner : Any = null) = { + val meter = Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit) + metricRegistry.getAndMaybePut(metricOwner).synchronized { + metricRegistry.get(metricOwner) += metricName(name) + } + debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size) + meter + } - def newMeter(name: String, eventType: String, timeUnit: TimeUnit) = - Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit) + def newHistogram(name: String, biased: Boolean = true, metricOwner : Any = null) = { + val meter = Metrics.defaultRegistry().newHistogram(metricName(name), biased) + metricRegistry.getAndMaybePut(metricOwner).synchronized { + metricRegistry.get(metricOwner) += metricName(name) + } + debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size) + meter + } - def newHistogram(name: String, biased: Boolean = true) = - Metrics.defaultRegistry().newHistogram(metricName(name), biased) + def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, metricOwner : Any = null) = { + val meter = Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit) + metricRegistry.getAndMaybePut(metricOwner).synchronized { + metricRegistry.get(metricOwner) += metricName(name) + } + debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size) + meter + } - def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = - Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit) + def removeMetric(name: String) { + Metrics.defaultRegistry().removeMetric(metricName(name)) + } + def removeAllMetrics(metricOwner : Any) { + metricRegistry.getAndMaybePut(metricOwner).synchronized { + if (metricRegistry.get(metricOwner) == null) { + debug("Metrics for " + metricOwner + " has already been removed.") + } else { + debug("Removing " + metricOwner + " from metricMap, map size = " + metricRegistry.size + "," + metricRegistry.get(metricOwner)) + metricRegistry.get(metricOwner).foreach(metric => Metrics.defaultRegistry().removeMetric(metric)) + metricRegistry.remove(metricOwner) + debug("Removed " + metricOwner + " from metricMap, map size = " + metricRegistry.size) + } + } + } } + +object KafkaMetricsGroup { + val valueFactory = (metricOwner: Any) => new mutable.HashSet[MetricName] + val kafkaMetricRegistry = new Pool[Any, mutable.HashSet[MetricName]](Some(valueFactory)) + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 4798481..1ea7e20 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -16,19 +16,19 @@ */ package kafka.producer -import async.{DefaultEventHandler, ProducerSendThread, EventHandler} -import kafka.utils._ -import 
java.util.Random -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + import kafka.common.QueueFullException import kafka.metrics._ +import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} +import kafka.serializer.Encoder +import kafka.utils._ class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing - extends Logging { + extends Logging with KafkaMetricsGroup { private val hasShutdown = new AtomicBoolean(false) private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages) @@ -50,7 +50,7 @@ class Producer[K,V](val config: ProducerConfig, producerSendThread.start() } - private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) + private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId, this) KafkaMetricsReporter.startReporters(config.props) @@ -126,6 +126,8 @@ class Producer[K,V](val config: ProducerConfig, val canShutdown = hasShutdown.compareAndSet(false, true) if(canShutdown) { info("Shutting down producer") + removeAllMetrics(this) + ProducerTopicStatsRegistry.removeProducerTopicStats(config.clientId, this) if (producerSendThread != null) producerSendThread.shutdown eventHandler.close diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 9694220..1994952 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -21,24 +21,24 @@ import java.util.concurrent.TimeUnit import kafka.utils.Pool import kafka.common.ClientIdAndBroker -class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { - val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - val requestSizeHist = newHistogram(metricId + "ProducerRequestSize") +class ProducerRequestMetrics(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup { + val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, metricOwner)) + val requestSizeHist = newHistogram(metricId + "ProducerRequestSize", true, metricOwner) } /** * Tracks metrics of requests made by a given producer client to all brokers. 
* @param clientId ClientId of the given producer */ -class ProducerRequestStats(clientId: String) { - private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k) - private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory)) - private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers")) +class ProducerRequestStats(clientId: String, metricOwner: Any) { + private val valueFactory = ((k: ClientIdAndBroker, metricOwner: Any) => new ProducerRequestMetrics(k, metricOwner)).tupled + private val stats = new Pool[(ClientIdAndBroker, Any), ProducerRequestMetrics](Some(valueFactory)) + private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"), metricOwner) def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = { - stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-")) + stats.getAndMaybePut((new ClientIdAndBroker(clientId, brokerInfo + "-"), metricOwner)) } } @@ -46,11 +46,15 @@ class ProducerRequestStats(clientId: String) { * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map. */ object ProducerRequestStatsRegistry { - private val valueFactory = (k: String) => new ProducerRequestStats(k) - private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory)) + private val valueFactory = ((k: String, metricOwner: Any) => new ProducerRequestStats(k, metricOwner)).tupled + private val globalStats = new Pool[(String, Any), ProducerRequestStats](Some(valueFactory)) - def getProducerRequestStats(clientId: String) = { - globalStats.getAndMaybePut(clientId) + def getProducerRequestStats(clientId: String, metricOwner: Any) = { + globalStats.getAndMaybePut((clientId, metricOwner)) + } + + def removeProducerRequestStats(clientId: String, metricOwner: Any) = { + globalStats.remove((clientId, metricOwner)) } } diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index e1610d3..f493439 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ -20,20 +20,24 @@ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit import kafka.utils.Pool -class ProducerStats(clientId: String) extends KafkaMetricsGroup { - val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) - val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS) - val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) +class ProducerStats(clientId: String, metricOwner: Any) extends KafkaMetricsGroup { + val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, metricOwner) + val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS, metricOwner) + val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, metricOwner) } /** * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map. 
*/ object ProducerStatsRegistry { - private val valueFactory = (k: String) => new ProducerStats(k) - private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory)) + private val valueFactory = ((k: String, metricOwner: Any) => new ProducerStats(k, metricOwner)).tupled + private val statsRegistry = new Pool[(String, Any), ProducerStats](Some(valueFactory)) - def getProducerStats(clientId: String) = { - statsRegistry.getAndMaybePut(clientId) + def getProducerStats(clientId: String, metricOwner: Any) = { + statsRegistry.getAndMaybePut((clientId, metricOwner)) + } + + def removeProducerStats(clientId: String, metricOwner: Any) = { + statsRegistry.remove((clientId, metricOwner)) } } diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index ed209f4..b757d26 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -23,25 +23,25 @@ import java.util.concurrent.TimeUnit @threadsafe -class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { - val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) - val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) +class ProducerTopicMetrics(metricId: ClientIdAndTopic, metricOwner: Any) extends KafkaMetricsGroup { + val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS, metricOwner) + val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner) + val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, metricOwner) } /** * Tracks metrics for each topic the given producer client has produced data to. * @param clientId The clientId of the given producer client. */ -class ProducerTopicStats(clientId: String) { - private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k) - private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory)) - private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics +class ProducerTopicStats(clientId: String, metricOwner: Any) { + private val valueFactory = ((k: ClientIdAndTopic, metricOwner: Any) => new ProducerTopicMetrics(k, metricOwner)).tupled + private val stats = new Pool[(ClientIdAndTopic, Any), ProducerTopicMetrics](Some(valueFactory)) + private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"), metricOwner) // to differentiate from a topic named AllTopics def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats def getProducerTopicStats(topic: String): ProducerTopicMetrics = { - stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) + stats.getAndMaybePut((new ClientIdAndTopic(clientId, topic + "-"), metricOwner)) } } @@ -49,10 +49,14 @@ class ProducerTopicStats(clientId: String) { * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map. 
*/ object ProducerTopicStatsRegistry { - private val valueFactory = (k: String) => new ProducerTopicStats(k) - private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory)) + private val valueFactory = ((k: String, metricOwner: Any) => new ProducerTopicStats(k, metricOwner)).tupled + private val globalStats = new Pool[(String, Any), ProducerTopicStats](Some(valueFactory)) - def getProducerTopicStats(clientId: String) = { - globalStats.getAndMaybePut(clientId) + def getProducerTopicStats(clientId: String, metricOwner: Any) = { + globalStats.getAndMaybePut((clientId, metricOwner)) + } + + def removeProducerTopicStats(clientId: String, metricOwner: Any) = { + globalStats.remove((clientId, metricOwner)) } } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 489f007..be39f1e 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -17,10 +17,12 @@ package kafka.producer +import java.util.Random + import kafka.api._ +import kafka.metrics.KafkaMetricsGroup import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} import kafka.utils._ -import java.util.Random object SyncProducer { val RequestKey: Short = 0 @@ -31,14 +33,14 @@ object SyncProducer { * Send a message set. */ @threadsafe -class SyncProducer(val config: SyncProducerConfig) extends Logging { +class SyncProducer(val config: SyncProducerConfig) extends Logging with KafkaMetricsGroup { private val lock = new Object() @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.sendBufferBytes, config.requestTimeoutMs) val brokerInfo = "host_%s-port_%s".format(config.host, config.port) - val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) + val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId, this) trace("Instantiating Scala Sync Producer") @@ -116,6 +118,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { def close() = { lock synchronized { disconnect() + removeAllMetrics(this) + ProducerRequestStatsRegistry.removeProducerRequestStats(config.clientId, this) shutdown = true } } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index d8ac915..7cc0763 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -19,6 +19,7 @@ package kafka.producer.async import kafka.common._ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} +import kafka.metrics.KafkaMetricsGroup import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging, SystemTime} @@ -34,7 +35,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, private val keyEncoder: Encoder[K], private val producerPool: ProducerPool, private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata]) - extends EventHandler[K,V] with Logging { + extends EventHandler[K,V] with Logging with KafkaMetricsGroup { val isSync = ("sync" == config.producerType) val correlationId = new AtomicInteger(0) @@ -45,8 +46,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, private val topicMetadataToRefresh = Set.empty[String] 
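Every producer-side registry in this patch (ProducerStatsRegistry, ProducerRequestStatsRegistry, ProducerTopicStatsRegistry, and their consumer counterparts) now keys its pool by a (clientId, metricOwner) pair and pairs the getter with a remove method, so a closing client evicts only its own entry even when several clients share a clientId. A simplified sketch of that registry pattern with a plain synchronized map instead of kafka.utils.Pool; OwnedStats and the method names are illustrative:

import scala.collection.mutable

// Illustrative only: stats cached per (clientId, owner) so removal on shutdown
// cannot evict an entry that belongs to another client instance.
class OwnedStats(val clientId: String, val owner: Any)

object OwnedStatsRegistry {
  private val stats = mutable.Map.empty[(String, Any), OwnedStats]

  def getStats(clientId: String, owner: Any): OwnedStats = stats.synchronized {
    stats.getOrElseUpdate((clientId, owner), new OwnedStats(clientId, owner))
  }

  def removeStats(clientId: String, owner: Any): Unit = stats.synchronized {
    stats.remove((clientId, owner))
  }
}

A DefaultEventHandler-style caller would fetch its stats with getStats(config.clientId, this) at construction time and call removeStats(config.clientId, this) from close(), mirroring the registrations and removals in the surrounding hunks.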
private val sendPartitionPerTopicCache = HashMap.empty[String, Int] - private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId) - private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) + private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId, this) + private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId, this) def handle(events: Seq[KeyedMessage[K,V]]) { val serializedData = serialize(events) @@ -332,5 +333,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, def close() { if (producerPool != null) producerPool.close + removeAllMetrics(this) + ProducerTopicStatsRegistry.removeProducerTopicStats(config.clientId, this) + ProducerStatsRegistry.removeProducerStats(config.clientId, this) } } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 42e9c74..3d6c499 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -37,7 +37,7 @@ class ProducerSendThread[K,V](val threadName: String, newGauge(clientId + "-ProducerQueueSize", new Gauge[Int] { def value = queue.size - }) + }, this) override def run { try { @@ -50,6 +50,7 @@ class ProducerSendThread[K,V](val threadName: String, } def shutdown = { + removeAllMetrics(this) info("Begin shutting down ProducerSendThread") queue.put(shutdownCommand) shutdownLatch.await diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 9390edf..16fd884 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge -abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) +abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1, metricOwner: Any = null) extends Logging with KafkaMetricsGroup { // map of (source broker_id, fetcher_id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] @@ -42,7 +42,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: curMaxThread.max(fetcherLagStatsEntry._2.lag) }).max(curMaxAll) }) - } + }, metricOwner ) newGauge( @@ -59,7 +59,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: }) } } - } + }, metricOwner ) private def getFetcherId(topic: String, partitionId: Int) : Int = { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 3b15254..ea7fbe9 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -40,15 +40,15 @@ import java.util.concurrent.atomic.AtomicLong abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, isInterruptible: Boolean = true) - extends ShutdownableThread(name, isInterruptible) { + extends 
ShutdownableThread(name, isInterruptible) with KafkaMetricsGroup { private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port) private val metricId = new ClientIdAndBroker(clientId, brokerInfo) - val fetcherStats = new FetcherStats(metricId) - val fetcherLagStats = new FetcherLagStats(metricId) + val fetcherStats = new FetcherStats(metricId, this) + val fetcherLagStats = new FetcherLagStats(metricId, this) val fetchRequestBuilder = new FetchRequestBuilder(). clientId(clientId). replicaId(fetcherBrokerId). @@ -69,6 +69,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def shutdown(){ super.shutdown() + removeAllMetrics(this) simpleConsumer.close() } @@ -203,13 +204,13 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } -class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup { +class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition, metricOwner: Any) extends KafkaMetricsGroup { private[this] val lagVal = new AtomicLong(-1L) newGauge( metricId + "-ConsumerLag", new Gauge[Long] { def value = lagVal.get - } + }, metricOwner ) def lag_=(newLag: Long) { @@ -219,18 +220,18 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet def lag = lagVal.get } -class FetcherLagStats(metricId: ClientIdAndBroker) { - private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k) - val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory)) +class FetcherLagStats(metricId: ClientIdAndBroker, metricOwner: Any) { + private val valueFactory = ((k: ClientIdBrokerTopicPartition, metricOwner: Any) => new FetcherLagMetrics(k, metricOwner)).tupled + val stats = new Pool[(ClientIdBrokerTopicPartition, Any), FetcherLagMetrics](Some(valueFactory)) def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = { - stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId)) + stats.getAndMaybePut((new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId), metricOwner)) } } -class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { - val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS) +class FetcherStats(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup { + val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS, metricOwner) + val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner) } case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1a45f87..50b09ed 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -190,9 +190,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends 
ZKConfig(pro /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) - /* the number of threads per data directory to be used for log recovery at startup and flushing at shutdown */ - val numRecoveryThreadsPerDataDir = props.getIntInRange("num.recovery.threads.per.data.dir", 1, (1, Int.MaxValue)) - /* enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2871118..def1dc2 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -303,7 +303,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigs = configs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, - ioThreads = config.numRecoveryThreadsPerDataDir, flushCheckMs = config.logFlushSchedulerIntervalMs, flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index bf81a1a..7cd40e1 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -25,6 +25,7 @@ import kafka.utils.Utils._ import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException} import kafka.common.TopicAndPartition import kafka.controller.KafkaController.StateChangeLogger +import scala.Some /** * A cache for the state (e.g., current leader) of each partition. 
This cache is updated through @@ -33,14 +34,14 @@ import kafka.controller.KafkaController.StateChangeLogger private[server] class MetadataCache { private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] = new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]() - private var aliveBrokers: Map[Int, Broker] = Map() + private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() private val partitionMetadataLock = new ReentrantReadWriteLock() def getTopicMetadata(topics: Set[String]) = { val isAllTopics = topics.isEmpty val topicsRequested = if(isAllTopics) cache.keySet else topics val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] - inReadLock(partitionMetadataLock) { + inLock(partitionMetadataLock.readLock()) { for (topic <- topicsRequested) { if (isAllTopics || cache.contains(topic)) { val partitionStateInfos = cache(topic) @@ -81,15 +82,15 @@ private[server] class MetadataCache { } def getAliveBrokers = { - inReadLock(partitionMetadataLock) { - aliveBrokers.values.toSeq + inLock(partitionMetadataLock.readLock()) { + aliveBrokers.values.toList } } def addOrUpdatePartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) { - inWriteLock(partitionMetadataLock) { + inLock(partitionMetadataLock.writeLock()) { cache.get(topic) match { case Some(infos) => infos.put(partitionId, stateInfo) case None => { @@ -102,7 +103,7 @@ private[server] class MetadataCache { } def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = { - inReadLock(partitionMetadataLock) { + inLock(partitionMetadataLock.readLock()) { cache.get(topic) match { case Some(partitionInfos) => partitionInfos.get(partitionId) case None => None @@ -113,8 +114,8 @@ private[server] class MetadataCache { def updateCache(updateMetadataRequest: UpdateMetadataRequest, brokerId: Int, stateChangeLogger: StateChangeLogger) { - inWriteLock(partitionMetadataLock) { - aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap + inLock(partitionMetadataLock.writeLock()) { + updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) => if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) { removePartitionInfo(tp.topic, tp.partition) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 897783c..6a56a77 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -36,9 +36,9 @@ object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" } -class ReplicaManager(val config: KafkaConfig, - time: Time, - val zkClient: ZkClient, +class ReplicaManager(val config: KafkaConfig, + time: Time, + val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { @@ -46,6 +46,8 @@ class ReplicaManager(val config: KafkaConfig, @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId private val allPartitions = new Pool[(String, Int), Partition] + private var leaderPartitions = new mutable.HashSet[Partition]() + private val leaderPartitionsLock = new Object private val replicaStateChangeLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) 
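The MetadataCache hunks above switch from the inReadLock/inWriteLock wrappers to the generic inLock helper with an explicit readLock()/writeLock() argument. A self-contained sketch of that convention follows, assuming a helper equivalent to kafka.utils.Utils.inLock; LockUtil and DemoMetadataCache are illustrative reductions, not the real classes.

import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable

object LockUtil {
  // Equivalent in spirit to kafka.utils.Utils.inLock: run fun while holding lock.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try fun
    finally lock.unlock()
  }
}

// Reads take the read lock, mutations take the write lock, both through inLock.
class DemoMetadataCache {
  import LockUtil.inLock
  private val aliveBrokers = new mutable.HashMap[Int, String]()
  private val partitionMetadataLock = new ReentrantReadWriteLock()

  def getAliveBrokers: List[String] =
    inLock(partitionMetadataLock.readLock()) { aliveBrokers.values.toList }

  def addOrUpdateBroker(id: Int, host: String): Unit =
    inLock(partitionMetadataLock.writeLock()) { aliveBrokers.put(id, host) }
}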
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) @@ -58,7 +60,9 @@ class ReplicaManager(val config: KafkaConfig, "LeaderCount", new Gauge[Int] { def value = { - getLeaderPartitions().size + leaderPartitionsLock synchronized { + leaderPartitions.size + } } } ) @@ -78,7 +82,9 @@ class ReplicaManager(val config: KafkaConfig, val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) def underReplicatedPartitionCount(): Int = { - getLeaderPartitions().count(_.isUnderReplicated) + leaderPartitionsLock synchronized { + leaderPartitions.count(_.isUnderReplicated) + } } def startHighWaterMarksCheckPointThread() = { @@ -111,6 +117,9 @@ class ReplicaManager(val config: KafkaConfig, val errorCode = ErrorMapping.NoError getPartition(topic, partitionId) match { case Some(partition) => + leaderPartitionsLock synchronized { + leaderPartitions -= partition + } if(deletePartition) { val removedPartition = allPartitions.remove((topic, partitionId)) if (removedPartition != null) @@ -322,6 +331,10 @@ class ReplicaManager(val config: KafkaConfig, partitionState.foreach{ case (partition, partitionStateInfo) => partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)} + // Finally add these partitions to the list of partitions for which the leader is the current broker + leaderPartitionsLock synchronized { + leaderPartitions ++= partitionState.keySet + } } catch { case e: Throwable => partitionState.foreach { state => @@ -370,6 +383,9 @@ class ReplicaManager(val config: KafkaConfig, responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) try { + leaderPartitionsLock synchronized { + leaderPartitions --= partitionState.keySet + } var partitionsToMakeFollower: Set[Partition] = Set() @@ -448,7 +464,11 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) + var curLeaderPartitions: List[Partition] = null + leaderPartitionsLock synchronized { + curLeaderPartitions = leaderPartitions.toList + } + curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { @@ -460,9 +480,6 @@ class ReplicaManager(val config: KafkaConfig, } } - private def getLeaderPartitions() : List[Partition] = { - allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList - } /** * Flushes the highwatermark value for all partitions to the highwatermark file */ diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index a75818a..e5b6ff1 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -50,27 +50,9 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, } } - private def getControllerID(): Int = { - readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { - case Some(controller) => KafkaController.parseControllerId(controller) - case None => -1 - } - } - def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) - - 
leaderId = getControllerID - /* - * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, - * it's possible that the controller has already been elected when we get here. This check will prevent the following - * createEphemeralPath method from getting into an infinite loop if this broker is already the controller. - */ - if(leaderId != -1) { - debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId)) - return amILeader - } try { createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, @@ -82,13 +64,15 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, } catch { case e: ZkNodeExistsException => // If someone else has written the path, then - leaderId = getControllerID - + leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { + case Some(controller) => KafkaController.parseControllerId(controller) + case None => { + warn("A leader has been elected but just resigned, this will result in another round of election") + -1 + } + } if (leaderId != -1) debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - else - warn("A leader has been elected but just resigned, this will result in another round of election") - case e2: Throwable => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) resign() diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 8e9d47b..6daf87b 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -23,8 +23,7 @@ import kafka.log._ import kafka.utils._ import collection.mutable import joptsimple.OptionParser -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties + object DumpLogSegments { @@ -42,15 +41,7 @@ object DumpLogSegments { .ofType(classOf[java.lang.Integer]) .defaultsTo(5 * 1024 * 1024) val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration") - val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") - .withOptionalArg() - .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") - val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. 
Custom jar should be available in kafka/libs directory.") - .withOptionalArg() - .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") - + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.") @@ -63,9 +54,6 @@ object DumpLogSegments { val files = options.valueOf(filesOpt).split(",") val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue() val isDeepIteration = if(options.has(deepIterationOpt)) true else false - - val valueDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties) - val keyDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties) val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]] val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]] @@ -74,7 +62,7 @@ object DumpLogSegments { val file = new File(arg) if(file.getName.endsWith(Log.LogFileSuffix)) { println("Dumping " + file) - dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , valueDecoder, keyDecoder) + dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize) } else if(file.getName.endsWith(Log.IndexFileSuffix)) { println("Dumping " + file) dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) @@ -130,9 +118,7 @@ object DumpLogSegments { printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]], isDeepIteration: Boolean, - maxMessageSize: Int, - valueDecoder: Decoder[_], - keyDecoder: Decoder[_]) { + maxMessageSize: Int) { val startOffset = file.getName().split("\\.")(0).toLong println("Starting offset: " + startOffset) val messageSet = new FileMessageSet(file, false) @@ -161,8 +147,8 @@ object DumpLogSegments { print(" keysize: " + msg.keySize) if(printContents) { if(msg.hasKey) - print(" key: " + keyDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.key))) - val payload = if(messageAndOffset.message.isNull) null else valueDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.payload)) + print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8")) + val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8") print(" payload: " + payload) } println() @@ -200,5 +186,4 @@ object DumpLogSegments { } } } - } diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 9a16343..8e37505 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -93,14 +93,16 @@ class KafkaScheduler(val threads: Int, debug("Scheduling task %s with initial delay %d ms and period %d ms." 
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit))) ensureStarted - val runnable = Utils.runnable { - try { - trace("Begining execution of scheduled task '%s'.".format(name)) - fun() - } catch { - case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) - } finally { - trace("Completed execution of scheduled task '%s'.".format(name)) + val runnable = new Runnable { + def run() = { + try { + trace("Begining execution of scheduled task '%s'.".format(name)) + fun() + } catch { + case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) + } finally { + trace("Completed execution of scheduled task '%s'.".format(name)) + } } } if(period >= 0) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index da52b42..6576adf 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.nio._ import charset.Charset import java.nio.channels._ -import java.util.concurrent.locks.{ReadWriteLock, Lock} +import java.util.concurrent.locks.Lock import java.lang.management._ import javax.management._ import scala.collection._ @@ -49,9 +49,9 @@ object Utils extends Logging { * @param fun A function * @return A Runnable that just executes the function */ - def runnable(fun: => Unit): Runnable = - new Runnable { - def run() = fun + def runnable(fun: () => Unit): Runnable = + new Runnable() { + def run() = fun() } /** @@ -540,18 +540,13 @@ object Utils extends Logging { def inLock[T](lock: Lock)(fun: => T): T = { lock.lock() try { - fun + return fun } finally { lock.unlock() } } - def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun) - - def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun) - - - //JSON strings need to be escaped based on ECMA-404 standard http://json.org + //JSON strings need to be escaped based on ECMA-404 standard http://json.org def JSONEscapeString (s : String) : String = { s.map { case '"' => "\\\"" diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 789e74c..15fd5bc 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -21,31 +21,26 @@ import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.Random +import java.util.{Random, Properties} import java.lang.Integer import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} -import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} -import kafka.integration.KafkaServerTestHarness +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{ShutdownableThread, Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer import org.apache.kafka.common.KafkaException import org.apache.kafka.clients.producer._ -class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness { - private val producerBufferSize = 30000 - private val serverMessageMaxBytes = producerBufferSize/2 - - val numServers = 2 - val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) - yield new KafkaConfig(props) { - override val zkConnect = 
TestZKUtils.zookeeperConnect - override val autoCreateTopicsEnable = false - override val messageMaxBytes = serverMessageMaxBytes - } - +class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null @@ -55,19 +50,32 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes private var producer3: KafkaProducer = null private var producer4: KafkaProducer = null + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + props1.put("auto.create.topics.enable", "false") + props2.put("auto.create.topics.enable", "false") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)) + + private val bufferSize = 2 * config1.messageMaxBytes + private val topic1 = "topic-1" private val topic2 = "topic-2" override def setUp() { super.setUp() + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize); - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize); + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize) } override def tearDown() { @@ -79,6 +87,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes if (producer3 != null) producer3.close if (producer4 != null) producer4.close + server1.shutdown; Utils.rm(server1.config.logDirs) + server2.shutdown; Utils.rm(server2.config.logDirs) + super.tearDown() } @@ -91,7 +102,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) } @@ -104,7 +115,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes 
TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) intercept[ExecutionException] { producer2.send(record).get } @@ -138,7 +149,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // producer with incorrect broker list - producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) // send a record with incorrect broker list val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) @@ -164,7 +175,8 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled - servers.foreach(server => server.requestHandlerPool.shutdown()) + server1.requestHandlerPool.shutdown() + server2.requestHandlerPool.shutdown() producer1.send(record1).get(5000, TimeUnit.MILLISECONDS) @@ -174,11 +186,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // TODO: expose producer configs after creating them // send enough messages to get buffer full - val tooManyRecords = 10 - val msgSize = producerBufferSize / tooManyRecords + val msgSize = 10000 val value = new Array[Byte](msgSize) new Random().nextBytes(value) val record2 = new ProducerRecord(topic1, null, "key".getBytes, value) + val tooManyRecords = bufferSize / ("key".getBytes.length + value.length) intercept[KafkaException] { for (i <- 1 to tooManyRecords) @@ -257,13 +269,17 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // rolling bounce brokers for (i <- 0 until 2) { - for (server <- servers) { - server.shutdown() - server.awaitShutdown() - server.startup + server1.shutdown() + server1.awaitShutdown() + server1.startup - Thread.sleep(2000) - } + Thread.sleep(2000) + + server2.shutdown() + server2.awaitShutdown() + server2.startup + + Thread.sleep(2000) // Make sure the producer do not see any exception // in returned metadata due to broker failures @@ -282,7 +298,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // double check that the leader info has been propagated after consecutive bounces val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) - val fetchResponse = if(leader == configs(0).brokerId) { + val fetchResponse = if(leader == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) @@ -301,7 +317,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes var sent = 0 var failed = false - val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10) + val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10) override def doWork(): Unit = { val responses = diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d407af9..34a7db4 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,6 +17,7 @@ package kafka.api.test +import java.util.Properties import java.lang.{Integer, IllegalArgumentException} import org.apache.kafka.clients.producer._ @@ -24,41 +25,53 @@ import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message -import kafka.integration.KafkaServerTestHarness -class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { - val numServers = 2 - val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) - yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val numPartitions = 4 - } +class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + props1.put("num.partitions", "4") + props2.put("num.partitions", "4") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val topic = "topic" private val numRecords = 100 override def setUp() { super.setUp() + // set up 2 brokers with 4 partitions each + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") } override def tearDown() { - consumer1.close() - consumer2.close() - + server1.shutdown + server2.shutdown + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) super.tearDown() } @@ -77,7 +90,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testSendOffset() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) val callback = new CheckErrorCallback @@ -133,7 +146,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testClose() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) try { // create topic @@ -169,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with 
KafkaServerTestHarness { */ @Test def testSendToPartition() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) try { // create topic @@ -196,7 +209,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if(leader1.get == configs(0).brokerId) { + val fetchResponse1 = if(leader1.get == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) @@ -224,7 +237,8 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testAutoCreateTopic() { - var producer = TestUtils.createNewProducer(brokerList, retries = 5) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + retries = 5) try { // Send a message to auto-create the topic diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 3cf7c9b..194dd70 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -30,13 +30,11 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { val configs: List[KafkaConfig] var servers: List[KafkaServer] = null - var brokerList: String = null override def setUp() { super.setUp if(configs.size <= 0) throw new KafkaException("Must suply at least one server config.") - brokerList = TestUtils.getBrokerListStrFromConfigs(configs) servers = configs.map(TestUtils.createServer(_)) } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7d4c70c..d03d4c4 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -35,11 +35,21 @@ class LogManagerTest extends JUnit3Suite { var logManager: LogManager = null val name = "kafka" val veryLargeLogFlushInterval = 10000000L + val cleanerConfig = CleanerConfig(enableCleaner = false) override def setUp() { super.setUp() logDir = TestUtils.tempDir() - logManager = createLogManager() + logManager = new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time, + brokerState = new BrokerState()) logManager.startup logDir = logManager.logDirs(0) } @@ -115,7 +125,18 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = createLogManager() + logManager = new LogManager( + logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = config, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) logManager.startup // create a log @@ -155,7 +176,18 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = 
logConfig.copy(flushMs = 1000) - logManager = createLogManager() + logManager = new LogManager( + logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = config, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -177,8 +209,19 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager = createLogManager() - + logManager = new LogManager( + logDirs = dirs, + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) + // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { logManager.createLog(TopicAndPartition("test", partition), logConfig) @@ -194,7 +237,18 @@ class LogManagerTest extends JUnit3Suite { @Test def testTwoLogManagersUsingSameDirFails() { try { - createLogManager() + new LogManager( + logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good @@ -216,8 +270,16 @@ class LogManagerTest extends JUnit3Suite { def testRecoveryDirectoryMappingWithTrailingSlash() { logManager.shutdown() logDir = TestUtils.tempDir() - logManager = TestUtils.createLogManager( - logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) + logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time, + brokerState = new BrokerState()) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -231,7 +293,16 @@ class LogManagerTest extends JUnit3Suite { logDir = new File("data" + File.separator + logDir.getName) logDir.mkdirs() logDir.deleteOnExit() - logManager = createLogManager() + logManager = new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time, + brokerState = new BrokerState()) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -256,12 +327,4 @@ class LogManagerTest extends JUnit3Suite { } } } - - - private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = { - TestUtils.createLogManager( - defaultConfig = logConfig, - logDirs = logDirs, - time = this.time) - } } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e532c28..558a5d6 100644 --- 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -32,11 +32,16 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_)) val topic = "foo" - val logManagers = configs map { config => - TestUtils.createLogManager( - logDirs = config.logDirs.map(new File(_)).toArray, - cleanerConfig = CleanerConfig()) - } + val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, + topicConfigs = Map(), + defaultConfig = LogConfig(), + cleanerConfig = CleanerConfig(), + flushCheckMs = 30000, + flushCheckpointMs = 10000L, + retentionCheckMs = 30000, + scheduler = new KafkaScheduler(1), + brokerState = new BrokerState(), + time = new MockTime)) @After def teardown() { @@ -142,4 +147,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L) } -} +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9abf219..518d416 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.utils.{MockScheduler, MockTime, TestUtils} +import kafka.log.{CleanerConfig, LogManager, LogConfig} import java.util.concurrent.atomic.AtomicBoolean import java.io.File @@ -36,7 +37,7 @@ class ReplicaManagerTest extends JUnit3Suite { val props = TestUtils.createBrokerConfig(1) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition(topic, 1, 1) @@ -50,11 +51,26 @@ class ReplicaManagerTest extends JUnit3Suite { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition(topic, 1, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } + + private def createLogManager(logDirs: Array[File]): LogManager = { + val time = new MockTime() + return new LogManager(logDirs, + topicConfigs = Map(), + defaultConfig = new LogConfig(), + cleanerConfig = CleanerConfig(enableCleaner = false), + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time) + } + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4e13c5..3faa884 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -39,7 +39,6 @@ import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils import kafka.producer.ProducerConfig -import kafka.log._ import junit.framework.AssertionFailedError import junit.framework.Assert._ @@ -386,7 +385,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) - producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") + producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") return new KafkaProducer(producerProps) } @@ -690,30 +689,6 @@ object TestUtils extends Logging { def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } - - - /** - * Create new LogManager instance with default configuration for testing - */ - def createLogManager( - logDirs: Array[File] = Array.empty[File], - defaultConfig: LogConfig = LogConfig(), - cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false), - time: MockTime = new MockTime()) = - { - new LogManager( - logDirs = logDirs, - topicConfigs = Map(), - defaultConfig = defaultConfig, - cleanerConfig = cleanerConfig, - ioThreads = 4, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) - } } object TestZKUtils { diff --git a/gradle.properties b/gradle.properties index 4827769..236e243 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ group=org.apache.kafka version=0.8.1 -scalaVersion=2.8.0 +scalaVersion=2.9.2 task=build mavenUrl=
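With the shared TestUtils.createLogManager factory gone (see the removed block above), the test diffs repeat the full LogManager constructor at each call site. The sketch below isolates the default-argument factory idea those removed lines expressed; DemoLogManagerConfig and the parameter defaults are hypothetical stand-ins, not the real LogManager signature.

import java.io.File

// Hypothetical stand-in for the long LogManager parameter list repeated in the
// test hunks above.
case class DemoLogManagerConfig(
    logDirs: Array[File],
    flushCheckMs: Long,
    flushCheckpointMs: Long,
    retentionCheckMs: Long)

object DemoTestUtils {
  // A single factory with default arguments, in the spirit of the removed
  // TestUtils.createLogManager: each test overrides only what it cares about.
  def createDemoLogManagerConfig(
      logDirs: Array[File] = Array(new File("/tmp/demo-logs")),
      flushCheckMs: Long = 1000L,
      flushCheckpointMs: Long = 10000L,
      retentionCheckMs: Long = 1000L): DemoLogManagerConfig =
    DemoLogManagerConfig(logDirs, flushCheckMs, flushCheckpointMs, retentionCheckMs)
}

object FactoryDemo extends App {
  // Usage: only the checkpoint interval differs from the defaults.
  println(DemoTestUtils.createDemoLogManagerConfig(flushCheckpointMs = 100000L))
}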