From 87c91e068dc6112ada98282d1918620f8ea94d97 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 11 May 2015 14:50:19 -0700 Subject: [PATCH 1/5] Add throttle_time_ms to V1 fetch and produce responses and expose client-side throttle-time metrics --- .../kafka/clients/consumer/internals/Fetcher.java | 15 ++++++- .../kafka/clients/producer/internals/Sender.java | 15 +++++++ .../org/apache/kafka/common/protocol/Protocol.java | 34 +++++++++++++--- .../apache/kafka/common/requests/FetchRequest.java | 2 +- .../kafka/common/requests/FetchResponse.java | 8 +++- .../kafka/common/requests/ProduceRequest.java | 4 +- .../kafka/common/requests/ProduceResponse.java | 8 +++- .../clients/consumer/internals/FetcherTest.java | 46 ++++++++++++++++++---- .../clients/producer/internals/SenderTest.java | 37 +++++++++++++++-- .../kafka/common/requests/RequestResponseTest.java | 4 +- core/src/main/scala/kafka/api/FetchRequest.scala | 9 ++++- core/src/main/scala/kafka/api/FetchResponse.scala | 26 +++++++++--- .../src/main/scala/kafka/api/ProducerRequest.scala | 2 +- .../main/scala/kafka/api/ProducerResponse.scala | 17 ++++++-- .../main/scala/kafka/consumer/SimpleConsumer.scala | 2 +- .../scala/kafka/server/AbstractFetcherThread.scala | 16 ++++++-- .../src/main/scala/kafka/server/DelayedFetch.scala | 5 ++- .../main/scala/kafka/server/DelayedProduce.scala | 5 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++-- .../main/scala/kafka/server/OffsetManager.scala | 2 +- .../scala/kafka/server/ReplicaFetcherThread.scala | 4 +- .../main/scala/kafka/server/ReplicaManager.scala | 13 +++--- .../api/RequestResponseSerializationTest.scala | 40 ++++++++++++++++++- .../unit/kafka/server/DelayedOperationTest.scala | 14 ++++++- .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- 25 files changed, 277 insertions(+), 61 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 56281ee..68479df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -63,7 +63,7 @@ public class Fetcher { private static final Logger log = LoggerFactory.getLogger(Fetcher.class); private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; private static final long LATEST_OFFSET_TIMESTAMP = -1L; - + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; private final KafkaClient client; @@ -338,6 +338,7 @@ public class Fetcher { this.sensors.recordsFetched.record(totalCount); } this.sensors.fetchLatency.record(resp.requestLatencyMs()); + this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); } /** @@ -391,6 +392,7 @@ public class Fetcher { public final Sensor recordsFetched; public final Sensor fetchLatency; public final Sensor recordsFetchLag; + public final Sensor quotaDelayTimeSensor; public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { @@ -440,6 +442,17 @@ public class Fetcher { this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window", tags), new Max()); + + this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time"); + this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg", + this.metricGrpName, + "The average throttle time in ms", + tags), new Avg()); + + this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max", + this.metricGrpName, + "The maximum throttle time in ms", + tags), new Max()); } public void
recordTopicFetchMetrics(String topic, int bytes, int records) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 07e65d4..d4f751f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -253,6 +254,8 @@ public class Sender implements Runnable { completeBatch(batch, error, partResp.baseOffset, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + this.sensors.recordQuotaDelay(response.request().request().destination(), + response.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -352,6 +355,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; + public final Sensor quotaDelayTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -381,6 +385,12 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); + this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time"); + m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Avg()); + m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Max()); + this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); this.recordsPerRequestSensor.add(m, new Rate()); @@ -515,6 +525,11 @@ public class Sender implements Runnable { nodeRequestTime.record(latency, now); } } + + public void recordQuotaDelay(int node, long delayTimeMs) { + this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds()); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 3dc8b01..461388e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -107,9 +107,23 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - - public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0}; - public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0}; + // The V1 Fetch Request body is the same as V0. 
+ // Only the version number is incremented to indicate a newer client + public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; + + public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(new Schema(new Field("partition", + INT32), + new Field("error_code", + INT16), + new Field("base_offset", + INT64))))))), + new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0)); + + public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1}; + public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1}; /* Offset commit api */ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -342,6 +356,9 @@ public class Protocol { new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); + // The V1 Fetch Request body is the same as V0. + // Only the version number is incremented to indicate a newer client + public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0; public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -357,9 +374,14 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - - public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0}; - public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0}; + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32, + "Amount of time in milliseconds the request was throttled if at all", + 0), + new Field("responses", + new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + + public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1}; + public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; /* Consumer metadata api */ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 8686d83..aaec66d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -130,7 +130,7 @@ public class FetchRequest extends AbstractRequest { responseData.put(entry.getKey(), partitionResponse); } - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } public int replicaId() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index eb8951f..005ec08 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -37,6 +37,7 @@ public class FetchResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ 
-59,6 +60,7 @@ public class FetchResponse extends AbstractRequestResponse { public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0); private final Map responseData; + private final int delayTime; public static final class PartitionData { public final short errorCode; @@ -72,8 +74,9 @@ public class FetchResponse extends AbstractRequestResponse { } } - public FetchResponse(Map responseData) { + public FetchResponse(Map responseData, int delayTime) { super(new Struct(CURRENT_SCHEMA)); + this.delayTime = delayTime; Map> topicsData = CollectionUtils.groupDataByTopic(responseData); List topicArray = new ArrayList(); @@ -94,11 +97,13 @@ public class FetchResponse extends AbstractRequestResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responseData = responseData; } public FetchResponse(Struct struct) { super(struct); + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); responseData = new HashMap(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; @@ -118,6 +123,7 @@ public class FetchResponse extends AbstractRequestResponse { public Map responseData() { return responseData; } + public int getDelayTime() { return this.delayTime; } public static FetchResponse parse(ByteBuffer buffer) { return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index fabeae3..ea69c6f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -27,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ProduceRequest extends AbstractRequest { +public class ProduceRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); private static final String ACKS_KEY_NAME = "acks"; @@ -102,7 +102,7 @@ public class ProduceRequest extends AbstractRequest { responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET)); } - return new ProduceResponse(responseMap); + return new ProduceResponse(responseMap, 0); } public short acks() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 37ec0b7..46fd87d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -33,6 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -49,8 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private final Map responses; + private final int delayTime; - public ProduceResponse(Map responses) { + public ProduceResponse(Map responses, int delayTime) { 
super(new Struct(CURRENT_SCHEMA)); Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); List topicDatas = new ArrayList(responseByTopic.size()); @@ -70,7 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse { topicDatas.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responses = responses; + this.delayTime = delayTime; } public ProduceResponse(Struct struct) { @@ -88,11 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse { responses.put(tp, new PartitionResponse(errorCode, offset)); } } + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); } public Map responses() { return this.responses; } + public int getDelayTime() { return this.delayTime; } public static final class PartitionResponse { public short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4195410..faefe0c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -22,8 +22,10 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -102,7 +104,7 @@ public class FetcherTest { // normal fetch fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); client.poll(0, time.milliseconds()); records = fetcher.fetchedRecords().get(tp); assertEquals(3, records.size()); @@ -124,14 +126,14 @@ public class FetcherTest { // fetch with not leader fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); // fetch with unknown topic partition fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -139,7 +141,7 @@ public class FetcherTest { // fetch with out of range subscriptions.fetched(tp, 5); fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); 
client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); @@ -156,7 +158,7 @@ public class FetcherTest { // fetch with out of range fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); @@ -164,8 +166,37 @@ public class FetcherTest { assertEquals(0L, (long) subscriptions.consumed(tp)); } - private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { - FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer))); + /* + * Send multiple requests. Verify that the client side quota metrics have the right values + */ + @Test + public void testQuotaMetrics() throws Exception { + subscriptions.subscribe(tp); + subscriptions.fetched(tp, 0); + subscriptions.consumed(tp, 0); + for (int i = 1; i <= 3; i++) { + // normal fetch + fetcher.initFetches(cluster, time.milliseconds()); + client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); + client.poll(0, time.milliseconds()); + } + + Map allMetrics = metrics.metrics(); + long avg = 0; + long max = 0; + for (MetricName m : allMetrics.keySet()) { + if (m.name().equals("throttle-time-avg")) { + avg = (long) allMetrics.get(m).value(); + } else if (m.name().equals("throttle-time-max")) { + max = (long) allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + + private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int delayTime) { + FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), delayTime); return response.toStruct(); } @@ -173,5 +204,4 @@ public class FetcherTest { ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets))); return response.toStruct(); } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..2cc9e50 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,7 +26,9 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -76,7 +78,7 @@ public class SenderTest { sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertEquals("All
requests completed.", offset, (long) client.inFlightRequestCount()); sender.run(time.milliseconds()); @@ -84,6 +86,33 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + /* + * Send multiple request. Verify that the client side quota metrics have the right values + */ + @Test + public void testQuotaMetrics() throws Exception { + final long offset = 0; + for (int i = 1; i <= 3; i++) { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); + sender.run(time.milliseconds()); + } + long avg = 0; + long max = 0; + + Map allMetrics = metrics.metrics(); + for (MetricName m : allMetrics.keySet()) { + if (m.name().equals("throttle-time-avg")) { + avg = (long) allMetrics.get(m).value(); + } else if (m.name().equals("throttle-time-max")) { + max = (long) allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + @Test public void testRetries() throws Exception { // create a sender with retries = 1 @@ -110,7 +139,7 @@ public class SenderTest { sender.run(time.milliseconds()); // resend assertEquals(1, client.inFlightRequestCount()); long offset = 0; - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertTrue("Request should have retried and completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -138,10 +167,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error) { + private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset); Map partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp); + ProduceResponse response = new ProduceResponse(partResp, delayTimeMs); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e3cc196..7b1c9ea 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -103,7 +103,7 @@ public class RequestResponseTest { private AbstractRequestResponse createFetchResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } private AbstractRequest createHeartBeatRequest() { @@ -179,6 +179,6 @@ public class RequestResponseTest { private AbstractRequestResponse createProduceResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); - return new ProduceResponse(responseData); + return new ProduceResponse(responseData, 0); } } diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 5b38f85..36e288f 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -31,7 +31,7 @@ import 
scala.collection.immutable.Map case class PartitionFetchInfo(offset: Long, fetchSize: Int) object FetchRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val DefaultMaxWait = 0 val DefaultMinBytes = 0 val DefaultCorrelationId = 0 @@ -170,7 +170,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, @nonthreadsafe class FetchRequestBuilder() { private val correlationId = new AtomicInteger(0) - private val versionId = FetchRequest.CurrentVersion + private var versionId = FetchRequest.CurrentVersion private var clientId = ConsumerConfig.DefaultClientId private var replicaId = Request.OrdinaryConsumerId private var maxWait = FetchRequest.DefaultMaxWait @@ -205,6 +205,11 @@ class FetchRequestBuilder() { this } + def requestVersion(versionId: Short): FetchRequestBuilder = { + this.versionId = versionId + this + } + def build() = { val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) requestMap.clear() diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 0b6b33a..7881a65 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -147,8 +147,12 @@ object FetchResponse { 4 + /* correlationId */ 4 /* topic count */ - def readFrom(buffer: ByteBuffer): FetchResponse = { + + // The request version is used to determine which fields we can expect in the response + def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = { val correlationId = buffer.getInt + val delayTime = if (requestVersion > 0) buffer.getInt else 0 + val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topicData = TopicData.readFrom(buffer) @@ -157,20 +161,23 @@ object FetchResponse { (TopicAndPartition(topicData.topic, partitionId), partitionData) } }) - FetchResponse(correlationId, Map(pairs:_*)) + FetchResponse(correlationId, Map(pairs:_*), requestVersion, delayTime) } } -case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) +case class FetchResponse(correlationId: Int, + data: Map[TopicAndPartition, FetchResponsePartitionData], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** * Partitions the data into a map of maps (one for each topic). 
*/ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - + val delayTimeSize = if(requestVersion > 0) 4 else 0 val sizeInBytes = - FetchResponse.headerSize + + FetchResponse.headerSize + delayTimeSize + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { val topicData = TopicData(curr._1, curr._2.map { case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) @@ -220,10 +227,17 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte override def destination = dest - private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) + // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request + private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize) buffer.putInt(payloadSize) buffer.putInt(fetchResponse.correlationId) + // Include the delayTime only if the client can read it + if(fetchResponse.requestVersion > 0) { + buffer.putInt(fetchResponse.delayTime) + } + buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count + buffer.rewind() private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index c866180..7fb143e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -26,7 +26,7 @@ import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response object ProducerRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 5d1fac4..0f40a65 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -37,13 +37,17 @@ object ProducerResponse { }) }) - ProducerResponse(correlationId, Map(statusPairs:_*)) + val delayTime = buffer.getInt + ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime) } } case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) +case class ProducerResponse(correlationId: Int, + status: Map[TopicAndPartition, ProducerResponseStatus], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** @@ -54,6 +58,7 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P def hasError = status.values.exists(_.error != ErrorMapping.NoError) val sizeInBytes = { + val delayTimeSize = if(requestVersion > 0) 4 else 0 val groupedStatus = statusGroupedByTopic 4 + /* correlation id */ 4 + /* topic count */ @@ -66,7 +71,9 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P 2 + /* error code */ 8 /* offset */ } - }) + }) + + delayTimeSize + } def writeTo(buffer: ByteBuffer) { @@ -85,6 +92,10 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P buffer.putLong(nextOffset) } }) + // Delay time is only supported on V1 style requests + if(requestVersion > 0) { + buffer.putInt(delayTime) + } } override def describe(details: Boolean):String = { toString } diff 
--git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index c16f7ed..d851e8d 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -118,7 +118,7 @@ class SimpleConsumer(val host: String, response = sendRequest(request) } } - val fetchResponse = FetchResponse.readFrom(response.payload()) + val fetchResponse = FetchResponse.readFrom(response.payload(), request.versionId) val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 83fc474..c9547bf 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -37,8 +37,17 @@ import com.yammer.metrics.core.Gauge /** * Abstract class for fetching data from multiple partitions from the same broker. */ -abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0, +abstract class AbstractFetcherThread(name: String, + clientId: String, + sourceBroker: BrokerEndPoint, + socketTimeout: Int, + socketBufferSize: Int, + fetchSize: Int, + fetcherBrokerId: Int = -1, + maxWait: Int = 0, + minBytes: Int = 1, + fetchBackOffMs: Int = 0, + fetchRequestVersion: Short = FetchRequest.CurrentVersion, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map @@ -52,7 +61,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke clientId(clientId). replicaId(fetcherBrokerId). maxWait(maxWait). - minBytes(minBytes) + minBytes(minBytes). 
+ requestVersion(fetchRequestVersion) /* callbacks to be defined in subclass */ diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index de6cf5b..8e5cb39 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -55,7 +55,7 @@ case class FetchMetadata(fetchMinBytes: Int, class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) + responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) extends DelayedOperation(delayMs) { /** @@ -131,7 +131,8 @@ class DelayedFetch(delayMs: Long, val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - responseCallback(fetchPartitionData) + // Zero delay time until quotas are enforced + responseCallback(fetchPartitionData, 0) } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 05078b2..7a9c30b 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -53,7 +53,7 @@ case class ProduceMetadata(produceRequiredAcks: Short, class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) + responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) extends DelayedOperation(delayMs) { // first update the acks pending variable according to the error code @@ -126,7 +126,8 @@ class DelayedProduce(delayMs: Long, */ override def onComplete() { val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) - responseCallback(responseStatus) + // Zero delay time until quotas are enforced + responseCallback(responseStatus, 0) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d63bc18..5ecfc1a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -232,7 +232,7 @@ class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) { var errorInResponse = false responseStatus.foreach { case (topicAndPartition, status) => // we only print warnings for known errors here; if it is unknown, it will cause @@ -257,7 +257,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.noOperation(request.processor, request) } } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) + val response = ProducerResponse(produceRequest.correlationId, responseStatus, produceRequest.versionId, delayTime) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } } @@ -287,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel, val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] // the callback for sending a fetch response - def 
sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { + def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData], delayTime : Int) { responsePartitionData.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here @@ -302,7 +302,7 @@ class KafkaApis(val requestChannel: RequestChannel, BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } - val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) + val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTime) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5cca85c..465d72b 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -246,7 +246,7 @@ class OffsetManager(val config: OffsetManagerConfig, new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) { // the append response should only contain the topics partition if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b31b432..f6f9378 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,7 +21,7 @@ import kafka.admin.AdminUtils import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet -import kafka.api.{OffsetRequest, FetchResponsePartitionData} +import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, @@ -38,6 +38,8 @@ class ReplicaFetcherThread(name:String, maxWait = brokerConfig.replicaFetchWaitMaxMs, minBytes = brokerConfig.replicaFetchMinBytes, fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, + fetchRequestVersion = + if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0, isInterruptible = false) { // process fetched data diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 59c9bc3..ba2e1aa 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -278,10 +278,9 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], - responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { + responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) { if 
(isValidRequiredAcks(requiredAcks)) { val sTime = SystemTime.milliseconds val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) @@ -309,7 +308,8 @@ class ReplicaManager(val config: KafkaConfig, } else { // we can respond immediately val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - responseCallback(produceResponseStatus) + // Zero delay time until quotas are enforced + responseCallback(produceResponseStatus, 0) } } else { // If required.acks is outside accepted range, something is wrong with the client @@ -320,7 +320,7 @@ class ReplicaManager(val config: KafkaConfig, ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } - responseCallback(responseStatus) + responseCallback(responseStatus, 0) } } @@ -417,7 +417,7 @@ class ReplicaManager(val config: KafkaConfig, replicaId: Int, fetchMinBytes: Int, fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], - responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId @@ -443,7 +443,8 @@ class ReplicaManager(val config: KafkaConfig, if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - responseCallback(fetchPartitionData) + // Zero delay time until quotas are enforced + responseCallback(fetchPartitionData, 0) } else { // construct the fetch results from the read results val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 5717165..3251879 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -18,9 +18,12 @@ package kafka.api +import java.nio.channels.GatheringByteChannel + import kafka.cluster.{BrokerEndPoint, EndPoint, Broker} import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.common._ +import kafka.consumer.FetchRequestAndResponseStatsRegistry import kafka.message.{Message, ByteBufferMessageSet} import kafka.utils.SystemTime @@ -150,7 +153,7 @@ object SerializationTestUtils { ProducerResponse(1, Map( TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) - )) + ), ProducerRequest.CurrentVersion, 100) def createTestFetchRequest: FetchRequest = { new FetchRequest(requestInfo = requestInfos) @@ -304,4 +307,39 @@ class RequestResponseSerializationTest extends JUnitSuite { assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized) } } + + @Test + def testProduceResponseVersion() { + val oldClientResponse = ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + )) + + val newClientResponse =
ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + + val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes) + newClientResponse.writeTo(buffer) + buffer.rewind() + assertEquals(ProducerResponse.readFrom(buffer).delayTime, 100) + } + + @Test + def testFetchResponseVersion() { + val oldClientResponse = FetchResponse(1, Map( + TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 0) + + val newClientResponse = FetchResponse(1, Map( + TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + } } diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index f3ab3f4..00764cc 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -27,7 +27,7 @@ class DelayedOperationTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", purgeInterval = 1) } override def tearDown() { @@ -68,6 +68,18 @@ class DelayedOperationTest extends JUnit3Suite { } @Test + def testTimeoutNoKeys() { + val expiration = 20L + val r1 = new MockDelayedOperation(expiration) + val start = System.currentTimeMillis + purgatory.tryCompleteElseWatch(r1, Seq("a1")) + r1.awaitExpiration() + val elapsed = System.currentTimeMillis - start + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertTrue("Time for expiration %d should be at least %d".format(elapsed, expiration), elapsed >= expiration) + } + + @Test def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 00d5933..09c5647 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -78,7 +78,7 @@ class ReplicaManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } -- 1.7.12.4 From c0d2fb1780a0362415aff285bc5016c2a2e10c4d Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 8 Jun 2015 17:39:16 -0700 Subject: [PATCH 2/5] Addressing Joel's comments ---
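For illustration, the standalone sketch below shows how the throttle-time sensors wired up in patch 1 aggregate the throttle_time_ms field carried by V1 responses. It uses only the public Metrics API that the patch itself calls; the sensor and metric names, descriptions, and the Avg/Max stats match the patch, while the wrapper class and the "producer-metrics" group name are made up for the sketch.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    // Hypothetical harness; mirrors the sensor wiring this series adds to
    // Sender ("produce-throttle-time") and Fetcher ("fetch-throttle-time").
    public class ThrottleTimeSketch {
        public static void main(String[] args) {
            Map<String, String> tags = Collections.emptyMap();
            Metrics metrics = new Metrics();
            Sensor throttle = metrics.sensor("produce-throttle-time");
            throttle.add(new MetricName("throttle-time-avg", "producer-metrics",
                    "The average throttle time in ms", tags), new Avg());
            throttle.add(new MetricName("throttle-time-max", "producer-metrics",
                    "The maximum throttle time in ms", tags), new Max());

            // Feed in the throttle_time_ms values the tests use (100, 200, 300);
            // avg should read 200 and max 300, as asserted in testQuotaMetrics.
            for (int i = 1; i <= 3; i++)
                throttle.record(100 * i);

            for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet())
                if (entry.getKey().name().startsWith("throttle-time"))
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
        }
    }

Recording on every response keeps this cheap: Avg and Max are windowed stats, so no per-request history is retained on the client.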
.../kafka/clients/producer/internals/Sender.java | 18 +++++------ .../org/apache/kafka/common/protocol/Protocol.java | 14 ++++++--- .../kafka/common/requests/FetchResponse.java | 14 ++++----- .../kafka/common/requests/ProduceResponse.java | 14 ++++----- .../clients/consumer/internals/FetcherTest.java | 4 +-- .../clients/producer/internals/SenderTest.java | 6 ++-- core/src/main/scala/kafka/api/FetchResponse.scala | 36 +++++++++++++++------- .../main/scala/kafka/api/ProducerResponse.scala | 17 +++++----- .../api/RequestResponseSerializationTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 14 +-------- 10 files changed, 72 insertions(+), 67 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d4f751f..1f88c8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); - private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -254,8 +254,8 @@ public class Sender implements Runnable { completeBatch(batch, error, partResp.baseOffset, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); - this.sensors.recordQuotaDelay(response.request().request().destination(), - response.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); + this.sensors.recordThrottleTime(response.request().request().destination(), + response.responseBody().getInt(THROTTLE_TIME_KEY_NAME)); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -355,7 +355,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; - public final Sensor quotaDelayTimeSensor; + public final Sensor throttleTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -385,11 +385,11 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); - this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time"); + this.throttleTimeSensor = metrics.sensor("produce-throttle-time"); m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); - this.quotaDelayTimeSensor.add(m, new Avg()); + this.throttleTimeSensor.add(m, new Avg()); m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); - this.quotaDelayTimeSensor.add(m, new Max()); + this.throttleTimeSensor.add(m, new Max()); this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); @@ -526,8 +526,8 @@ public class Sender implements Runnable { } } - public void recordQuotaDelay(int node, long delayTimeMs) { - this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds()); + public void recordThrottleTime(int node, 
long throttleTimeMs) { + this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 461388e..048d761 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -107,8 +107,6 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - // The V1 Fetch Request body is the same as V0. - // Only the version number is incremented to indicate a newer client public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", @@ -120,7 +118,11 @@ public class Protocol { INT16), new Field("base_offset", INT64))))))), - new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0)); + new Field("throttle_time_ms", + INT32, + "Duration in milliseconds for which the request was throttled" + + " due to quota violation. (Zero if the request did not violate any quota.)", + 0)); public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1}; public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1}; @@ -374,8 +376,10 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32, - "Amount of time in milliseconds the request was throttled if at all", + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", + INT32, + "Duration in milliseconds for which the request was throttled" + + " due to quota violation. 
(Zero if the request did not violate any quota.)", 0), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 005ec08..1ef3db7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -37,7 +37,7 @@ public class FetchResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
-    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
@@ -60,7 +60,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
     private final Map<TopicPartition, PartitionData> responseData;
-    private final int delayTime;
+    private final int throttleTime;
 
     public static final class PartitionData {
         public final short errorCode;
@@ -74,9 +74,9 @@ public class FetchResponse extends AbstractRequestResponse {
         }
     }
 
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData, int delayTime) {
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
         super(new Struct(CURRENT_SCHEMA));
-        this.delayTime = delayTime;
+        this.throttleTime = throttleTime;
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -97,13 +97,13 @@ public class FetchResponse extends AbstractRequestResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        struct.set(QUOTA_DELAY_KEY_NAME, delayTime);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         this.responseData = responseData;
     }
 
     public FetchResponse(Struct struct) {
         super(struct);
-        this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME);
+        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
         responseData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -123,7 +123,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
 
-    public int getDelayTime() { return this.delayTime; }
+    public int getThrottleTime() { return this.throttleTime; }
 
     public static FetchResponse parse(ByteBuffer buffer) {
         return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 46fd87d..33789c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -33,7 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
    private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
-    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
@@ -50,9 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;
-    private final int delayTime;
+    private final int throttleTime;
 
-    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int delayTime) {
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) {
         super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
@@ -72,9 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse {
             topicDatas.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        struct.set(QUOTA_DELAY_KEY_NAME, delayTime);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         this.responses = responses;
-        this.delayTime = delayTime;
+        this.throttleTime = throttleTime;
     }
 
     public ProduceResponse(Struct struct) {
@@ -92,13 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse {
                 responses.put(tp, new PartitionResponse(errorCode, offset));
             }
         }
-        this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME);
+        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
     }
 
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }
 
-    public int getDelayTime() { return this.delayTime; }
+    public int getThrottleTime() { return this.throttleTime; }
 
     public static final class PartitionResponse {
         public short errorCode;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index faefe0c..d8f99f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -195,8 +195,8 @@ public class FetcherTest {
         assertEquals(300, max);
     }
 
-    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int delayTime) {
-        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), delayTime);
+    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
+        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
         return response.toStruct();
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 2cc9e50..2d4dac5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -87,7 +87,7 @@ public class SenderTest {
     }
 
     /*
-     * Send multiple request. Verify that the client side quota metrics have the right values
+     * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
     public void testQuotaMetrics() throws Exception {
@@ -167,10 +167,10 @@ public class SenderTest {
         }
     }
 
-    private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) {
+    private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
-        ProduceResponse response = new ProduceResponse(partResp, delayTimeMs);
+        ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
         return response.toStruct();
     }
 
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 7881a65..1a8cc84 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -66,13 +66,19 @@ class PartitionDataSend(val partitionId: Int,
 
   override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
 
+<<<<<<< HEAD
   override def destination: String = ""
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     var written = 0L
     if(buffer.hasRemaining)
+=======
+  override def writeTo(channel: GatheringByteChannel): Int = {
+    var written = 0
+    if (buffer.hasRemaining)
+>>>>>>> Addressing Joel's comments
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && messagesSentSize < messageSize) {
+    if (!buffer.hasRemaining && messagesSentSize < messageSize) {
       val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
       messagesSentSize += bytesSent
       written += bytesSent
@@ -125,6 +131,7 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
   private val sends = new MultiSend(dest,
     JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
 
+<<<<<<< HEAD
   override def writeTo(channel: GatheringByteChannel): Long = {
     if (completed)
       throw new KafkaException("This operation cannot be completed on a complete request.")
@@ -133,6 +140,14 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.completed) {
+=======
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if (buffer.hasRemaining)
+      written += channel.write(buffer)
+    if (!buffer.hasRemaining && !sends.complete) {
+>>>>>>> Addressing Joel's comments
       written += sends.writeTo(channel)
     }
     sent += written
@@ -151,8 +166,7 @@ object FetchResponse {
 
   // The request version is used to determine which fields we can expect in the response
   def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = {
     val correlationId = buffer.getInt
-    val delayTime = if (requestVersion > 0) buffer.getInt else 0
-
+    val throttleTime = if (requestVersion > 0) buffer.getInt else 0
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
@@ -161,23 +175,23 @@ object FetchResponse {
         (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(correlationId, Map(pairs:_*), requestVersion, delayTime)
+    FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime)
   }
 }
 
 case class FetchResponse(correlationId: Int,
                          data: Map[TopicAndPartition, FetchResponsePartitionData],
                          requestVersion : Int = 0,
-                         delayTime : Int = 0)
+                         throttleTime : Int = 0)
     extends RequestOrResponse() {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
   lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
-  val delayTimeSize = if(requestVersion > 0) 4 else 0
+  val throttleTimeSize = if (requestVersion > 0) 4 else 0
 
   val sizeInBytes =
-    FetchResponse.headerSize + delayTimeSize +
+    FetchResponse.headerSize + throttleTimeSize +
     dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
       val topicData = TopicData(curr._1, curr._2.map {
         case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
@@ -228,12 +242,12 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
 
   override def destination = dest
 
   // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request
-  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize)
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.throttleTimeSize)
   buffer.putInt(payloadSize)
   buffer.putInt(fetchResponse.correlationId)
-  // Include the delayTime only if the client can read it
-  if(fetchResponse.requestVersion > 0) {
-    buffer.putInt(fetchResponse.delayTime)
+  // Include the throttleTime only if the client can read it
+  if (fetchResponse.requestVersion > 0) {
+    buffer.putInt(fetchResponse.throttleTime)
   }
   buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 0f40a65..2899f20 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -37,8 +37,8 @@ object ProducerResponse {
       })
     })
 
-    val delayTime = buffer.getInt
-    ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime)
+    val throttleTime = buffer.getInt
+    ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime)
   }
 }
 
@@ -47,7 +47,7 @@ case class ProducerResponseStatus(var error: Short, offset: Long)
 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus],
                             requestVersion : Int = 0,
-                            delayTime : Int = 0)
+                            throttleTime : Int = 0)
     extends RequestOrResponse() {
 
   /**
@@ -58,7 +58,7 @@ case class ProducerResponse(correlationId: Int,
   def hasError = status.values.exists(_.error != ErrorMapping.NoError)
 
   val sizeInBytes = {
-    val delayTimeSize = if(requestVersion > 0) 4 else 0
+    val throttleTimeSize = if (requestVersion > 0) 4 else 0
     val groupedStatus = statusGroupedByTopic
     4 + /* correlation id */
     4 + /* topic count */
@@ -72,8 +72,7 @@ case class ProducerResponse(correlationId: Int,
           8 /* offset */
         }
       }) +
-    delayTimeSize
-
+    throttleTimeSize
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -92,9 +91,9 @@ case class ProducerResponse(correlationId: Int,
         buffer.putLong(nextOffset)
       }
     })
-    // Delay time is only supported on V1 style requests
-    if(requestVersion > 0) {
-      buffer.putInt(delayTime)
+    // Throttle time is only supported on V1 style requests
+    if (requestVersion > 0) {
+      buffer.putInt(throttleTime)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 3251879..d197ffb 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -326,7 +326,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
     val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes)
     newClientResponse.writeTo(buffer)
     buffer.rewind()
-    assertEquals(ProducerResponse.readFrom(buffer).delayTime, 100)
+    assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 00764cc..f3ab3f4 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -27,7 +27,7 @@ class DelayedOperationTest extends JUnit3Suite {
 
   override def setUp() {
     super.setUp()
-    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", purgeInterval = 1)
+    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
   }
 
   override def tearDown() {
@@ -68,18 +68,6 @@ class DelayedOperationTest extends JUnit3Suite {
   }
 
   @Test
-  def testTimeoutNoKeys() {
-    val expiration = 20L
-    val r1 = new MockDelayedOperation(expiration)
-    val start = System.currentTimeMillis
-    purgatory.tryCompleteElseWatch(r1, Seq("a1"))
-    r1.awaitExpiration()
-    val elapsed = System.currentTimeMillis - start
-    assertTrue("r1 completed due to expiration", r1.isCompleted())
-    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
-  }
-
-  @Test
  def testRequestPurge() {
     val r1 = new MockDelayedOperation(100000L)
     val r2 = new MockDelayedOperation(100000L)
-- 
1.7.12.4
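
The pattern running through the response classes above is worth making explicit: the throttle-time field is gated on the request version, so a V0 client never sees the extra four bytes, while a V1 client always reads a value (0 when the quota was not violated). A minimal, self-contained Java sketch of that pattern — the class and method names here are illustrative, not the patch's own:

    import java.nio.ByteBuffer;

    // Version-gated field: V1+ responses carry an extra INT32 throttle_time_ms;
    // V0 responses omit it entirely, so old clients parse unchanged bytes.
    public class VersionGatedField {
        static void writeResponse(ByteBuffer buf, int correlationId, int requestVersion, int throttleTimeMs) {
            buf.putInt(correlationId);
            if (requestVersion > 0)       // only clients that sent V1+ can read the field
                buf.putInt(throttleTimeMs);
        }

        static int readThrottleTime(ByteBuffer buf, int requestVersion) {
            buf.getInt();                 // correlation id
            return requestVersion > 0 ? buf.getInt() : 0;  // default 0 for V0
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(8);
            writeResponse(buf, 7, 1, 150);
            buf.flip();
            System.out.println(readThrottleTime(buf, 1));  // prints 150
        }
    }

Sizing follows the same rule: sizeInBytes adds 4 only when requestVersion > 0, which is exactly what the delayTimeSize/throttleTimeSize values above compute.
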
From 29c056ce5f27eef4007467cf56d3f1b5b0c8fa79 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Tue, 9 Jun 2015 10:06:56 -0700
Subject: [PATCH 3/5] Merging

---
 .../apache/kafka/clients/producer/internals/Sender.java |  2 +-
 core/src/main/scala/kafka/api/FetchResponse.scala       | 15 ---------------
 2 files changed, 1 insertion(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1f88c8a..aa51782 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -526,7 +526,7 @@ public class Sender implements Runnable {
             }
         }
 
-        public void recordThrottleTime(int node, long throttleTimeMs) {
+        public void recordThrottleTime(String node, long throttleTimeMs) {
             this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds());
         }
 
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 1a8cc84..cf98aee 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -66,17 +66,11 @@ class PartitionDataSend(val partitionId: Int,
 
   override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
 
-<<<<<<< HEAD
   override def destination: String = ""
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     var written = 0L
     if(buffer.hasRemaining)
-=======
-  override def writeTo(channel: GatheringByteChannel): Int = {
-    var written = 0
-    if (buffer.hasRemaining)
->>>>>>> Addressing Joel's comments
       written += channel.write(buffer)
     if (!buffer.hasRemaining && messagesSentSize < messageSize) {
       val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
@@ -131,7 +125,6 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
   private val sends = new MultiSend(dest,
     JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
 
-<<<<<<< HEAD
   override def writeTo(channel: GatheringByteChannel): Long = {
     if (completed)
       throw new KafkaException("This operation cannot be completed on a complete request.")
@@ -140,14 +133,6 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.completed) {
-=======
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    if (buffer.hasRemaining)
-      written += channel.write(buffer)
-    if (!buffer.hasRemaining && !sends.complete) {
->>>>>>> Addressing Joel's comments
       written += sends.writeTo(channel)
     }
     sent += written
-- 
1.7.12.4
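
With the leftover merge-conflict markers resolved in favor of the trunk Send interface, the client-side response classes carry the throttle time end to end. A sketch of that round trip, assuming the patched ProduceResponse above and that it exposes a parse(ByteBuffer) helper like the one shown for FetchResponse:

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.ProduceResponse;

    // Serialize a response carrying 100 ms of throttling, then re-parse it
    // the way a client would on receive. Demo class, not part of the patch.
    public class ThrottleTimeRoundTrip {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("test", 0);
            Map<TopicPartition, ProduceResponse.PartitionResponse> data =
                Collections.singletonMap(tp, new ProduceResponse.PartitionResponse((short) 0, 42L));

            ProduceResponse response = new ProduceResponse(data, 100);

            ByteBuffer buffer = ByteBuffer.allocate(response.toStruct().sizeOf());
            response.toStruct().writeTo(buffer);
            buffer.rewind();

            ProduceResponse parsed = ProduceResponse.parse(buffer);
            System.out.println(parsed.getThrottleTime());  // 100
        }
    }
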
From f661167bad0601ff928d38e57a85f35e9c164dc7 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Tue, 9 Jun 2015 10:10:06 -0700
Subject: [PATCH 4/5] Changing variable name

---
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 68479df..79254fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -63,7 +63,7 @@ public class Fetcher {
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
     private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     private final KafkaClient client;
 
@@ -338,7 +338,7 @@ public class Fetcher {
             this.sensors.recordsFetched.record(totalCount);
         }
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
-        this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME));
+        this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(THROTTLE_TIME_KEY_NAME));
     }
 
     /**
-- 
1.7.12.4
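
The rename above only touches the Struct key constant; the sensor it feeds follows the standard client-metrics pattern used throughout this series — one sensor per quantity, with Avg and Max stats registered under separate metric names. A runnable sketch of that pattern against the public metrics API (the group name and tag values here are placeholders, not the patch's):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    // One sensor fed once per response; Avg and Max are derived over the sample window.
    public class ThrottleTimeSensorSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", "demo");

            Sensor sensor = metrics.sensor("fetch-throttle-time");
            sensor.add(new MetricName("fetch-throttle-time-avg", "demo-metrics",
                    "The average throttle time in ms", tags), new Avg());
            sensor.add(new MetricName("fetch-throttle-time-max", "demo-metrics",
                    "The maximum throttle time in ms", tags), new Max());

            sensor.record(100);   // throttle_time_ms from one response
            sensor.record(300);   // ... and another

            double max = metrics.metrics()
                .get(new MetricName("fetch-throttle-time-max", "demo-metrics", "", tags))
                .value();
            System.out.println(max);  // 300.0
        }
    }

Registering Avg and Max on the same sensor is what lets a single record() call update both derived metrics, which is the design the tests below rely on.
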
From b84b975157332bc505598b40a66f09a1c37f8e2a Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Tue, 30 Jun 2015 19:43:25 -0700
Subject: [PATCH 5/5] Addressing Joel's comments

---
 .../kafka/clients/consumer/internals/Fetcher.java  | 17 ++++++++---------
 .../kafka/clients/producer/internals/Sender.java   | 17 ++++++++---------
 .../clients/consumer/internals/FetcherTest.java    | 19 +++++++------------
 .../clients/producer/internals/SenderTest.java     | 22 +++++++++-------------
 4 files changed, 32 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 79254fa..0196dde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -63,7 +63,6 @@ public class Fetcher {
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
     private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     private final KafkaClient client;
 
@@ -336,9 +335,9 @@ public class Fetcher {
         }
             this.sensors.bytesFetched.record(totalBytes);
             this.sensors.recordsFetched.record(totalCount);
+            this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
         }
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
-        this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(THROTTLE_TIME_KEY_NAME));
     }
 
     /**
@@ -392,7 +391,7 @@ public class Fetcher {
         public final Sensor recordsFetched;
         public final Sensor fetchLatency;
         public final Sensor recordsFetchLag;
-        public final Sensor quotaDelayTimeSensor;
+        public final Sensor fetchThrottleTimeSensor;
 
         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
@@ -443,16 +442,16 @@ public class Fetcher {
                 "The maximum lag in terms of number of records for any partition in this window",
                 tags), new Max());
 
-            this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time");
-            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg",
+            this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
+            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
                 this.metricGrpName,
                 "The average throttle time in ms",
                 tags), new Avg());
 
-            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max",
-                this.metricGrpName,
-                "The maximum throttle time in ms",
-                tags), new Max());
+            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
+                this.metricGrpName,
+                "The maximum throttle time in ms",
+                tags), new Max());
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index aa51782..d70509f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
 public class Sender implements Runnable {
 
     private static final Logger log = LoggerFactory.getLogger(Sender.class);
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     /* the state of each nodes connection */
     private final KafkaClient client;
 
@@ -255,7 +254,7 @@ public class Sender implements Runnable {
             }
             this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
             this.sensors.recordThrottleTime(response.request().request().destination(),
-                response.responseBody().getInt(THROTTLE_TIME_KEY_NAME));
+                produceResponse.getThrottleTime());
         } else {
             // this is the acks = 0 case, just complete all requests
             for (RecordBatch batch : batches.values())
@@ -355,7 +354,7 @@ public class Sender implements Runnable {
         public final Sensor batchSizeSensor;
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
-        public final Sensor throttleTimeSensor;
+        public final Sensor produceThrottleTimeSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
@@ -385,11 +384,11 @@ public class Sender implements Runnable {
             m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
             this.requestTimeSensor.add(m, new Max());
 
-            this.throttleTimeSensor = metrics.sensor("produce-throttle-time");
-            m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
-            this.throttleTimeSensor.add(m, new Avg());
-            m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
-            this.throttleTimeSensor.add(m, new Max());
+            this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
+            m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
+            this.produceThrottleTimeSensor.add(m, new Avg());
+            m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
+            this.produceThrottleTimeSensor.add(m, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
             m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
@@ -527,7 +526,7 @@ public class Sender implements Runnable {
         }
 
         public void recordThrottleTime(String node, long throttleTimeMs) {
-            this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds());
+            this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
         }
 
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d8f99f9..3c2d11c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -50,6 +50,7 @@ public class FetcherTest {
 
     private String topicName = "test";
     private String groupId = "test-group";
+    private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private long retryBackoffMs = 0L;
     private int minBytes = 1;
@@ -64,6 +65,7 @@ public class FetcherTest {
     private SubscriptionState subscriptions = new SubscriptionState();
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private static final double EPS = 0.0001;
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
 
@@ -167,7 +169,7 @@ public class FetcherTest {
     }
 
     /*
-     * Send multiple request. Verify that the client side quota metrics have the right values
+     * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
     public void testQuotaMetrics() throws Exception {
@@ -182,17 +184,10 @@ public class FetcherTest {
         }
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        long avg = 0;
-        long max = 0;
-        for (MetricName m : allMetrics.keySet()) {
-            if (m.name().equals("throttle-time-avg")) {
-                avg = (long) allMetrics.get(m).value();
-            } else if (m.name().equals("throttle-time-max")) {
-                max = (long) allMetrics.get(m).value();
-            }
-        }
-        assertEquals(200, avg);
-        assertEquals(300, max);
+        KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags));
+        KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags));
+        assertEquals(200, avgMetric.value(), EPS);
+        assertEquals(300, maxMetric.value(), EPS);
     }
 
     private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 2d4dac5..aa44991 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -45,6 +45,9 @@ public class SenderTest {
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
     private static final int REQUEST_TIMEOUT_MS = 10000;
+    private static final String CLIENT_ID = "clientId";
+    private static final String METRIC_GROUP = "producer-metrics";
+    private static final double EPS = 0.0001;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -64,11 +67,12 @@ public class SenderTest {
                                        REQUEST_TIMEOUT_MS,
                                        metrics,
                                        time,
-                                       "clientId");
+                                       CLIENT_ID);
 
     @Before
     public void setup() {
         metadata.update(cluster, time.milliseconds());
+        metricTags.put("client-id", CLIENT_ID);
     }
 
     @Test
@@ -98,19 +102,11 @@ public class SenderTest {
             client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
             sender.run(time.milliseconds());
         }
-        long avg = 0;
-        long max = 0;
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        for (MetricName m : allMetrics.keySet()) {
-            if (m.name().equals("throttle-time-avg")) {
-                avg = (long) allMetrics.get(m).value();
-            } else if (m.name().equals("throttle-time-max")) {
-                max = (long) allMetrics.get(m).value();
-            }
-        }
-        assertEquals(200, avg);
-        assertEquals(300, max);
+        KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags));
+        KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags));
+        assertEquals(200, avgMetric.value(), EPS);
+        assertEquals(300, maxMetric.value(), EPS);
     }
 
     @Test
-- 
1.7.12.4
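
The expected values in both tests fall out of the inputs: each loop iteration responds with a throttle time of 100 * i, so with three iterations the recorded samples are {100, 200, 300}, giving avg = 200 and max = 300 — exactly the values asserted within EPS above. A few lines to make the arithmetic explicit (demo class, not part of the patch):

    // Samples recorded by the tests: 100 * i for i = 1..3.
    public class ThrottleTimeExpectations {
        public static void main(String[] args) {
            int[] samples = {100, 200, 300};
            int sum = 0;
            int max = Integer.MIN_VALUE;
            for (int s : samples) {
                sum += s;
                max = Math.max(max, s);
            }
            System.out.println("avg=" + sum / samples.length + " max=" + max);  // avg=200 max=300
        }
    }
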