diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d8f9ce6..d21f922 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -122,21 +122,16 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean isReady(Node node, long now) {
-        int nodeId = node.id();
+        return isReady(node.id(), now);
+    }
+
+    private boolean isReady(int node, long now) {
         if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
             // if we need to update our metadata now declare all requests unready to make metadata requests first priority
             return false;
         else
             // otherwise we are ready if we are connected and can send more requests
-            return isSendable(nodeId);
-    }
-
-    /**
-     * Are we connected and ready and able to send more requests to the given node?
-     * @param node The node
-     */
-    private boolean isSendable(int node) {
-        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+            return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
     }
 
     /**
@@ -151,21 +146,21 @@ public class NetworkClient implements KafkaClient {
     public List poll(List requests, long timeout, long now) {
         List sends = new ArrayList();
 
+        // should we update our metadata?
+        long metadataTimeout = metadata.timeToNextUpdate(now);
+        if (!this.metadataFetchInProgress && metadataTimeout == 0)
+            maybeUpdateMetadata(sends, now);
+
         for (int i = 0; i < requests.size(); i++) {
             ClientRequest request = requests.get(i);
             int nodeId = request.request().destination();
-            if (!isSendable(nodeId))
+            if (!isReady(nodeId, now))
                 throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
 
             this.inFlightRequests.add(request);
             sends.add(request.request());
         }
 
-        // should we update our metadata?
-        long metadataTimeout = metadata.timeToNextUpdate(now);
-        if (!this.metadataFetchInProgress && metadataTimeout == 0)
-            maybeUpdateMetadata(sends, now);
-
         // do the I/O
         try {
             this.selector.poll(Math.min(timeout, metadataTimeout), sends);
@@ -314,7 +309,7 @@ public class NetworkClient implements KafkaClient {
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
-            this.metadata.requestUpdate();
+            this.metadata.forceUpdate();
     }
 
     /**
@@ -352,12 +347,9 @@ public class NetworkClient implements KafkaClient {
      */
     private void maybeUpdateMetadata(List sends, long now) {
         Node node = this.leastLoadedNode(now);
-        if (node == null) {
-            log.debug("Give up sending metadata request since no node is available");
+        if (node == null)
             return;
-        }
 
-        log.debug("Trying to send metadata request to node {}", node.id());
         if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
             Set topics = metadata.topics();
             this.metadataFetchInProgress = true;
@@ -367,7 +359,6 @@ public class NetworkClient implements KafkaClient {
             this.inFlightRequests.add(metadataRequest);
         } else if (connectionStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
-            log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id());
             initiateConnect(node, now);
         }
     }
