From 4a66a7748b504db18a95a0ec6abd0ae927a282d8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 31 May 2015 11:34:42 -0500 Subject: [PATCH] Create distinction between successful update and unsuccessfull update for metrics and backoff --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 11 +++++++---- .../apache/kafka/clients/producer/internals/Sender.java | 2 +- .../test/java/org/apache/kafka/clients/MetadataTest.java | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 07f1cdb..0387f26 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -36,6 +36,7 @@ public final class Metadata { private final long metadataExpireMs; private int version; private long lastRefreshMs; + private long lastSuccessfulRefreshMs; private Cluster cluster; private boolean needUpdate; private final Set topics; @@ -57,6 +58,7 @@ public final class Metadata { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = 0L; + this.lastSuccessfulRefreshMs = 0L; this.version = 0; this.cluster = Cluster.empty(); this.needUpdate = false; @@ -83,7 +85,7 @@ public final class Metadata { * is now */ public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); + long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0); long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -146,6 +148,7 @@ public final class Metadata { public synchronized void update(Cluster cluster, long now) { this.needUpdate = false; this.lastRefreshMs = now; + this.lastSuccessfulRefreshMs = now; this.version += 1; this.cluster = cluster; notifyAll(); @@ -168,10 +171,10 @@ public final class Metadata { } /** - * The last time metadata was updated. + * The last time metadata was successfully updated. */ - public synchronized long lastUpdate() { - return this.lastRefreshMs; + public synchronized long lastSuccessfulUpdate() { + return this.lastSuccessfulRefreshMs; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1e943d6..b9fbc90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -410,7 +410,7 @@ public class Sender implements Runnable { m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags); metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { - return (now - metadata.lastUpdate()) / 1000.0; + return (now - metadata.lastSuccessfulUpdate()) / 1000.0; } }); } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 928087d..249d6b8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -85,6 +85,20 @@ public class MetadataTest { } } + @Test + public void testFailedUpdate() { + long time = 100; + metadata.update(Cluster.empty(), time); + + assertEquals(100, metadata.timeToNextUpdate(1000)); + metadata.failedUpdate(1100); + + assertEquals(100, metadata.timeToNextUpdate(1100)); + assertEquals(100, metadata.lastSuccessfulUpdate()); + + } + + private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { -- 2.4.0