From d2125cf77487e3fc41cb41184da048a9ad0e54bd Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 20 Apr 2015 16:14:49 -0700 Subject: [PATCH 1/2] Quotas patch for KAFKA-2136. Changes are: - protocol changes to the fetch and produce requests and responses to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases For now the patch will publish a zero delay and return it in the response --- .../kafka/clients/consumer/internals/Fetcher.java | 15 ++++++- .../kafka/clients/producer/internals/Sender.java | 15 +++++++ .../org/apache/kafka/common/protocol/Protocol.java | 34 ++++++++++++--- .../apache/kafka/common/requests/FetchRequest.java | 2 +- .../kafka/common/requests/FetchResponse.java | 8 +++- .../kafka/common/requests/ProduceRequest.java | 4 +- .../kafka/common/requests/ProduceResponse.java | 8 +++- .../clients/consumer/internals/FetcherTest.java | 49 ++++++++++++++++++---- .../clients/producer/internals/SenderTest.java | 39 +++++++++++++++-- .../kafka/common/requests/RequestResponseTest.java | 4 +- core/src/main/scala/kafka/api/FetchRequest.scala | 2 +- core/src/main/scala/kafka/api/FetchResponse.scala | 21 +++++++--- .../src/main/scala/kafka/api/ProducerRequest.scala | 2 +- .../main/scala/kafka/api/ProducerResponse.scala | 17 ++++++-- .../src/main/scala/kafka/server/DelayedFetch.scala | 5 ++- .../main/scala/kafka/server/DelayedProduce.scala | 5 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++-- .../main/scala/kafka/server/OffsetManager.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 13 +++--- .../api/RequestResponseSerializationTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 14 ++++++- .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- 22 files changed, 218 insertions(+), 53 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ef9dd52..321b9de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -63,7 +63,7 @@ public class Fetcher { private static final Logger log = LoggerFactory.getLogger(Fetcher.class); private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; private static final long LATEST_OFFSET_TIMESTAMP = -1L; - + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; private final KafkaClient client; @@ -337,6 +337,7 @@ public class Fetcher { this.sensors.recordsFetched.record(totalCount); } this.sensors.fetchLatency.record(resp.requestLatencyMs()); + this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); } /** @@ -390,6 +391,7 @@ public class Fetcher { public final Sensor recordsFetched; public final Sensor fetchLatency; public final Sensor recordsFetchLag; + public final Sensor quotaDelayTimeSensor; public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { @@ -439,6 +441,17 @@ public class Fetcher { this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window", tags), new Max()); + + this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time"); + this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg", + this.metricGrpName, + "The average throttle time in ms", + tags), new Avg()); + + this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max", + this.metricGrpName, + "The 
maximum throttle time in ms", + tags), new Max()); } public void recordTopicFetchMetrics(String topic, int bytes, int records) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 70954ca..fa65b93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -235,6 +236,8 @@ public class Sender implements Runnable { completeBatch(batch, error, partResp.baseOffset, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + this.sensors.recordQuotaDelay(response.request().request().destination(), + response.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -334,6 +337,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; + public final Sensor quotaDelayTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -363,6 +367,12 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); + this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time"); + m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Avg()); + m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Max()); + this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); this.recordsPerRequestSensor.add(m, new Rate()); @@ -497,6 +507,11 @@ public class Sender implements Runnable { nodeRequestTime.record(latency, now); } } + + public void recordQuotaDelay(int node, long delayTimeMs) { + this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds()); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 9c4518e..6d53c5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -107,9 +107,23 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - - public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0}; - public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0}; + // The V1 Produce Request body is the same as V0. 
+ // Only the version number is incremented to indicate a newer client + public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; + + public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(new Schema(new Field("partition", + INT32), + new Field("error_code", + INT16), + new Field("base_offset", + INT64))))))), + new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0)); + + public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1}; + public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1}; /* Offset commit api */ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -312,6 +326,9 @@ public class Protocol { new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); + // The V1 Fetch Request body is the same as V0. + // Only the version number is incremented to indicate a newer client + public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0; public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -327,9 +344,14 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - - public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0}; - public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0}; + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("responses", + new ArrayOf(FETCH_RESPONSE_TOPIC_V0)), + new Field("throttle_time_ms", INT32, + "Amount of time in milliseconds the request was throttled if at all", + 0)); + + public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1}; + public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; /* Consumer metadata api */ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 8686d83..aaec66d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -130,7 +130,7 @@ public class FetchRequest extends AbstractRequest { responseData.put(entry.getKey(), partitionResponse); } - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } public int replicaId() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index eb8951f..4d7f61e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -37,6 +37,7 @@ public class FetchResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ 
-59,6 +60,7 @@ public class FetchResponse extends AbstractRequestResponse { public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0); private final Map responseData; + private final int delayTime; public static final class PartitionData { public final short errorCode; @@ -72,7 +74,7 @@ public class FetchResponse extends AbstractRequestResponse { } } - public FetchResponse(Map responseData) { + public FetchResponse(Map responseData, int delayTime) { super(new Struct(CURRENT_SCHEMA)); Map> topicsData = CollectionUtils.groupDataByTopic(responseData); @@ -94,7 +96,9 @@ public class FetchResponse extends AbstractRequestResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responseData = responseData; + this.delayTime = delayTime; } public FetchResponse(Struct struct) { @@ -113,11 +117,13 @@ public class FetchResponse extends AbstractRequestResponse { responseData.put(new TopicPartition(topic, partition), partitionData); } } + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); } public Map responseData() { return responseData; } + public int getDelayTime() {return this.delayTime; } public static FetchResponse parse(ByteBuffer buffer) { return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index fabeae3..ea69c6f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -27,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ProduceRequest extends AbstractRequest { +public class ProduceRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); private static final String ACKS_KEY_NAME = "acks"; @@ -102,7 +102,7 @@ public class ProduceRequest extends AbstractRequest { responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET)); } - return new ProduceResponse(responseMap); + return new ProduceResponse(responseMap, 0); } public short acks() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 37ec0b7..ca58031 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -33,6 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -49,8 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private final Map responses; + private final int delayTime; - public ProduceResponse(Map responses) { + public ProduceResponse(Map responses, int delayTime) { super(new Struct(CURRENT_SCHEMA)); Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); List topicDatas = new 
ArrayList(responseByTopic.size()); @@ -70,7 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse { topicDatas.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responses = responses; + this.delayTime = delayTime; } public ProduceResponse(Struct struct) { @@ -88,11 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse { responses.put(tp, new PartitionResponse(errorCode, offset)); } } + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); } public Map responses() { return this.responses; } + public int getDelayTime() {return this.delayTime; } public static final class PartitionResponse { public short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4195410..6c50de2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -18,12 +18,16 @@ package org.apache.kafka.clients.consumer.internals; import static org.junit.Assert.assertEquals; +import java.util.concurrent.Future; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -102,7 +106,7 @@ public class FetcherTest { // normal fetch fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); client.poll(0, time.milliseconds()); records = fetcher.fetchedRecords().get(tp); assertEquals(3, records.size()); @@ -124,14 +128,14 @@ public class FetcherTest { // fetch with not leader fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); // fetch with unknown topic partition fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -139,7 +143,7 @@ public class FetcherTest { // fetch with out of range subscriptions.fetched(tp, 5); fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); 
client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); @@ -156,7 +160,7 @@ public class FetcherTest { // fetch with out of range fetcher.initFetches(cluster, time.milliseconds()); - client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); + client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code())); client.poll(0, time.milliseconds()); assertEquals(0, fetcher.fetchedRecords().size()); @@ -164,8 +168,39 @@ public class FetcherTest { assertEquals(0L, (long) subscriptions.consumed(tp)); } - private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { - FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer))); + /* + * Send multiple requests. Verify that the client-side quota metrics have the right values. + */ + @Test + public void testQuotaMetrics() throws Exception { + subscriptions.subscribe(tp); + subscriptions.fetched(tp, 0); + subscriptions.consumed(tp, 0); + + for(int i=1;i<=3;i++) { + // normal fetch + fetcher.initFetches(cluster, time.milliseconds()); + client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); + client.poll(0, time.milliseconds()); + } + + Map allMetrics = metrics.metrics(); + long avg = 0; + long max = 0; + for(MetricName m : allMetrics.keySet()) { + if(m.name().equals("throttle-time-avg")) { + avg = (long)allMetrics.get(m).value(); + } + else if(m.name().equals("throttle-time-max")) { + max = (long)allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + + private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int delayTime) { + FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), delayTime); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..712593a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,8 +26,11 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; @@ -76,7 +79,7 @@ public class SenderTest { sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertEquals("All requests completed.", offset, 
(long) client.inFlightRequestCount()); sender.run(time.milliseconds()); @@ -84,7 +87,35 @@ assertEquals(offset, future.get().offset()); } + /* + * Send multiple requests. Verify that the client-side quota metrics have the right values. + */ @Test + public void testQuotaMetrics() throws Exception { + final long offset = 0; + for(int i=1;i<=3;i++) { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100*i)); + sender.run(time.milliseconds()); + } + long avg = 0; + long max = 0; + + Map allMetrics = metrics.metrics(); + for(MetricName m : allMetrics.keySet()) { + if(m.name().equals("throttle-time-avg")) { + avg = (long)allMetrics.get(m).value(); + } + else if(m.name().equals("throttle-time-max")) { + max = (long)allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + + @Test public void testRetries() throws Exception { // create a sender with retries = 1 int maxRetries = 1; @@ -110,7 +141,7 @@ public class SenderTest { sender.run(time.milliseconds()); // resend assertEquals(1, client.inFlightRequestCount()); long offset = 0; - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertTrue("Request should have retried and completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -138,10 +169,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error) { + private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset); Map partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp); + ProduceResponse response = new ProduceResponse(partResp, delayTimeMs); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e3cc196..7b1c9ea 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -103,7 +103,7 @@ public class RequestResponseTest { private AbstractRequestResponse createFetchResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } private AbstractRequest createHeartBeatRequest() { @@ -179,6 +179,6 @@ public class RequestResponseTest { private AbstractRequestResponse createProduceResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); - return new ProduceResponse(responseData); + return new ProduceResponse(responseData, 0); } } diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b038c15..dfe104d 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -31,7 +31,7 @@ import 
scala.collection.immutable.Map case class PartitionFetchInfo(offset: Long, fetchSize: Int) object FetchRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val DefaultMaxWait = 0 val DefaultMinBytes = 0 val DefaultCorrelationId = 0 diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 75aaf57..45e09ea 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -138,6 +138,7 @@ object FetchResponse { 4 + /* correlationId */ 4 /* topic count */ + def readFrom(buffer: ByteBuffer): FetchResponse = { val correlationId = buffer.getInt val topicCount = buffer.getInt @@ -148,20 +149,24 @@ object FetchResponse { (TopicAndPartition(topicData.topic, partitionId), partitionData) } }) - FetchResponse(correlationId, Map(pairs:_*)) + val delayTime = buffer.getInt + FetchResponse(correlationId, Map(pairs:_*), FetchRequest.CurrentVersion, delayTime) } } -case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) +case class FetchResponse(correlationId: Int, + data: Map[TopicAndPartition, FetchResponsePartitionData], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** * Partitions the data into a map of maps (one for each topic). */ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - + val delayTimeSize = if(requestVersion > 0) 4 else 0 val sizeInBytes = - FetchResponse.headerSize + + FetchResponse.headerSize + delayTimeSize + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { val topicData = TopicData(curr._1, curr._2.map { case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) @@ -209,10 +214,16 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { override def complete = sent >= sendSize - private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) + // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request + private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize) buffer.putInt(size) buffer.putInt(fetchResponse.correlationId) buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count + // Include the delayTime only if the client can read it + if(fetchResponse.delayTimeSize > 0) { + buffer.putInt(fetchResponse.delayTime) + } + buffer.rewind() val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 570b2da..edc0bb6 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -25,7 +25,7 @@ import kafka.network.RequestChannel.Response import kafka.network.{RequestChannel, BoundedByteBufferSend} object ProducerRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 5d1fac4..0f40a65 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -37,13 +37,17 @@ object ProducerResponse { }) }) - ProducerResponse(correlationId, Map(statusPairs:_*)) + val delayTime = buffer.getInt + 
ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime) } } case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) +case class ProducerResponse(correlationId: Int, + status: Map[TopicAndPartition, ProducerResponseStatus], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** @@ -54,6 +58,7 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P def hasError = status.values.exists(_.error != ErrorMapping.NoError) val sizeInBytes = { + val delayTimeSize = if(requestVersion > 0) 4 else 0 val groupedStatus = statusGroupedByTopic 4 + /* correlation id */ 4 + /* topic count */ @@ -66,7 +71,9 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P 2 + /* error code */ 8 /* offset */ } - }) + }) + + delayTimeSize + } def writeTo(buffer: ByteBuffer) { @@ -85,6 +92,10 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P buffer.putLong(nextOffset) } }) + // Delay time is only supported on V1 style requests + if(requestVersion > 0) { + buffer.putInt(delayTime) + } } override def describe(details: Boolean):String = { toString } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index de6cf5b..8e5cb39 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -55,7 +55,7 @@ case class FetchMetadata(fetchMinBytes: Int, class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) + responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) extends DelayedOperation(delayMs) { /** @@ -131,7 +131,8 @@ class DelayedFetch(delayMs: Long, val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - responseCallback(fetchPartitionData) + // Zero delay time until quotas are enforced + responseCallback(fetchPartitionData, 0) } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 05078b2..7a9c30b 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -53,7 +53,7 @@ case class ProduceMetadata(produceRequiredAcks: Short, class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) + responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) extends DelayedOperation(delayMs) { // first update the acks pending variable according to the error code @@ -126,7 +126,8 @@ class DelayedProduce(delayMs: Long, */ override def onComplete() { val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) - responseCallback(responseStatus) + // Zero delay time until quotas are enforced + responseCallback(responseStatus, 0) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b4004aa..c918ac9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -208,7 +208,7 @@ 
class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) { var errorInResponse = false responseStatus.foreach { case (topicAndPartition, status) => // we only print warnings for known errors here; if it is unknown, it will cause @@ -233,7 +233,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.noOperation(request.processor, request) } } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) + val response = ProducerResponse(produceRequest.correlationId, responseStatus, produceRequest.versionId, delayTime) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } } @@ -263,7 +263,7 @@ class KafkaApis(val requestChannel: RequestChannel, val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] // the callback for sending a fetch response - def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { + def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData], delayTime : Int) { responsePartitionData.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here @@ -278,7 +278,7 @@ class KafkaApis(val requestChannel: RequestChannel, BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } - val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) + val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTime) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 420e2c3..e0d568f 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -245,7 +245,7 @@ class OffsetManager(val config: OffsetManagerConfig, new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) { // the append response should only contain the topics partition if (responseStatus.size != 1 || ! 
responseStatus.contains(offsetTopicPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8ddd325..a3692ca 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -278,10 +278,9 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], - responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { + responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) { if (isValidRequiredAcks(requiredAcks)) { - val sTime = SystemTime.milliseconds val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) @@ -309,7 +308,8 @@ class ReplicaManager(val config: KafkaConfig, } else { // we can respond immediately val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - responseCallback(produceResponseStatus) + // Zero delay time until quotas are enforced + responseCallback(produceResponseStatus, 0) } } else { // If required.acks is outside accepted range, something is wrong with the client @@ -320,7 +320,7 @@ class ReplicaManager(val config: KafkaConfig, ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } - responseCallback(responseStatus) + responseCallback(responseStatus, 0) } } @@ -417,7 +417,7 @@ class ReplicaManager(val config: KafkaConfig, replicaId: Int, fetchMinBytes: Int, fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], - responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId @@ -443,7 +443,8 @@ class ReplicaManager(val config: KafkaConfig, if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - responseCallback(fetchPartitionData) + // Zero delay time until quotas are enforced + responseCallback(fetchPartitionData, 0) } else { // construct the fetch results from the read results val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 566b538..e71f65d 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -150,7 +150,7 @@ object SerializationTestUtils { ProducerResponse(1, Map( TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) - )) + ), ProducerRequest.CurrentVersion, 100) def createTestFetchRequest: FetchRequest = { new FetchRequest(requestInfo = requestInfos) diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
index 9186c90..d4c0fc8 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -27,7 +27,7 @@ class DelayedOperationTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", purgeInterval = 1) } override def tearDown() { @@ -68,6 +68,18 @@ class DelayedOperationTest extends JUnit3Suite { } @Test + def testTimeoutNoKeys() { + val expiration = 20L + val r1 = new MockDelayedOperation(expiration) + val start = System.currentTimeMillis + purgatory.tryCompleteElseWatch(r1, Seq("a1")) + r1.awaitExpiration() + val elapsed = System.currentTimeMillis - start + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertTrue("Time for expiration %d should be at least %d".format(elapsed, expiration), elapsed >= expiration) + } + + @Test def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 00d5933..09c5647 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -78,7 +78,7 @@ class ReplicaManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } -- 1.7.12.4 From be744ce6ede3d61ed092033d4680dfd5e7648437 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 20 Apr 2015 16:50:27 -0700 Subject: [PATCH 2/2] Added more tests --- .../api/RequestResponseSerializationTest.scala | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index e71f65d..4b638be 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -304,4 +304,26 @@ class RequestResponseSerializationTest extends JUnitSuite { assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized) } } + + @Test + def testProduceResponseVersion() { + val oldClientResponse = ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + )) + + val newClientResponse = ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + + val 
buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes) + newClientResponse.writeTo(buffer) + buffer.rewind() + assertEquals(100, ProducerResponse.readFrom(buffer).delayTime) + } + } -- 1.7.12.4
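Usage sketch (not part of the patch): the sensors added above register "throttle-time-avg" and "throttle-time-max" in the ordinary client metrics map ("produce-throttle-time" on the Sender, "fetch-throttle-time" on the Fetcher), so an application can watch for broker-side throttling without any new API. A minimal Java example against the producer follows; the broker address, topic name, and serializer settings are placeholders, and since this patch always publishes a zero delay, both metrics will read 0 until quotas are actually enforced.

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ThrottleTimeProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        try {
            // Generate some traffic so the produce-throttle-time sensor records samples.
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<byte[], byte[]>("test", "value".getBytes()));

            // The sensor surfaces throttle-time-avg and throttle-time-max in the
            // producer's metrics map; with this patch the broker always reports 0.
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                String name = entry.getKey().name();
                if (name.equals("throttle-time-avg") || name.equals("throttle-time-max"))
                    System.out.println(name + " = " + entry.getValue().value() + " ms");
            }
        } finally {
            producer.close();
        }
    }
}

The fetch-side metrics from the Fetcher can be read the same way from the new consumer's metrics map.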
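Wire-format sketch: the only difference between the V0 and V1 responses is the trailing throttle_time_ms INT32 (default 0), which is why testProduceResponseVersion above expects exactly 4 extra bytes. The Java-client equivalent of that round trip is sketched below through the patched ProduceResponse; it assumes a ProduceResponse.parse(ByteBuffer) helper analogous to the FetchResponse.parse shown earlier.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;

public class ProduceResponseRoundTrip {
    public static void main(String[] args) {
        Map<TopicPartition, ProduceResponse.PartitionResponse> parts =
            Collections.singletonMap(new TopicPartition("t1", 0),
                new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10001L));

        // Build a V1-style response carrying a 100 ms throttle time.
        ProduceResponse out = new ProduceResponse(parts, 100);

        // Serialize the underlying Struct and read it back.
        ByteBuffer buffer = ByteBuffer.allocate(out.toStruct().sizeOf());
        out.toStruct().writeTo(buffer);
        buffer.rewind();

        ProduceResponse in = ProduceResponse.parse(buffer);
        // The throttle time survives the round trip; 0 means the request was not throttled.
        System.out.println("delayTime = " + in.getDelayTime());
    }
}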