@@ -384,7 +375,7 @@ public class NetworkClient implements KafkaClient {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(node.id());
             /* maybe the problem is our metadata, update it */
-            metadata.requestUpdate();
+            metadata.forceUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index f026ae4..05eb6ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.TopicPartition;
  * every rebalance operation. This callback will execute in the user thread as part of the
  * {@link Consumer#poll(long) poll(long)} API on every rebalance attempt.
  * Default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the
- * {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned()} callback. And will commit offsets synchronously
- * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked()}
+ * {@link #onPartitionsAssigned(Consumer, TopicPartition...) onPartitionsAssigned()} callback. And will commit offsets synchronously
+ * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, TopicPartition...) onPartitionsRevoked()}
+ * callback.
  */
 public interface ConsumerRebalanceCallback {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f58b850..d85ca30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -74,7 +73,6 @@ public class KafkaProducer implements Producer {
     private final Thread ioThread;
     private final CompressionType compressionType;
     private final Sensor errors;
-    private final Time time;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -96,7 +94,7 @@ public class KafkaProducer implements Producer {
 
     private KafkaProducer(ProducerConfig config) {
         log.trace("Starting the Kafka producer");
-        this.time = new SystemTime();
+        Time time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
@@ -121,7 +119,7 @@ public class KafkaProducer implements Producer {
                                                  metrics,
                                                  time);
         List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+        this.metadata.update(Cluster.bootstrap(addresses), 0);
         NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
                                                  this.metadata,
@@ -227,9 +225,8 @@ public class KafkaProducer implements Producer {
     @Override
     public Future send(ProducerRecord record, Callback callback) {
         try {
-            // first make sure the metadata for the topic is available
-            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
-            int partition = partitioner.partition(record, metadata.fetch());
+            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+            int partition = partitioner.partition(record, cluster);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -259,31 +256,6 @@ public class KafkaProducer implements Producer {
     }
 
     /**
-     * Wait for cluster metadata including partitions for the given topic to be available.
-     * @param topic The topic we want metadata for
-     * @param maxWaitMs The maximum time in ms for waiting on the metadata
-     */
-    private void waitOnMetadata(String topic, long maxWaitMs) {
-        if (metadata.fetch().partitionsForTopic(topic) != null) {
-            return;
-        } else {
-            long begin = time.milliseconds();
-            long remainingWaitMs = maxWaitMs;
-            while (metadata.fetch().partitionsForTopic(topic) == null) {
-                log.trace("Requesting metadata update for topic {}.", topic);
-                int version = metadata.requestUpdate();
-                metadata.add(topic);
-                sender.wakeup();
-                metadata.awaitUpdate(version, remainingWaitMs);
-                long elapsed = time.milliseconds() - begin;
-                if (elapsed >= maxWaitMs)
-                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-                remainingWaitMs = maxWaitMs - elapsed;
-            }
-        }
-    }
-
-    /**
      * Validate that the record size isn't too large
      */
     private void ensureValidRecordSize(int size) {
@@ -299,10 +271,8 @@ public class KafkaProducer implements Producer {
                                               " configuration.");
     }
 
-
     @Override
     public List partitionsFor(String topic) {
-        waitOnMetadata(topic, this.metadataFetchTimeoutMs);
-        return this.metadata.fetch().partitionsForTopic(topic);
+        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 4aa5b01..8890aa2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -13,9 +13,11 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,10 +36,9 @@ public final class Metadata {
 
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
-    private int version;
     private long lastRefreshMs;
     private Cluster cluster;
-    private boolean needUpdate;
+    private boolean forceUpdate;
     private final Set topics;
 
     /**
@@ -57,9 +58,8 @@ public final class Metadata {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.lastRefreshMs = 0L;
-        this.version = 0;
         this.cluster = Cluster.empty();
-        this.needUpdate = false;
+        this.forceUpdate = false;
         this.topics = new HashSet();
     }
 
@@ -71,10 +71,33 @@ public final class Metadata {
     }
 
     /**
-     * Add the topic to maintain in the metadata
+     * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
+     * block waiting for an update.
+     * @param topic The topic we want metadata for
+     * @param maxWaitMs The maximum amount of time to block waiting for metadata
      */
-    public synchronized void add(String topic) {
-        topics.add(topic);
+    public synchronized Cluster fetch(String topic, long maxWaitMs) {
+        List partitions = null;
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        do {
+            partitions = cluster.partitionsForTopic(topic);
+            if (partitions == null) {
+                topics.add(topic);
+                forceUpdate = true;
+                try {
+                    log.trace("Requesting metadata update for topic {}.", topic);
+                    wait(remainingWaitMs);
+                } catch (InterruptedException e) { /* this is fine, just try again */
+                }
+                long elapsed = System.currentTimeMillis() - begin;
+                if (elapsed >= maxWaitMs)
+                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+                remainingWaitMs = maxWaitMs - elapsed;
+            } else {
+                return cluster;
+            }
+        } while (true);
     }
 
     /**
@@ -83,35 +106,16 @@ public final class Metadata {
      * been request then the expiry time is now
      */
     public synchronized long timeToNextUpdate(long nowMs) {
-        long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+        long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
         long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
         return Math.max(timeToExpire, timeToAllowUpdate);
     }
 
     /**
-     * Request an update of the current cluster metadata info, return the current version before the update
+     * Force an update of the current cluster info
      */
-    public synchronized int requestUpdate() {
-        this.needUpdate = true;
-        return this.version;
-    }
-
-    /**
-     * Wait for metadata update until the current version is larger than the last version we know of
-     */
-    public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) {
-        long begin = System.currentTimeMillis();
-        long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVerison) {
-            try {
-                wait(remainingWaitMs);
-            } catch (InterruptedException e) { /* this is fine */
-            }
-            long elapsed = System.currentTimeMillis() - begin;
-            if (elapsed >= maxWaitMs)
-                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-            remainingWaitMs = maxWaitMs - elapsed;
-        }
+    public synchronized void forceUpdate() {
+        this.forceUpdate = true;
     }
 
     /**
@@ -125,12 +129,11 @@ public final class Metadata {
      * Update the cluster metadata
      */
     public synchronized void update(Cluster cluster, long now) {
-        this.needUpdate = false;
+        this.forceUpdate = false;
         this.lastRefreshMs = now;
-        this.version += 1;
         this.cluster = cluster;
         notifyAll();
-        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+        log.debug("Updated cluster metadata to {}", cluster);
     }
 
     /**
@@ -139,4 +142,5 @@
     public synchronized long lastUpdate() {
         return this.lastRefreshMs;
     }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a016269..37b9d1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -147,7 +147,7 @@ public class Sender implements Runnable {
 
         // if there are any partitions whose leaders are not known yet, force metadata update
         if (result.unknownLeadersExist)
-            this.metadata.requestUpdate();
+            this.metadata.forceUpdate();
 
         // remove any nodes we aren't ready to send to
         Iterator iter = result.readyNodes.iterator();
@@ -252,7 +252,7 @@ public class Sender implements Runnable {
             this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         }
         if (error.exception() instanceof InvalidMetadataException)
-            metadata.requestUpdate();
+            metadata.forceUpdate();
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index b68bbf0..cef75d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -60,23 +60,23 @@ public interface Selectable {
     public void poll(long timeout, List sends) throws IOException;
 
     /**
-     * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
+     * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
     */
    public List completedSends();

    /**
-     * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
+     * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
     */
    public List completedReceives();

    /**
-     * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()}
+     * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()}
     * call.
     */
    public List disconnected();

    /**
-     * The list of connections that completed their connection on the last {@link #poll(long, List) poll()}
+     * The list of connections that completed their connection on the last {@link #poll(long, List) poll()}
     * call.
     */
    public List connected();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 543304c..0d7d04c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -31,7 +31,7 @@ public class MetadataTest {
         long time = 0;
         metadata.update(Cluster.empty(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
-        metadata.requestUpdate();
+        metadata.forceUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
         time += refreshBackoffMs;
         assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
@@ -40,9 +40,7 @@ public class MetadataTest {
         Thread t2 = asyncFetch(topic);
         assertTrue("Awaiting update", t1.isAlive());
         assertTrue("Awaiting update", t2.isAlive());
-        // keep updating the metadata until no need to
-        while (metadata.timeToNextUpdate(time) == 0)
-            metadata.update(TestUtils.singletonCluster(topic, 1), time);
+        metadata.update(TestUtils.singletonCluster(topic, 1), time);
         t1.join();
         t2.join();
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
@@ -53,8 +51,7 @@ public class MetadataTest {
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
-                while (metadata.fetch().partitionsForTopic(topic) == null)
-                    metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE);
+                metadata.fetch(topic, Integer.MAX_VALUE);
             }
         };
         thread.start();
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
index 71eb80f..3514ec7 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
@@ -115,11 +115,11 @@ public class Props extends Properties {
     }
 
     /**
-     * build props from a list of strings and interpret them as
+     * build props from a list of strings and interprate them as
      * key, value, key, value,....
      *
      * @param args
-     * @return props
+     * @return
      */
     @SuppressWarnings("unchecked")
     public static Props of(String... args) {
@@ -148,7 +148,7 @@ public class Props extends Properties {
     /**
      * get property of "key" and split the value by " ,"
      * @param key
-     * @return list of values
+     * @return
      */
     public List getStringList(String key) {
         return getStringList(key, "\\s*,\\s*");
@@ -158,7 +158,7 @@ public class Props extends Properties {
      * get property of "key" and split the value by "sep"
      * @param key
      * @param sep
-     * @return string list of values
+     * @return
      */
     public List getStringList(String key, String sep) {
         String val = super.getProperty(key);
@@ -176,7 +176,7 @@ public class Props extends Properties {
      * get string list with default value. default delimiter is ","
      * @param key
      * @param defaultValue
-     * @return string list of values
+     * @return
      */
     public List getStringList(String key, List defaultValue) {
         if (containsKey(key))
@@ -189,7 +189,7 @@ public class Props extends Properties {
      * get string list with default value
      * @param key
      * @param defaultValue
-     * @return string list of values
+     * @return
      */
     public List getStringList(String key, List defaultValue, String sep) {
@@ -251,10 +251,10 @@ public class Props extends Properties {
     }
 
     /**
-     * get boolean value with default value
+     * get boolean value
      * @param key
      * @param defaultValue
-     * @return boolean value
+     * @return
      * @throws Exception if value is not of type boolean or string
      */
     public Boolean getBoolean(String key, Boolean defaultValue)
@@ -265,7 +265,8 @@ public class Props extends Properties {
     /**
      * get boolean value
      * @param key
-     * @return boolean value
+     * @param defaultValue
+     * @return
      * @throws Exception if value is not of type boolean or string or
      *         if value doesn't exist
      */
@@ -274,10 +275,10 @@ public class Props extends Properties {
     }
 
     /**
-     * get long value with default value
-     * @param name
+     * get long value
+     * @param key
      * @param defaultValue
-     * @return long value
+     * @return
      * @throws Exception if value is not of type long or string
      */
     public Long getLong(String name, Long defaultValue)
@@ -287,8 +288,9 @@ public class Props extends Properties {
 
     /**
      * get long value
-     * @param name
-     * @return long value
+     * @param key
+     * @param defaultValue
+     * @return
      * @throws Exception if value is not of type long or string or
      *         if value doesn't exist
      */
@@ -297,10 +299,10 @@ public class Props extends Properties {
     }
 
     /**
-     * get integer value with default value
-     * @param name
+     * get integer value
+     * @param key
      * @param defaultValue
-     * @return integer value
+     * @return
      * @throws Exception if value is not of type integer or string
      */
     public Integer getInt(String name, Integer defaultValue)
@@ -310,8 +312,9 @@ public class Props extends Properties {
 
     /**
      * get integer value
-     * @param name
-     * @return integer value
+     * @param key
+     * @param defaultValue
+     * @return
      * @throws Exception if value is not of type integer or string or
      *         if value doesn't exist
      */
@@ -320,10 +323,10 @@ public class Props extends Properties {
     }
 
     /**
-     * get double value with default value
-     * @param name
+     * get double value
+     * @param key
      * @param defaultValue
-     * @return double value
+     * @return
      * @throws Exception if value is not of type double or string
     */
    public Double getDouble(String name, double defaultValue)
@@ -333,8 +336,9 @@ public class Props extends Properties {
 
     /**
      * get double value
-     * @param name
-     * @return double value
+     * @param key
+     * @param defaultValue
+     * @return
      * @throws Exception if value is not of type double or string or
      *         if value doesn't exist
      */
@@ -343,10 +347,10 @@ public class Props extends Properties {
     }
 
     /**
-     * get URI value with default value
-     * @param name
+     * get URI value
+     * @param key
      * @param defaultValue
-     * @return URI value
+     * @return
      * @throws Exception if value is not of type URI or string
      */
     public URI getUri(String name, URI defaultValue) throws Exception {
@@ -355,9 +359,9 @@ public class Props extends Properties {
 
     /**
      * get URI value
-     * @param name
+     * @param key
      * @param defaultValue
-     * @return URI value
+     * @return
      * @throws Exception if value is not of type URI or string
      */
     public URI getUri(String name, String defaultValue)
@@ -368,8 +372,9 @@ public class Props extends Properties {
 
     /**
      * get URI value
-     * @param name
-     * @return URI value
+     * @param key
+     * @param defaultValue
+     * @return
      * @throws Exception if value is not of type URI or string or
      *         if value doesn't exist
      */
@@ -380,7 +385,7 @@ public class Props extends Properties {
     /**
      * compare two props
      * @param p
-     * @return true or false
+     * @return
     */
    public boolean equalsProps(Props p) {
        if (p == null) {
@@ -427,7 +432,7 @@ public class Props extends Properties {
 
     /**
      * get all property names
-     * @return set of property names
+     * @return
     */
    public Set getKeySet() {
        return super.stringPropertyNames();
@@ -448,7 +453,7 @@ public class Props extends Properties {
     /**
      * clone a Props
      * @param p
-     * @return props
+     * @return
     */
    public static Props clone(Props p) {
        return new Props(p);
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index cc3400f..44d3d35 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -29,8 +29,7 @@ public interface ConsumerConnector {
   *  Create a list of MessageStreams of type T for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pair
-  *  @param keyDecoder a decoder that decodes the message key
-  *  @param valueDecoder a decoder that decodes the message itself
+  *  @param decoder a decoder that converts from Message to T
   *  @return a map of (topic, list of  KafkaStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 1c4c7bd..8a62dfa 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -346,7 +346,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
 
   /**
    * Do a basic sanity check on this index to detect obvious problems
-   * @throws IllegalArgumentException if any problems are found
+   * @throw IllegalArgumentException if any problems are found
    */
   def sanityCheck() {
     require(entries == 0 || lastOffset > baseOffset,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index bb2e654..ef75b67 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,29 +35,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-
-    if(props.containsKey("log.retention.ms")){
-      props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
-    }
-    else if(props.containsKey("log.retention.minutes")){
+    if(props.containsKey("log.retention.minutes")){
       millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    }
-    else {
+    } else {
       millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
-  }
-
-  private def getLogRollTimeMillis(): Long = {
-    val millisInHour = 60L * 60L * 1000L
-
-    if(props.containsKey("log.roll.ms")){
-      props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
-    }
-    else {
-      millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
-    }
   }
-  
+
   /*********** General Configuration ***********/
 
   /* the broker id for this server */
@@ -120,7 +105,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum time before a new log segment is rolled out */
-  val logRollTimeMillis = getLogRollTimeMillis
+  val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionTimeMillis = getLogRetentionTimeMillis
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5a56f57..c22e51e 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -275,7 +275,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
-                                     segmentMs = config.logRollTimeMillis,
+                                     segmentMs = 60L * 60L * 1000L * config.logRollHours,
                                      flushInterval = config.logFlushIntervalMessages,
                                      flushMs = config.logFlushIntervalMs.toLong,
                                      retentionSize = config.logRetentionBytes,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a56a77..554f7c3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,9 +36,9 @@ object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
 }
 
-class ReplicaManager(val config: KafkaConfig,
-                     time: Time,
-                     val zkClient: ZkClient,
+class ReplicaManager(val config: KafkaConfig, 
+                     time: Time, 
+                     val zkClient: ZkClient, 
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -46,7 +46,6 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
-  private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
@@ -61,7 +60,7 @@ class ReplicaManager(val config: KafkaConfig,
     new Gauge[Int] {
       def value = {
         leaderPartitionsLock synchronized {
-          leaderPartitions.size
+          getLeaderPartitions().size
         }
       }
     }
@@ -83,7 +82,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def underReplicatedPartitionCount(): Int = {
     leaderPartitionsLock synchronized {
-      leaderPartitions.count(_.isUnderReplicated)
+      getLeaderPartitions().count(_.isUnderReplicated)
     }
   }
 
@@ -117,9 +116,6 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getPartition(topic, partitionId) match {
       case Some(partition) =>
-        leaderPartitionsLock synchronized {
-          leaderPartitions -= partition
-        }
         if(deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
           if (removedPartition != null)
@@ -331,10 +327,6 @@ class ReplicaManager(val config: KafkaConfig,
 
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
-
-      // Finally add these partitions to the list of partitions for which the leader is the current broker
-      leaderPartitionsLock synchronized {
-        leaderPartitions ++= partitionState.keySet
-      }
     } catch {
       case e: Throwable =>
         partitionState.foreach { state =>
@@ -383,9 +375,6 @@ class ReplicaManager(val config: KafkaConfig,
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
-      leaderPartitionsLock synchronized {
-        leaderPartitions --= partitionState.keySet
-      }
 
       var partitionsToMakeFollower: Set[Partition] = Set()
 
@@ -466,7 +455,7 @@ class ReplicaManager(val config: KafkaConfig,
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
     var curLeaderPartitions: List[Partition] = null
     leaderPartitionsLock synchronized {
-      curLeaderPartitions = leaderPartitions.toList
+      curLeaderPartitions = getLeaderPartitions()
    }
     curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
@@ -480,6 +469,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def getLeaderPartitions() : List[Partition] = {
+    allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
+  }
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 555d751..4f06e34 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,30 +116,30 @@ object MirrorMaker extends Logging {
     val numStreams = options.valueOf(numStreamsOpt).intValue()
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
-    // create consumer streams
-    connectors = options.valuesOf(consumerConfigOpt).toList
-            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
-            .map(new ZookeeperConsumerConnector(_))
-    val numConsumers = connectors.size * numStreams
-
-    // create a data channel btw the consumers and the producers
-    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
-
-    // create producer threads
     val useNewProducer = options.has(useNewProducerOpt)
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+
+    // create producer threads
     val clientId = producerProps.getProperty("client.id", "")
-    producerThreads = (0 until numProducers).map(i => {
+    val producers = (1 to numProducers).map(i => {
       producerProps.setProperty("client.id", clientId + "-" + i)
-      val producer = if (useNewProducer) new NewShinyProducer(producerProps) else new OldProducer(producerProps)
-      new ProducerThread(mirrorDataChannel, producer, i)
     })
 
-    // create consumer threads
+    // create consumer streams
+    connectors = options.valuesOf(consumerConfigOpt).toList
+            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
+            .map(new ZookeeperConsumerConnector(_))
+    val numConsumers = connectors.size * numStreams
+
+    // create a data channel btw the consumers and the producers
+    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
+
+    producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2))
+
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
@@ -153,7 +153,7 @@ object MirrorMaker extends Logging {
       fatal("Unable to create stream - shutting down mirror maker.")
       connectors.foreach(_.shutdown)
     }
-    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
+    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
     assert(consumerThreads.size == numConsumers)
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -233,6 +233,7 @@ object MirrorMaker extends Logging {
 
   class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                        mirrorDataChannel: DataChannel,
+                       producers: Seq[BaseProducer],
                        threadId: Int)
           extends Thread with Logging with KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 63d3dda..b29981b 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -494,7 +494,7 @@ object Utils extends Logging {
   /**
    * Create a file with the given path
    * @param path The path to create
-   * @throws KafkaStorageException If the file create fails
+   * @throw KafkaStorageException If the file create fails
    * @return The created file
    */
   def createFile(path: String): File = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2377abe..6f4809d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -45,16 +45,6 @@ class KafkaConfigTest extends JUnit3Suite {
   }
 
   @Test
-  def testLogRetentionTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.retention.ms", "1800000")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
-
-  @Test
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
 
@@ -73,17 +63,6 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-
-  @Test
-  def testLogRetentionTimeBothMinutesAndMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.retention.ms", "1800000")
-    props.put("log.retention.minutes", "10")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
 
   @Test
   def testAdvertiseDefaults() {
@@ -150,36 +129,4 @@ class KafkaConfigTest extends JUnit3Suite {
       new KafkaConfig(props)
     }
   }
-
-  @Test
-  def testLogRollTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.roll.ms", "1800000")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
-
-  }
-
-  @Test
-  def testLogRollTimeBothMsAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.roll.ms", "1800000")
-    props.put("log.roll.hours", "1")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
-
-  }
-
-  @Test
-  def testLogRollTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis )
-
-  }
-
-
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3faa884..57b2bd5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -84,13 +84,6 @@ object TestUtils extends Logging {
     val f = new File(IoTmpDir, "kafka-" + random.nextInt(1000000))
     f.mkdirs()
     f.deleteOnExit()
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        Utils.rm(f)
-      }
-    })
-
     f
   }