From 17e27e14c65fe49eb97faff25bd9549f2b38fc3a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 3 Apr 2015 13:36:40 -0500 Subject: [PATCH 01/10] Make metadata topics backed by concurrenthashmap --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 07f1cdb..2c356e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -14,6 +14,8 @@ package org.apache.kafka.clients; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; @@ -34,11 +36,11 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final ConcurrentMap topics; private int version; private long lastRefreshMs; private Cluster cluster; private boolean needUpdate; - private final Set topics; /** * Create a metadata instance with reasonable defaults @@ -60,7 +62,7 @@ public final class Metadata { this.version = 0; this.cluster = Cluster.empty(); this.needUpdate = false; - this.topics = new HashSet(); + this.topics = new ConcurrentHashMap(); } /** @@ -74,7 +76,7 @@ public final class Metadata { * Add the topic to maintain in the metadata */ public synchronized void add(String topic) { - topics.add(topic); + topics.put(topic, Boolean.TRUE); } /** @@ -120,7 +122,7 @@ public final class Metadata { */ public synchronized void addTopics(String... topics) { for (String topic : topics) - this.topics.add(topic); + this.topics.put(topic, Boolean.TRUE); requestUpdate(); } @@ -128,7 +130,7 @@ public final class Metadata { * Get the list of topics we are currently maintaining metadata for */ public synchronized Set topics() { - return new HashSet(this.topics); + return new HashSet(this.topics.keySet()); } /** @@ -137,7 +139,7 @@ public final class Metadata { * @return true if the topic exists, false otherwise */ public synchronized boolean containsTopic(String topic) { - return this.topics.contains(topic); + return this.topics.containsKey(topic); } /** -- 2.3.5 From 6d066777d3098bbc3406ae75c58d5f6712d18e82 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 3 Apr 2015 14:53:37 -0500 Subject: [PATCH 02/10] Method does not need to be synchronized --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 2c356e8..fee9194 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -75,7 +75,7 @@ public final class Metadata { /** * Add the topic to maintain in the metadata */ - public synchronized void add(String topic) { + public void add(String topic) { topics.put(topic, Boolean.TRUE); } -- 2.3.5 From de1624a2d7811580ed89a789d8fa67dabfeff568 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 14:31:35 -0500 Subject: [PATCH 03/10] Do not synchronize contains topic method --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index fee9194..c297f7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -138,7 +138,7 @@ public final class Metadata { * @param topic topic to check * @return true if the topic exists, false otherwise */ - public synchronized boolean containsTopic(String topic) { + public boolean containsTopic(String topic) { return this.topics.containsKey(topic); } -- 2.3.5 From d0d2c79626f4f85342dfaafce7c0f583f0419dfa Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 15:44:13 -0500 Subject: [PATCH 04/10] Continue removing the need to synchronize the metadata object --- .../java/org/apache/kafka/clients/Metadata.java | 44 ++++++++++++---------- .../kafka/clients/producer/KafkaProducer.java | 32 ++++++++-------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c297f7e..b11dd5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -16,6 +16,9 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; @@ -37,10 +40,10 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; private final ConcurrentMap topics; + private final AtomicReference cluster; + private final AtomicLong lastRefreshMs; + private volatile boolean needUpdate; private int version; - private long lastRefreshMs; - private Cluster cluster; - private boolean needUpdate; /** * Create a metadata instance with reasonable defaults @@ -58,9 +61,9 @@ public final class Metadata { public Metadata(long refreshBackoffMs, long metadataExpireMs) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.lastRefreshMs = 0L; + this.lastRefreshMs = new AtomicLong(0L); this.version = 0; - this.cluster = Cluster.empty(); + this.cluster = new AtomicReference(Cluster.empty()); this.needUpdate = false; this.topics = new ConcurrentHashMap(); } @@ -68,8 +71,8 @@ public final class Metadata { /** * Get the current cluster info without blocking */ - public synchronized Cluster fetch() { - return this.cluster; + public Cluster fetch() { + return this.cluster.get(); } /** @@ -84,16 +87,17 @@ public final class Metadata { * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time * is now */ - public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); - long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; + public long timeToNextUpdate(long nowMs) { + long lastRefreshMillis = this.lastRefreshMs.get(); + long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } /** * Request an update of the current cluster metadata info, return the current version before the update */ - public synchronized int requestUpdate() { + public int requestUpdate() { this.needUpdate = true; return this.version; } @@ -120,7 +124,7 @@ public final class Metadata { /** * Add one or more topics to maintain metadata for */ - public synchronized void addTopics(String... topics) { + public void addTopics(String... topics) { for (String topic : topics) this.topics.put(topic, Boolean.TRUE); requestUpdate(); @@ -129,7 +133,7 @@ public final class Metadata { /** * Get the list of topics we are currently maintaining metadata for */ - public synchronized Set topics() { + public Set topics() { return new HashSet(this.topics.keySet()); } @@ -147,9 +151,9 @@ public final class Metadata { */ public synchronized void update(Cluster cluster, long now) { this.needUpdate = false; - this.lastRefreshMs = now; + this.lastRefreshMs.set(now); this.version += 1; - this.cluster = cluster; + this.cluster.set(cluster); notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } @@ -158,22 +162,22 @@ public final class Metadata { * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. */ - public synchronized void failedUpdate(long now) { - this.lastRefreshMs = now; + public void failedUpdate(long now) { + this.lastRefreshMs.set(now); } /** * @return The current metadata version */ - public synchronized int version() { + public int version() { return this.version; } /** * The last time metadata was updated. */ - public synchronized long lastUpdate() { - return this.lastRefreshMs; + public long lastUpdate() { + return this.lastRefreshMs.get(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b91e2c5..f3217a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -72,11 +72,11 @@ import org.slf4j.LoggerFactory; * props.put("buffer.memory", 33554432); * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - * + * * Producer producer = new KafkaProducer(props); * for(int i = 0; i < 100; i++) * producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i))); - * + * * producer.close(); * } *

@@ -301,7 +301,7 @@ public class KafkaProducer implements Producer { * or throw any exception that occurred while sending the record. *

* If you want to simulate a simple blocking call you can call the get() method immediately: - * + * *

      * {@code
      * byte[] key = "key".getBytes();
@@ -312,7 +312,7 @@ public class KafkaProducer implements Producer {
      * 

* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. - * + * *

      * {@code
      * ProducerRecord record = new ProducerRecord("the-topic", key, value);
@@ -326,10 +326,10 @@ public class KafkaProducer implements Producer {
      *               });
      * }
      * 
- * + * * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the * following example callback1 is guaranteed to execute before callback2: - * + * *
      * {@code
      * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
@@ -341,15 +341,15 @@ public class KafkaProducer implements Producer {
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
-     *        
+     *
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
      * @throws BufferExhaustedException If block.on.buffer.full=false and the buffer is full.
-     * 
+     *
      */
     @Override
     public Future send(ProducerRecord record, Callback callback) {
@@ -411,9 +411,7 @@ public class KafkaProducer implements Producer {
         if (!this.metadata.containsTopic(topic))
             this.metadata.add(topic);
 
-        if (metadata.fetch().partitionsForTopic(topic) != null) {
-            return;
-        } else {
+        if (metadata.fetch().partitionsForTopic(topic) == null) {
             long begin = time.milliseconds();
             long remainingWaitMs = maxWaitMs;
             while (metadata.fetch().partitionsForTopic(topic) == null) {
@@ -444,7 +442,7 @@ public class KafkaProducer implements Producer {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
-    
+
     /**
      * Invoking this method makes all buffered records immediately available to send (even if linger.ms is 
      * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
@@ -467,10 +465,10 @@ public class KafkaProducer implements Producer {
      * consumer.commit();
      * }
      * 
- * + * * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur * we need to set retries=<large_number> in our config. - * + * * @throws InterruptException If the thread is interrupted while blocked */ @Override -- 2.3.5 From 67f1dab7214feeeed6cfded5600f8f80879e7cb1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sat, 4 Apr 2015 20:45:06 -0500 Subject: [PATCH 05/10] Store both last refresh and need to refresh in same variable --- .../java/org/apache/kafka/clients/Metadata.java | 36 +++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index b11dd5a..7d68c92 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -12,19 +12,18 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.errors.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A class encapsulating some of the logic around metadata. *

@@ -42,7 +41,6 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private volatile boolean needUpdate; private int version; /** @@ -64,7 +62,6 @@ public final class Metadata { this.lastRefreshMs = new AtomicLong(0L); this.version = 0; this.cluster = new AtomicReference(Cluster.empty()); - this.needUpdate = false; this.topics = new ConcurrentHashMap(); } @@ -89,7 +86,7 @@ public final class Metadata { */ public long timeToNextUpdate(long nowMs) { long lastRefreshMillis = this.lastRefreshMs.get(); - long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); + long timeToExpire = Math.max(lastRefreshMillis + this.metadataExpireMs - nowMs, 0); long timeToAllowUpdate = lastRefreshMillis + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -98,14 +95,21 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - this.needUpdate = true; + for (;;) { + long current = this.lastRefreshMs.get(); + if (current >= 0) { + if (lastRefreshMs.compareAndSet(current, -current)) { + break; + } + } + } return this.version; } /** * Wait for metadata update until the current version is larger than the last version we know of */ - public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { + public void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } @@ -149,8 +153,7 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long now) { - this.needUpdate = false; + public void update(Cluster cluster, long now) { this.lastRefreshMs.set(now); this.version += 1; this.cluster.set(cluster); @@ -177,7 +180,12 @@ public final class Metadata { * The last time metadata was updated. */ public long lastUpdate() { - return this.lastRefreshMs.get(); + long current = this.lastRefreshMs.get(); + if (current < 0) { + return -current; + } else { + return current; + } } /** -- 2.3.5 From b44680d3faf74c523e50945d8e74e2cf2149d6c2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 5 Apr 2015 00:22:04 -0500 Subject: [PATCH 06/10] Fix synchronize issue --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 7d68c92..510d0ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -38,6 +38,7 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final Object lock = new Object(); private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; @@ -117,7 +118,9 @@ public final class Metadata { long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { if (remainingWaitMs != 0) - wait(remainingWaitMs); + synchronized (lock) { + lock.wait(remainingWaitMs); + } long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); @@ -157,7 +160,9 @@ public final class Metadata { this.lastRefreshMs.set(now); this.version += 1; this.cluster.set(cluster); - notifyAll(); + synchronized (lock) { + lock.notifyAll(); + } log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } -- 2.3.5 From d29d1cef50a64d1fcae2de35ff0440630a7b38e6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Apr 2015 19:17:10 -0500 Subject: [PATCH 07/10] Version needs to be volatile --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 510d0ea..444baff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -42,7 +42,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private int version; + private volatile int version; /** * Create a metadata instance with reasonable defaults -- 2.3.5 From 9415dcd7af8956fa4f91382c9adeee66524cbd4d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Apr 2015 20:01:19 -0500 Subject: [PATCH 08/10] rework how signally happens --- .../java/org/apache/kafka/clients/Metadata.java | 47 ++++++++++++++-------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 444baff..3af3001 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -21,14 +21,18 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * A class encapsulating some of the logic around metadata. *

* This class is shared by the client thread (for partitioning) and the background sender thread. - * + * * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a * topic we don't have any metadata for it will trigger a metadata update. */ @@ -38,7 +42,8 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; - private final Object lock = new Object(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; @@ -96,12 +101,14 @@ public final class Metadata { * Request an update of the current cluster metadata info, return the current version before the update */ public int requestUpdate() { - for (;;) { + for (; ; ) { long current = this.lastRefreshMs.get(); - if (current >= 0) { + if (current > 0) { if (lastRefreshMs.compareAndSet(current, -current)) { break; } + } else { + break; } } return this.version; @@ -117,14 +124,17 @@ public final class Metadata { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - if (remainingWaitMs != 0) - synchronized (lock) { - lock.wait(remainingWaitMs); + if (remainingWaitMs > 0) { + try { + lock.lock(); + condition.await(remainingWaitMs, TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); } - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) + } else { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; + } + remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin); } } @@ -158,14 +168,17 @@ public final class Metadata { */ public void update(Cluster cluster, long now) { this.lastRefreshMs.set(now); - this.version += 1; this.cluster.set(cluster); - synchronized (lock) { - lock.notifyAll(); + this.version += 1; + try { + lock.lock(); + condition.signalAll(); + } finally { + lock.unlock(); } log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } - + /** * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. @@ -173,7 +186,7 @@ public final class Metadata { public void failedUpdate(long now) { this.lastRefreshMs.set(now); } - + /** * @return The current metadata version */ -- 2.3.5 From afccceca9e05564ed856d08021edbfe701fd946f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 7 Apr 2015 15:12:25 -0500 Subject: [PATCH 09/10] remove unnecessary creation of new set --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 3af3001..162bd6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -17,7 +17,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -151,7 +150,7 @@ public final class Metadata { * Get the list of topics we are currently maintaining metadata for */ public Set topics() { - return new HashSet(this.topics.keySet()); + return this.topics.keySet(); } /** -- 2.3.5 From 9f018ee2c01705969e892541cc9aff68d1478172 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 7 Apr 2015 16:04:08 -0500 Subject: [PATCH 10/10] initialize 0 at the field level --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 162bd6d..f528c2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -46,7 +46,7 @@ public final class Metadata { private final ConcurrentMap topics; private final AtomicReference cluster; private final AtomicLong lastRefreshMs; - private volatile int version; + private volatile int version = 0; /** * Create a metadata instance with reasonable defaults @@ -65,7 +65,6 @@ public final class Metadata { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = new AtomicLong(0L); - this.version = 0; this.cluster = new AtomicReference(Cluster.empty()); this.topics = new ConcurrentHashMap(); } -- 2.3.5