From 40232f6eea05636e26594a433cf0b984286582f3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 3 Apr 2015 13:36:40 -0500 Subject: [PATCH 01/24] Make metadata topics backed by concurrenthashmap --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 07f1cdb..2c356e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -14,6 +14,8 @@ package org.apache.kafka.clients; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; @@ -34,11 +36,11 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final ConcurrentMap topics; private int version; private long lastRefreshMs; private Cluster cluster; private boolean needUpdate; - private final Set topics; /** * Create a metadata instance with reasonable defaults @@ -60,7 +62,7 @@ public final class Metadata { this.version = 0; this.cluster = Cluster.empty(); this.needUpdate = false; - this.topics = new HashSet(); + this.topics = new ConcurrentHashMap(); } /** @@ -74,7 +76,7 @@ public final class Metadata { * Add the topic to maintain in the metadata */ public synchronized void add(String topic) { - topics.add(topic); + topics.put(topic, Boolean.TRUE); } /** @@ -120,7 +122,7 @@ public final class Metadata { */ public synchronized void addTopics(String... topics) { for (String topic : topics) - this.topics.add(topic); + this.topics.put(topic, Boolean.TRUE); requestUpdate(); } @@ -128,7 +130,7 @@ public final class Metadata { * Get the list of topics we are currently maintaining metadata for */ public synchronized Set topics() { - return new HashSet(this.topics); + return new HashSet(this.topics.keySet()); } /** @@ -137,7 +139,7 @@ public final class Metadata { * @return true if the topic exists, false otherwise */ public synchronized boolean containsTopic(String topic) { - return this.topics.contains(topic); + return this.topics.containsKey(topic); } /** -- 2.3.5 From 4302391dda98aad19ed19479bf66c9743857ccc5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 3 Apr 2015 14:53:37 -0500 Subject: [PATCH 02/24] Method does not need to be synchronized --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 2c356e8..fee9194 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -75,7 +75,7 @@ public final class Metadata { /** * Add the topic to maintain in the metadata */ - public synchronized void add(String topic) { + public void add(String topic) { topics.put(topic, Boolean.TRUE); } -- 2.3.5 From 630b4e8791697ec2ae491fb08fd316df59f40e01 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 14:31:35 -0500 Subject: [PATCH 03/24] Do not synchronize contains topic method --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index fee9194..c297f7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -138,7 +138,7 @@ public final class Metadata { * @param topic topic to check * @return true if the topic exists, false otherwise */ - public synchronized boolean containsTopic(String topic) { + public boolean containsTopic(String topic) { return this.topics.containsKey(topic); } -- 2.3.5 From 9c07589138598b0c8c7239724a97c734f6e1d233 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 15:44:13 -0500 Subject: [PATCH 04/24] Continue removing the need to synchronize the metadata object --- .../java/org/apache/kafka/clients/Metadata.java | 44 ++++++++++++---------- .../kafka/clients/producer/KafkaProducer.java | 32 ++++++++-------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c297f7e..b11dd5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -16,6 +16,9 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; @@ -37,10 +40,10 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; private final ConcurrentMap topics; + private final AtomicReference cluster; + private final AtomicLong lastRefreshMs; + private volatile boolean needUpdate; private int version; - private long lastRefreshMs; - private Cluster cluster; - private boolean needUpdate; /** * Create a metadata instance with reasonable defaults @@ -58,9 +61,9 @@ public final class Metadata { public Metadata(long refreshBackoffMs, long metadataExpireMs) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.lastRefreshMs = 0L; + this.lastRefreshMs = new AtomicLong(0L); this.version = 0; - this.cluster = Cluster.empty(); + this.cluster = new AtomicReference(Cluster.empty()); this.needUpdate = false; this.topics = new ConcurrentHashMap(); } @@ -68,8 +71,8 @@ public final class Metadata { /** * Get the current cluster info without blocking */ - public synchronized Cluster fetch() { - return this.cluster; + public Cluster fetch() { + return this.cluster.get(); } /** @@ -84,16 +87,17 @@ public final class Metadata { * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time * is now */ - public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); - long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; + public long timeToNextUpdate(long nowMs) { + long lastRefreshMillis = this.lastRefreshMs.get(); + long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } /** * Request an update of the current cluster metadata info, return the current version before the update */ - public synchronized int requestUpdate() { + public int requestUpdate() { this.needUpdate = true; return this.version; } @@ -120,7 +124,7 @@ public final class Metadata { /** * Add one or more topics to maintain metadata for */ - public synchronized void addTopics(String... topics) { + public void addTopics(String... topics) { for (String topic : topics) this.topics.put(topic, Boolean.TRUE); requestUpdate(); @@ -129,7 +133,7 @@ public final class Metadata { /** * Get the list of topics we are currently maintaining metadata for */ - public synchronized Set topics() { + public Set topics() { return new HashSet(this.topics.keySet()); } @@ -147,9 +151,9 @@ public final class Metadata { */ public synchronized void update(Cluster cluster, long now) { this.needUpdate = false; - this.lastRefreshMs = now; + this.lastRefreshMs.set(now); this.version += 1; - this.cluster = cluster; + this.cluster.set(cluster); notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } @@ -158,22 +162,22 @@ public final class Metadata { * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. */ - public synchronized void failedUpdate(long now) { - this.lastRefreshMs = now; + public void failedUpdate(long now) { + this.lastRefreshMs.set(now); } /** * @return The current metadata version */ - public synchronized int version() { + public int version() { return this.version; } /** * The last time metadata was updated. */ - public synchronized long lastUpdate() { - return this.lastRefreshMs; + public long lastUpdate() { + return this.lastRefreshMs.get(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 42b1292..9b5d561 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -73,11 +73,11 @@ import org.slf4j.LoggerFactory; * props.put("buffer.memory", 33554432); * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - * + * * Producer producer = new KafkaProducer(props); * for(int i = 0; i < 100; i++) * producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i))); - * + * * producer.close(); * } *

@@ -310,7 +310,7 @@ public class KafkaProducer implements Producer { * or throw any exception that occurred while sending the record. *

* If you want to simulate a simple blocking call you can call the get() method immediately: - * + * *

      * {@code
      * byte[] key = "key".getBytes();
@@ -321,7 +321,7 @@ public class KafkaProducer implements Producer {
      * 

* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. - * + * *

      * {@code
      * ProducerRecord record = new ProducerRecord("the-topic", key, value);
@@ -335,10 +335,10 @@ public class KafkaProducer implements Producer {
      *               });
      * }
      * 
- * + * * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the * following example callback1 is guaranteed to execute before callback2: - * + * *
      * {@code
      * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
@@ -350,15 +350,15 @@ public class KafkaProducer implements Producer {
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
-     *        
+     *
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
      * @throws BufferExhaustedException If block.on.buffer.full=false and the buffer is full.
-     * 
+     *
      */
     @Override
     public Future send(ProducerRecord record, Callback callback) {
@@ -420,9 +420,7 @@ public class KafkaProducer implements Producer {
         if (!this.metadata.containsTopic(topic))
             this.metadata.add(topic);
 
-        if (metadata.fetch().partitionsForTopic(topic) != null) {
-            return;
-        } else {
+        if (metadata.fetch().partitionsForTopic(topic) == null) {
             long begin = time.milliseconds();
             long remainingWaitMs = maxWaitMs;
             while (metadata.fetch().partitionsForTopic(topic) == null) {
@@ -453,7 +451,7 @@ public class KafkaProducer implements Producer {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
-    
+
     /**
      * Invoking this method makes all buffered records immediately available to send (even if linger.ms is 
      * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
@@ -476,10 +474,10 @@ public class KafkaProducer implements Producer {
      * consumer.commit();
      * }
      * 
- * + * * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur * we need to set retries=<large_number> in our config. - * + * * @throws InterruptException If the thread is interrupted while blocked */ @Override -- 2.3.5 From c6458b89e3cff6794c9fccd6092f5356280906bb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 20:45:06 -0500 Subject: [PATCH 05/24] Store both last refresh and need to refresh in same variable --- .../java/org/apache/kafka/clients/Metadata.java | 36 +++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index b11dd5a..7d68c92 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -12,19 +12,18 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.errors.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A class encapsulating some of the logic around metadata. *

@@ -42,7 +41,6 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private volatile boolean needUpdate; private int version; /** @@ -64,7 +62,6 @@ public final class Metadata { this.lastRefreshMs = new AtomicLong(0L); this.version = 0; this.cluster = new AtomicReference(Cluster.empty()); - this.needUpdate = false; this.topics = new ConcurrentHashMap(); } @@ -89,7 +86,7 @@ public final class Metadata { */ public long timeToNextUpdate(long nowMs) { long lastRefreshMillis = this.lastRefreshMs.get(); - long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + long timeToExpire = Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -98,14 +95,21 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - this.needUpdate = true; + for (;;) { + long current = this.lastRefreshMs.get(); + if (current >= 0) { + if (lastRefreshMs.compareAndSet(current, -current)) { + break; + } + } + } return this.version; } /** * Wait for metadata update until the current version is larger than the last version we know of */ - public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { + public void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } @@ -149,8 +153,7 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long now) { - this.needUpdate = false; + public void update(Cluster cluster, long now) { this.lastRefreshMs.set(now); this.version += 1; this.cluster.set(cluster); @@ -177,7 +180,12 @@ public final class Metadata { * The last time metadata was updated. */ public long lastUpdate() { - return this.lastRefreshMs.get(); + long current = this.lastRefreshMs.get(); + if (current < 0) { + return -current; + } else { + return current; + } } /** -- 2.3.5 From 3b6995f1457bd1c1a11c4d9364450eb54fd35924 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 5 Apr 2015 00:22:04 -0500 Subject: [PATCH 06/24] Fix synchronize issue --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 7d68c92..510d0ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -38,6 +38,7 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final Object lock = new Object(); private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; @@ -117,7 +118,9 @@ public final class Metadata { long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { if (remainingWaitMs != 0) - wait(remainingWaitMs); + synchronized (lock) { + lock.wait(remainingWaitMs); + } long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); @@ -157,7 +160,9 @@ public final class Metadata { this.lastRefreshMs.set(now); this.version += 1; this.cluster.set(cluster); - notifyAll(); + synchronized (lock) { + lock.notifyAll(); + } log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } -- 2.3.5 From f6691b0acd921180cc49d3ccccdb9d9cfc4b308c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Apr 2015 19:17:10 -0500 Subject: [PATCH 07/24] Version needs to be volatile --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 510d0ea..444baff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -42,7 +42,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private int version; + private volatile int version; /** * Create a metadata instance with reasonable defaults -- 2.3.5 From 9a0800cf3d3012031a40f24cc4312b22d6af60a8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Apr 2015 20:01:19 -0500 Subject: [PATCH 08/24] rework how signally happens --- .../java/org/apache/kafka/clients/Metadata.java | 47 ++++++++++++++-------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 444baff..3af3001 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -21,14 +21,18 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * A class encapsulating some of the logic around metadata. *

* This class is shared by the client thread (for partitioning) and the background sender thread. - * + * * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a * topic we don't have any metadata for it will trigger a metadata update. */ @@ -38,7 +42,8 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; @@ -96,12 +101,14 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - for (;;) { + for (; ; ) { long current = this.lastRefreshMs.get(); - if (current >= 0) { + if (current > 0) { if (lastRefreshMs.compareAndSet(current, -current)) { break; } + } else { + break; } } return this.version; @@ -117,14 +124,17 @@ public final class Metadata { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - if (remainingWaitMs != 0) - synchronized (lock) { - lock.wait(remainingWaitMs); + if (remainingWaitMs > 0) { + try { + lock.lock(); + condition.await(remainingWaitMs, TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); } - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) + } else { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; + } + remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); } } @@ -158,14 +168,17 @@ public final class Metadata { */ public void update(Cluster cluster, long now) { this.lastRefreshMs.set(now); - this.version += 1; this.cluster.set(cluster); - synchronized (lock) { - lock.notifyAll(); + this.version += 1; + try { + lock.lock(); + condition.signalAll(); + } finally { + lock.unlock(); } log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } - + /** * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. @@ -173,7 +186,7 @@ public final class Metadata { public void failedUpdate(long now) { this.lastRefreshMs.set(now); } - + /** * @return The current metadata version */ -- 2.3.5 From 64c9dd3f530916ea1340df224d244c822f334537 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 7 Apr 2015 15:12:25 -0500 Subject: [PATCH 09/24] remove unnecessary creation of new set --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 3af3001..162bd6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -17,7 +17,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -151,7 +150,7 @@ public final class Metadata { * Get the list of topics we are currently maintaining metadata for */ public Set topics() { - return new HashSet(this.topics.keySet()); + return this.topics.keySet(); } /** -- 2.3.5 From 5770eb891d04a9dea945af9d8d16043703176a80 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 7 Apr 2015 16:04:08 -0500 Subject: [PATCH 10/24] initialize 0 at the field level --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 162bd6d..f528c2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -46,7 +46,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private volatile int version; + private volatile int version = 0; /** * Create a metadata instance with reasonable defaults @@ -65,7 +65,6 @@ public final class Metadata { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = new AtomicLong(0L); - this.version = 0; this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); } -- 2.3.5 From 5841cfaee4b94db61c57eee222dba82dfbf38aa8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 8 Apr 2015 00:18:11 -0500 Subject: [PATCH 11/24] Fix the build --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 13 ++++++++++--- .../test/java/org/apache/kafka/clients/MetadataTest.java | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index f528c2f..829b9df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -64,7 +64,7 @@ public final class Metadata { public Metadata(long refreshBackoffMs, long metadataExpireMs) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.lastRefreshMs = new AtomicLong(0L); + this.lastRefreshMs = new AtomicLong(1L); this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); } @@ -90,7 +90,13 @@ public final class Metadata { */ public long timeToNextUpdate(long nowMs) { long lastRefreshMillis = this.lastRefreshMs.get(); - long timeToExpire = Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + long timeToExpire; + if (lastRefreshMillis < 0) { + timeToExpire = 0; + lastRefreshMillis = -lastRefreshMillis; + } else { + timeToExpire = Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + } long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -99,7 +105,7 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - for (; ; ) { + for (;;) { long current = this.lastRefreshMs.get(); if (current > 0) { if (lastRefreshMs.compareAndSet(current, -current)) { @@ -109,6 +115,7 @@ public final class Metadata { break; } } + return this.version; } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 928087d..f0bbb77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -36,7 +36,7 @@ public class MetadataTest { @Test public void testMetadata() throws Exception { - long time = 0; + long time = 1; metadata.update(Cluster.empty(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); metadata.requestUpdate(); -- 2.3.5 From 29bdf7626f332b7d60d2d61be73952b29619af96 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Apr 2015 11:58:24 -0500 Subject: [PATCH 12/24] Start moving synchronization of metadata to different class --- .../org/apache/kafka/clients/MetadataThing.java | 27 ++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 clients/src/main/java/org/apache/kafka/clients/MetadataThing.java diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java new file mode 100644 index 0000000..4e0773e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java @@ -0,0 +1,27 @@ +package org.apache.kafka.clients; + +/** + * Created by timbrooks on 4/9/15. + */ +public class MetadataThing { + + public void awaitUpdate() { + + } + + public long timeToNextUpdate() { + return 0L; + } + + public int requestUpdate() { + return 0; + } + + public long lastUpdateAttempt() { + return 0L; + } + + public long lastSuccessfulUpdate() { + return 0L; + } +} -- 2.3.5 From 967a9ef8e9470ad650719066c48d3769d9479eb2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Apr 2015 18:16:37 -0500 Subject: [PATCH 13/24] Start moving synchronization work to new class --- .../java/org/apache/kafka/clients/Metadata.java | 57 ++++--------------- .../org/apache/kafka/clients/MetadataThing.java | 66 +++++++++++++++++++--- .../org/apache/kafka/clients/NetworkClient.java | 21 +++---- 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 829b9df..e825d5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -46,6 +46,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; + private final MetadataThing thing; private volatile int version = 0; /** @@ -67,6 +68,7 @@ public final class Metadata { this.lastRefreshMs = new AtomicLong(1L); this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); + this.thing = new MetadataThing(metadataExpireMs, refreshBackoffMs); } /** @@ -89,32 +91,14 @@ public final class Metadata { * is now */ public long timeToNextUpdate(long nowMs) { - long lastRefreshMillis = this.lastRefreshMs.get(); - long timeToExpire; - if (lastRefreshMillis < 0) { - timeToExpire = 0; - lastRefreshMillis = -lastRefreshMillis; - } else { - timeToExpire = Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); - } - long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; - return Math.max(timeToExpire, timeToAllowUpdate); + return thing.timeToNextUpdate(nowMs); } /** * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - for (;;) { - long current = this.lastRefreshMs.get(); - if (current > 0) { - if (lastRefreshMs.compareAndSet(current, -current)) { - break; - } - } else { - break; - } - } + thing.requestUpdate(); return this.version; } @@ -130,12 +114,7 @@ public final class Metadata { long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { if (remainingWaitMs > 0) { - try { - lock.lock(); - condition.await(remainingWaitMs, TimeUnit.MILLISECONDS); - } finally { - lock.unlock(); - } + thing.awaitUpdate(remainingWaitMs); } else { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } @@ -172,15 +151,11 @@ public final class Metadata { * Update the cluster metadata */ public void update(Cluster cluster, long now) { - this.lastRefreshMs.set(now); this.cluster.set(cluster); this.version += 1; - try { - lock.lock(); - condition.signalAll(); - } finally { - lock.unlock(); - } + // Race for clients right here + + thing.updateComplete(now, true); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } @@ -189,7 +164,7 @@ public final class Metadata { * to avoid retrying immediately. */ public void failedUpdate(long now) { - this.lastRefreshMs.set(now); + thing.updateComplete(now, false); } /** @@ -203,18 +178,6 @@ public final class Metadata { * The last time metadata was updated. */ public long lastUpdate() { - long current = this.lastRefreshMs.get(); - if (current < 0) { - return -current; - } else { - return current; - } - } - - /** - * The metadata refresh backoff in ms - */ - public long refreshBackoff() { - return refreshBackoffMs; + return thing.lastUpdateAttempt(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java index 4e0773e..6326bd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java @@ -1,16 +1,66 @@ -package org.apache.kafka.clients; - /** - * Created by timbrooks on 4/9/15. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ +package org.apache.kafka.clients; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + public class MetadataThing { - public void awaitUpdate() { + private final long metadataExpireMs; + private final long refreshBackoffMs; + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final AtomicLong lastRefreshTime = new AtomicLong(0L); + private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); + public MetadataThing(long metadataExpireMs, long refreshBackoffMs) { + this.metadataExpireMs = metadataExpireMs; + this.refreshBackoffMs = refreshBackoffMs; } - public long timeToNextUpdate() { - return 0L; + public void awaitUpdate(final long maxWaitMs) throws InterruptedException { + try { + lock.lock(); + condition.await(maxWaitMs, TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); + } + } + + public void updateComplete(long time, boolean success) { + lastRefreshTime.set(time); + if (success) { + lastSuccessfulUpdateTime.set(time); + + try { + lock.lock(); + condition.signalAll(); + } finally { + lock.unlock(); + } + } + } + + public long timeToNextUpdate(long now) { + long lastRefreshTime = this.lastRefreshTime.get(); + long timeUntilNextUpdate = lastRefreshTime + metadataExpireMs - now; + long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; + + return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); } public int requestUpdate() { @@ -18,10 +68,10 @@ public class MetadataThing { } public long lastUpdateAttempt() { - return 0L; + return lastRefreshTime.get(); } public long lastSuccessfulUpdate() { - return 0L; + return lastSuccessfulUpdateTime.get(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index b7ae595..8bbac47 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -76,9 +76,6 @@ public class NetworkClient implements KafkaClient { /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ private boolean metadataFetchInProgress; - /* the last timestamp when no broker node is available to connect */ - private long lastNoNodeAvailableMs; - public NetworkClient(Selectable selector, Metadata metadata, String clientId, @@ -96,7 +93,6 @@ public class NetworkClient implements KafkaClient { this.correlation = 0; this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); this.metadataFetchInProgress = false; - this.lastNoNodeAvailableMs = 0; } /** @@ -209,13 +205,13 @@ public class NetworkClient implements KafkaClient { public List poll(long timeout, long now) { // should we update our metadata? long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0; - // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), - waitForMetadataFetch); - if (metadataTimeout == 0) + + long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); + if (metadataTimeout == 0) { maybeUpdateMetadata(now); + } + // do the I/O try { this.selector.poll(Math.min(timeout, metadataTimeout)); @@ -462,8 +458,9 @@ public class NetworkClient implements KafkaClient { Node node = this.leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); - // mark the timestamp for no node available to connect - this.lastNoNodeAvailableMs = now; + + // mark the timestamp for for failed update + metadata.failedUpdate(now); return; } @@ -485,7 +482,7 @@ public class NetworkClient implements KafkaClient { } else { // connected, but can't send more OR connecting // In either case, we just need to wait for a network event to let us know the selected // connection might be usable again. - this.lastNoNodeAvailableMs = now; + this.metadata.failedUpdate(now); } } -- 2.3.5 From 59ec6560443be6d14be26be92f41f575e1e9eb7f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 11 Apr 2015 11:06:29 -0500 Subject: [PATCH 14/24] Remove unused code --- .../java/org/apache/kafka/clients/Metadata.java | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index e825d5f..d56865e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -20,18 +20,13 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * A class encapsulating some of the logic around metadata. - *

+ *

* This class is shared by the client thread (for partitioning) and the background sender thread. - * + *

* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a * topic we don't have any metadata for it will trigger a metadata update. */ @@ -39,13 +34,8 @@ public final class Metadata { private static final Logger log = LoggerFactory.getLogger(Metadata.class); - private final long refreshBackoffMs; - private final long metadataExpireMs; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); private final ConcurrentMap topics; private final AtomicReference cluster; - private final AtomicLong lastRefreshMs; private final MetadataThing thing; private volatile int version = 0; @@ -58,14 +48,12 @@ public final class Metadata { /** * Create a new Metadata instance + * * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy - * polling + * polling * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh */ public Metadata(long refreshBackoffMs, long metadataExpireMs) { - this.refreshBackoffMs = refreshBackoffMs; - this.metadataExpireMs = metadataExpireMs; - this.lastRefreshMs = new AtomicLong(1L); this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); this.thing = new MetadataThing(metadataExpireMs, refreshBackoffMs); @@ -140,6 +128,7 @@ public final class Metadata { /** * Check if a topic is already in the topic set. + * * @param topic topic to check * @return true if the topic exists, false otherwise */ -- 2.3.5 From ed4fa29e294ae8f9e7aab6b1d379fdf849e17809 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 11 Apr 2015 13:14:14 -0500 Subject: [PATCH 15/24] Functionality works. Not threadsafe --- clients/src/main/java/org/apache/kafka/clients/MetadataThing.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java index 6326bd6..df12df0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java @@ -26,6 +26,8 @@ public class MetadataThing { private final Condition condition = lock.newCondition(); private final AtomicLong lastRefreshTime = new AtomicLong(0L); private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); + private boolean updatedRequested = false; + private int version = 0; public MetadataThing(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; @@ -44,6 +46,7 @@ public class MetadataThing { public void updateComplete(long time, boolean success) { lastRefreshTime.set(time); if (success) { + version += 1; lastSuccessfulUpdateTime.set(time); try { @@ -57,14 +60,15 @@ public class MetadataThing { public long timeToNextUpdate(long now) { long lastRefreshTime = this.lastRefreshTime.get(); - long timeUntilNextUpdate = lastRefreshTime + metadataExpireMs - now; + long timeUntilNextUpdate = updatedRequested ? 0 : lastRefreshTime + metadataExpireMs - now; long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); } public int requestUpdate() { - return 0; + updatedRequested = true; + return version; } public long lastUpdateAttempt() { -- 2.3.5 From 60785ef3c459ef12edcd93bfd8e60573ff643499 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 11 Apr 2015 20:08:14 -0500 Subject: [PATCH 16/24] move version into metadata synchronizer --- .../java/org/apache/kafka/clients/Metadata.java | 24 ++++++------------- .../org/apache/kafka/clients/MetadataThing.java | 27 +++++++++++++++++----- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index d56865e..60597ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -37,7 +37,6 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final MetadataThing thing; - private volatile int version = 0; /** * Create a metadata instance with reasonable defaults @@ -86,9 +85,7 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - thing.requestUpdate(); - - return this.version; + return thing.requestUpdate(); } /** @@ -98,15 +95,10 @@ public final class Metadata { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } - long begin = System.currentTimeMillis(); - long remainingWaitMs = maxWaitMs; - while (this.version <= lastVersion) { - if (remainingWaitMs > 0) { - thing.awaitUpdate(remainingWaitMs); - } else { - throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - } - remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); + + boolean success = thing.awaitUpdate(lastVersion, maxWaitMs); + if (!success) { + throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } } @@ -141,11 +133,9 @@ public final class Metadata { */ public void update(Cluster cluster, long now) { this.cluster.set(cluster); - this.version += 1; - // Race for clients right here thing.updateComplete(now, true); - log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); + log.debug("Updated cluster metadata version {} to {}", this.thing.version(), this.cluster); } /** @@ -160,7 +150,7 @@ public final class Metadata { * @return The current metadata version */ public int version() { - return this.version; + return this.thing.version(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java index df12df0..7cf1df9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java @@ -34,13 +34,24 @@ public class MetadataThing { this.refreshBackoffMs = refreshBackoffMs; } - public void awaitUpdate(final long maxWaitMs) throws InterruptedException { - try { - lock.lock(); - condition.await(maxWaitMs, TimeUnit.MILLISECONDS); - } finally { - lock.unlock(); + public boolean awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + + while (this.version <= lastVersion) { + if (remainingWaitMs > 0) { + try { + lock.lock(); + condition.await(maxWaitMs, TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); + } + } else { + return false; + } + remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); } + return true; } public void updateComplete(long time, boolean success) { @@ -78,4 +89,8 @@ public class MetadataThing { public long lastSuccessfulUpdate() { return lastSuccessfulUpdateTime.get(); } + + public int version() { + return version; + } } -- 2.3.5 From da1d107b351ad0e96ce3bb2eeb36a4c936962fb2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 12 Apr 2015 10:32:06 -0500 Subject: [PATCH 17/24] Make version volatile --- .../java/org/apache/kafka/clients/MetadataThing.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java index 7cf1df9..4a9c976 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java @@ -13,6 +13,7 @@ package org.apache.kafka.clients; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -26,8 +27,8 @@ public class MetadataThing { private final Condition condition = lock.newCondition(); private final AtomicLong lastRefreshTime = new AtomicLong(0L); private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); - private boolean updatedRequested = false; - private int version = 0; + private boolean updateRequested = false; + private final AtomicInteger version = new AtomicInteger(0); public MetadataThing(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; @@ -38,7 +39,7 @@ public class MetadataThing { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - while (this.version <= lastVersion) { + while (this.version.get() <= lastVersion) { if (remainingWaitMs > 0) { try { lock.lock(); @@ -57,8 +58,8 @@ public class MetadataThing { public void updateComplete(long time, boolean success) { lastRefreshTime.set(time); if (success) { - version += 1; - lastSuccessfulUpdateTime.set(time); + version.getAndIncrement(); + lastSuccessfulUpdateTime.lazySet(time); try { lock.lock(); @@ -71,15 +72,15 @@ public class MetadataThing { public long timeToNextUpdate(long now) { long lastRefreshTime = this.lastRefreshTime.get(); - long timeUntilNextUpdate = updatedRequested ? 0 : lastRefreshTime + metadataExpireMs - now; + long timeUntilNextUpdate = updateRequested ? 0 : lastRefreshTime + metadataExpireMs - now; long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); } public int requestUpdate() { - updatedRequested = true; - return version; + updateRequested = true; + return version.get(); } public long lastUpdateAttempt() { @@ -91,6 +92,6 @@ public class MetadataThing { } public int version() { - return version; + return version.get(); } } -- 2.3.5 From 4925d0292f92da4d9602d51647b9e3d3824d9178 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 12 Apr 2015 10:56:35 -0500 Subject: [PATCH 18/24] Rename classes --- .../java/org/apache/kafka/clients/Metadata.java | 20 ++--- .../apache/kafka/clients/MetadataBookkeeper.java | 99 ++++++++++++++++++++++ .../org/apache/kafka/clients/MetadataThing.java | 97 --------------------- 3 files changed, 109 insertions(+), 107 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/MetadataThing.java diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 60597ea..fa02779 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -36,7 +36,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; - private final MetadataThing thing; + private final MetadataBookkeeper bookkeeper; /** * Create a metadata instance with reasonable defaults @@ -55,7 +55,7 @@ public final class Metadata { public Metadata(long refreshBackoffMs, long metadataExpireMs) { this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); - this.thing = new MetadataThing(metadataExpireMs, refreshBackoffMs); + this.bookkeeper = new MetadataBookkeeper(metadataExpireMs, refreshBackoffMs); } /** @@ -78,14 +78,14 @@ public final class Metadata { * is now */ public long timeToNextUpdate(long nowMs) { - return thing.timeToNextUpdate(nowMs); + return bookkeeper.timeToNextUpdate(nowMs); } /** * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - return thing.requestUpdate(); + return bookkeeper.requestUpdate(); } /** @@ -96,7 +96,7 @@ public final class Metadata { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } - boolean success = thing.awaitUpdate(lastVersion, maxWaitMs); + boolean success = bookkeeper.awaitUpdate(lastVersion, maxWaitMs); if (!success) { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } @@ -134,8 +134,8 @@ public final class Metadata { public void update(Cluster cluster, long now) { this.cluster.set(cluster); - thing.updateComplete(now, true); - log.debug("Updated cluster metadata version {} to {}", this.thing.version(), this.cluster); + bookkeeper.updateComplete(now, true); + log.debug("Updated cluster metadata version {} to {}", this.bookkeeper.version(), this.cluster); } /** @@ -143,20 +143,20 @@ public final class Metadata { * to avoid retrying immediately. */ public void failedUpdate(long now) { - thing.updateComplete(now, false); + bookkeeper.updateComplete(now, false); } /** * @return The current metadata version */ public int version() { - return this.thing.version(); + return this.bookkeeper.version(); } /** * The last time metadata was updated. */ public long lastUpdate() { - return thing.lastUpdateAttempt(); + return bookkeeper.lastUpdateAttempt(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java new file mode 100644 index 0000000..df76127 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class MetadataBookkeeper { + + private final long metadataExpireMs; + private final long refreshBackoffMs; + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final AtomicLong lastRefreshTime = new AtomicLong(0L); + private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); + private AtomicBoolean updateRequested = new AtomicBoolean(false); + private final AtomicInteger version = new AtomicInteger(0); + + public MetadataBookkeeper(long metadataExpireMs, long refreshBackoffMs) { + this.metadataExpireMs = metadataExpireMs; + this.refreshBackoffMs = refreshBackoffMs; + } + + public boolean awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + + while (this.version.get() <= lastVersion) { + if (remainingWaitMs > 0) { + try { + lock.lock(); + condition.await(maxWaitMs, TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); + } + } else { + return false; + } + remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); + } + return true; + } + + public void updateComplete(long time, boolean success) { + lastRefreshTime.set(time); + if (success) { + updateRequested.lazySet(false); + lastSuccessfulUpdateTime.lazySet(time); + version.getAndIncrement(); + + try { + lock.lock(); + condition.signalAll(); + } finally { + lock.unlock(); + } + } + } + + public long timeToNextUpdate(long now) { + long lastRefreshTime = this.lastRefreshTime.get(); + long timeUntilNextUpdate = updateRequested.get() ? 0 : lastRefreshTime + metadataExpireMs - now; + long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; + + return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); + } + + public int requestUpdate() { + updateRequested.set(true); + return version.get(); + } + + public long lastUpdateAttempt() { + return lastRefreshTime.get(); + } + + public long lastSuccessfulUpdate() { + return lastSuccessfulUpdateTime.get(); + } + + public int version() { + return version.get(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java b/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java deleted file mode 100644 index 4a9c976..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataThing.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class MetadataThing { - - private final long metadataExpireMs; - private final long refreshBackoffMs; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private final AtomicLong lastRefreshTime = new AtomicLong(0L); - private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); - private boolean updateRequested = false; - private final AtomicInteger version = new AtomicInteger(0); - - public MetadataThing(long metadataExpireMs, long refreshBackoffMs) { - this.metadataExpireMs = metadataExpireMs; - this.refreshBackoffMs = refreshBackoffMs; - } - - public boolean awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { - long begin = System.currentTimeMillis(); - long remainingWaitMs = maxWaitMs; - - while (this.version.get() <= lastVersion) { - if (remainingWaitMs > 0) { - try { - lock.lock(); - condition.await(maxWaitMs, TimeUnit.MILLISECONDS); - } finally { - lock.unlock(); - } - } else { - return false; - } - remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); - } - return true; - } - - public void updateComplete(long time, boolean success) { - lastRefreshTime.set(time); - if (success) { - version.getAndIncrement(); - lastSuccessfulUpdateTime.lazySet(time); - - try { - lock.lock(); - condition.signalAll(); - } finally { - lock.unlock(); - } - } - } - - public long timeToNextUpdate(long now) { - long lastRefreshTime = this.lastRefreshTime.get(); - long timeUntilNextUpdate = updateRequested ? 0 : lastRefreshTime + metadataExpireMs - now; - long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; - - return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); - } - - public int requestUpdate() { - updateRequested = true; - return version.get(); - } - - public long lastUpdateAttempt() { - return lastRefreshTime.get(); - } - - public long lastSuccessfulUpdate() { - return lastSuccessfulUpdateTime.get(); - } - - public int version() { - return version.get(); - } -} -- 2.3.5 From 82c70ac0947586288f576bd6c1a498e56a277727 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 12 Apr 2015 11:04:49 -0500 Subject: [PATCH 19/24] move to finergrained locking --- .../apache/kafka/clients/MetadataBookkeeper.java | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java index df76127..eaa3e14 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java @@ -28,8 +28,8 @@ public class MetadataBookkeeper { private final Condition condition = lock.newCondition(); private final AtomicLong lastRefreshTime = new AtomicLong(0L); private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); - private AtomicBoolean updateRequested = new AtomicBoolean(false); - private final AtomicInteger version = new AtomicInteger(0); + private boolean updateRequested = false; + private int version = 0; public MetadataBookkeeper(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; @@ -40,7 +40,7 @@ public class MetadataBookkeeper { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - while (this.version.get() <= lastVersion) { + while (this.version <= lastVersion) { if (remainingWaitMs > 0) { try { lock.lock(); @@ -59,12 +59,12 @@ public class MetadataBookkeeper { public void updateComplete(long time, boolean success) { lastRefreshTime.set(time); if (success) { - updateRequested.lazySet(false); - lastSuccessfulUpdateTime.lazySet(time); - version.getAndIncrement(); try { lock.lock(); + updateRequested = false; + ++version; + lastSuccessfulUpdateTime.lazySet(time); condition.signalAll(); } finally { lock.unlock(); @@ -74,15 +74,20 @@ public class MetadataBookkeeper { public long timeToNextUpdate(long now) { long lastRefreshTime = this.lastRefreshTime.get(); - long timeUntilNextUpdate = updateRequested.get() ? 0 : lastRefreshTime + metadataExpireMs - now; + long timeUntilNextUpdate = updateRequested ? 0 : lastRefreshTime + metadataExpireMs - now; long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); } public int requestUpdate() { - updateRequested.set(true); - return version.get(); + try { + lock.lock(); + updateRequested = true; + return this.version; + } finally { + lock.unlock(); + } } public long lastUpdateAttempt() { @@ -94,6 +99,6 @@ public class MetadataBookkeeper { } public int version() { - return version.get(); + return version; } } -- 2.3.5 From 990f5e9003cfcb2fc56b358544300c372586d8cd Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 12 Apr 2015 11:13:29 -0500 Subject: [PATCH 20/24] Use locks in bookkeeper --- .../apache/kafka/clients/MetadataBookkeeper.java | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java index eaa3e14..78a10f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java @@ -13,8 +13,6 @@ package org.apache.kafka.clients; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -29,7 +27,7 @@ public class MetadataBookkeeper { private final AtomicLong lastRefreshTime = new AtomicLong(0L); private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); private boolean updateRequested = false; - private int version = 0; + private volatile int version = 0; public MetadataBookkeeper(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; @@ -40,19 +38,22 @@ public class MetadataBookkeeper { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - while (this.version <= lastVersion) { - if (remainingWaitMs > 0) { - try { - lock.lock(); + try { + lock.lock(); + + while (this.version <= lastVersion) { + if (remainingWaitMs > 0) { condition.await(maxWaitMs, TimeUnit.MILLISECONDS); - } finally { - lock.unlock(); + } else { + return false; } - } else { - return false; + remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); } - remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); + + } finally { + lock.unlock(); } + return true; } -- 2.3.5 From a5168a531c235ddd0e82c69e825394319bec443f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 12 Apr 2015 11:18:18 -0500 Subject: [PATCH 21/24] Only use atomic variabled --- .../org/apache/kafka/clients/MetadataBookkeeper.java | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java index 78a10f2..7dfb85c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java @@ -13,6 +13,8 @@ package org.apache.kafka.clients; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -26,8 +28,8 @@ public class MetadataBookkeeper { private final Condition condition = lock.newCondition(); private final AtomicLong lastRefreshTime = new AtomicLong(0L); private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); - private boolean updateRequested = false; - private volatile int version = 0; + private final AtomicBoolean updateRequested = new AtomicBoolean(false); + private final AtomicInteger version = new AtomicInteger(0); public MetadataBookkeeper(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; @@ -41,7 +43,7 @@ public class MetadataBookkeeper { try { lock.lock(); - while (this.version <= lastVersion) { + while (this.version.get() <= lastVersion) { if (remainingWaitMs > 0) { condition.await(maxWaitMs, TimeUnit.MILLISECONDS); } else { @@ -63,8 +65,8 @@ public class MetadataBookkeeper { try { lock.lock(); - updateRequested = false; - ++version; + updateRequested.lazySet(false); + version.lazySet(version.get() + 1); lastSuccessfulUpdateTime.lazySet(time); condition.signalAll(); } finally { @@ -75,7 +77,7 @@ public class MetadataBookkeeper { public long timeToNextUpdate(long now) { long lastRefreshTime = this.lastRefreshTime.get(); - long timeUntilNextUpdate = updateRequested ? 0 : lastRefreshTime + metadataExpireMs - now; + long timeUntilNextUpdate = updateRequested.get() ? 0 : lastRefreshTime + metadataExpireMs - now; long backoffTimeRemaining = lastRefreshTime + refreshBackoffMs - now; return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); @@ -84,8 +86,8 @@ public class MetadataBookkeeper { public int requestUpdate() { try { lock.lock(); - updateRequested = true; - return this.version; + updateRequested.lazySet(true); + return this.version.get(); } finally { lock.unlock(); } @@ -100,6 +102,6 @@ public class MetadataBookkeeper { } public int version() { - return version; + return version.get(); } } -- 2.3.5 From dd15016261f61bc50843159626ad9f8601bf5413 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 15 Apr 2015 19:09:43 -0500 Subject: [PATCH 22/24] use successful metadata in metrics --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index fa02779..dd2308b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -157,6 +157,6 @@ public final class Metadata { * The last time metadata was updated. */ public long lastUpdate() { - return bookkeeper.lastUpdateAttempt(); + return bookkeeper.lastSuccessfulUpdate(); } } -- 2.3.5 From d3ea0527dea8f40965e87466f3cfe6004f0b465c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 15 Apr 2015 19:19:39 -0500 Subject: [PATCH 23/24] Change these things back to trunk --- .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 4 +++- clients/src/test/java/org/apache/kafka/clients/MetadataTest.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 9b5d561..63595cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -420,7 +420,9 @@ public class KafkaProducer implements Producer { if (!this.metadata.containsTopic(topic)) this.metadata.add(topic); - if (metadata.fetch().partitionsForTopic(topic) == null) { + if (metadata.fetch().partitionsForTopic(topic) != null) { + return; + } else { long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; while (metadata.fetch().partitionsForTopic(topic) == null) { diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index f0bbb77..928087d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -36,7 +36,7 @@ public class MetadataTest { @Test public void testMetadata() throws Exception { - long time = 1; + long time = 0; metadata.update(Cluster.empty(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); metadata.requestUpdate(); -- 2.3.5 From 862e5ceb5baf9e2c931a42e394735f0ea420f226 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 26 Apr 2015 17:12:24 -0700 Subject: [PATCH 24/24] Address issues with patch --- .../java/org/apache/kafka/clients/Metadata.java | 12 ++- .../apache/kafka/clients/MetadataBookkeeper.java | 85 ++++++++++++++-------- .../kafka/clients/producer/internals/Sender.java | 2 +- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index dd2308b..ec1cd09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -73,23 +73,21 @@ public final class Metadata { } /** - * The next time to update the cluster info is the maximum of the time the current info will expire and the time the - * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time - * is now + * The amount of time until when the next metadata update should be performed. */ public long timeToNextUpdate(long nowMs) { return bookkeeper.timeToNextUpdate(nowMs); } /** - * Request an update of the current cluster metadata info, return the current version before the update + * Request an update of the current cluster metadata info, return the current version before the update. */ public int requestUpdate() { return bookkeeper.requestUpdate(); } /** - * Wait for metadata update until the current version is larger than the last version we know of + * Wait for the next metadata update. */ public void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { @@ -154,9 +152,9 @@ public final class Metadata { } /** - * The last time metadata was updated. + * The last time metadata was updated successfully. */ - public long lastUpdate() { + public long lastSuccessfulUpdate() { return bookkeeper.lastSuccessfulUpdate(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java index 7dfb85c..3a9b61b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java @@ -3,78 +3,87 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - *

+ *

* http://www.apache.org/licenses/LICENSE-2.0 - *

+ *

* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +/** + * A class tracking internal information about the Metadata class. This class tracks the current Metadata version, + * when the last update occurred, when the last update was attempted, and when the next update should be attempted. + *

+ * Version updates and requests for updates must be synchronized. + *

+ */ public class MetadataBookkeeper { private final long metadataExpireMs; private final long refreshBackoffMs; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); private final AtomicLong lastRefreshTime = new AtomicLong(0L); - private final AtomicLong lastSuccessfulUpdateTime = new AtomicLong(0L); private final AtomicBoolean updateRequested = new AtomicBoolean(false); - private final AtomicInteger version = new AtomicInteger(0); + private final AtomicInteger version = new AtomicInteger(0); + private volatile long lastSuccessfulUpdateTime = 0L; public MetadataBookkeeper(long metadataExpireMs, long refreshBackoffMs) { this.metadataExpireMs = metadataExpireMs; this.refreshBackoffMs = refreshBackoffMs; } + /** + * Wait for metadata update until the current version is larger than the last version we know of. + */ public boolean awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - try { - lock.lock(); - + // Must be synchronized to ensure that if the version number is not newer than the lastVersion, the thread + // is waiting to receive a notify when a call to updateComplete occurs. + synchronized (this) { while (this.version.get() <= lastVersion) { if (remainingWaitMs > 0) { - condition.await(maxWaitMs, TimeUnit.MILLISECONDS); + wait(maxWaitMs); } else { return false; } remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); } - } finally { - lock.unlock(); } - return true; } + /** + * Update bookkeeping information to reflect that a metadata update has been attempted. If the update was + * successful, increment the version number and erase requested updates. + */ public void updateComplete(long time, boolean success) { lastRefreshTime.set(time); if (success) { - - try { - lock.lock(); - updateRequested.lazySet(false); - version.lazySet(version.get() + 1); - lastSuccessfulUpdateTime.lazySet(time); - condition.signalAll(); - } finally { - lock.unlock(); + // Must be synchronized to avoid scenario where update is immediately requested, but the old version + // number is returned by the requestUpdate method. This would lead the thread to not wait in awaitUpdate + // and get stale information because it believes that an update has already been completed. + synchronized (this) { + updateRequested.set(false); + version.getAndIncrement(); + lastSuccessfulUpdateTime = time; + notifyAll(); } } } + /** + * The next time to update the cluster info is the maximum of the time the current info will expire and the time the + * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time + * is now + */ public long timeToNextUpdate(long now) { long lastRefreshTime = this.lastRefreshTime.get(); long timeUntilNextUpdate = updateRequested.get() ? 0 : lastRefreshTime + metadataExpireMs - now; @@ -83,24 +92,36 @@ public class MetadataBookkeeper { return Math.max(Math.max(timeUntilNextUpdate, backoffTimeRemaining), 0); } + /** + * Request an update of the metadata, return the current version + */ public int requestUpdate() { - try { - lock.lock(); - updateRequested.lazySet(true); + // Must be synchronized to avoid a scenario where a requested update is immediately erased due to an update + // being completed. If the requested update is erased, the requesting thread will have to wait longer than + // expected for the version number to be bumped. + synchronized (this) { + updateRequested.set(true); return this.version.get(); - } finally { - lock.unlock(); } } + /** + * The last time a metadata update was attempted. + */ public long lastUpdateAttempt() { return lastRefreshTime.get(); } + /** + * The last time metadata was updated successfully. + */ public long lastSuccessfulUpdate() { - return lastSuccessfulUpdateTime.get(); + return lastSuccessfulUpdateTime; } + /** + * @return The current metadata version + */ public int version() { return version.get(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b2db91c..01dd9ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -395,7 +395,7 @@ public class Sender implements Runnable { m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags); metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { - return (now - metadata.lastUpdate()) / 1000.0; + return (now - metadata.lastSuccessfulUpdate()) / 1000.0; } }); } -- 2.3.5