From b87fce58c23c7c8d446d5cd11c5ca1ceb87fce46 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Mon, 11 May 2015 14:50:19 -0700
Subject: [PATCH 01/10] Add a throttle_time_ms field to V1 fetch and produce
 responses

---
 .../kafka/clients/consumer/internals/Fetcher.java  | 14 +++++
 .../kafka/clients/producer/internals/Sender.java   | 15 ++++++
 .../org/apache/kafka/common/protocol/Protocol.java | 34 ++++++++++---
 .../apache/kafka/common/requests/FetchRequest.java |  2 +-
 .../kafka/common/requests/FetchResponse.java       |  8 ++-
 .../kafka/common/requests/ProduceRequest.java      |  4 +-
 .../kafka/common/requests/ProduceResponse.java     |  8 ++-
 .../clients/consumer/internals/FetcherTest.java    |  6 +--
 .../clients/producer/internals/SenderTest.java     | 37 ++++++++++++--
 .../kafka/common/requests/RequestResponseTest.java |  4 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |  9 +++-
 core/src/main/scala/kafka/api/FetchResponse.scala  | 26 +++++++---
 .../src/main/scala/kafka/api/ProducerRequest.scala |  2 +-
 .../main/scala/kafka/api/ProducerResponse.scala    | 17 +++++--
 .../main/scala/kafka/consumer/SimpleConsumer.scala |  2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 16 ++++--
 .../src/main/scala/kafka/server/DelayedFetch.scala |  5 +-
 .../main/scala/kafka/server/DelayedProduce.scala   |  5 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 59 +++++++++++++++-------
 .../main/scala/kafka/server/OffsetManager.scala    |  2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 13 ++---
 .../api/RequestResponseSerializationTest.scala     | 40 ++++++++++++++-
 .../unit/kafka/server/DelayedOperationTest.scala   | 12 +++++
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 25 files changed, 277 insertions(+), 69 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9dc6697..3c3a84b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -63,6 +63,7 @@ import java.util.Set;
 public class Fetcher<K, V> {
     public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     public static final long LATEST_OFFSET_TIMESTAMP = -1L;
+    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";

     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);

@@ -455,6 +456,7 @@ public class Fetcher<K, V> {
             this.sensors.recordsFetched.record(totalCount);
         }
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
+        this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME));
     }

     /**
@@ -493,6 +495,7 @@ public class Fetcher<K, V> {
         public final Sensor recordsFetched;
         public final Sensor fetchLatency;
         public final Sensor recordsFetchLag;
+        public final Sensor quotaDelayTimeSensor;

         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
@@ -542,6 +545,17 @@ public class Fetcher<K, V> {
                 this.metricGrpName,
                 "The maximum lag in terms of number of records for any partition in this window",
                 tags), new Max());
+
+            this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time");
+            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg",
+                this.metricGrpName,
+                "The average throttle time in ms",
+                tags), new Avg());
+
+            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max",
+                this.metricGrpName,
+                "The maximum throttle time in ms",
+                tags), new Max());
         }

         public void recordTopicFetchMetrics(String topic, int bytes, int
records) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 0baf16e..9c87879 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -253,6 +254,8 @@ public class Sender implements Runnable { completeBatch(batch, error, partResp.baseOffset, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + this.sensors.recordQuotaDelay(response.request().request().destination(), + response.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -352,6 +355,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; + public final Sensor quotaDelayTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -381,6 +385,12 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); + this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time"); + m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Avg()); + m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); + this.quotaDelayTimeSensor.add(m, new Max()); + this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); this.recordsPerRequestSensor.add(m, new Rate()); @@ -515,6 +525,11 @@ public class Sender implements Runnable { nodeRequestTime.record(latency, now); } } + + public void recordQuotaDelay(int node, long delayTimeMs) { + this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds()); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 3dc8b01..461388e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -107,9 +107,23 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - - public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0}; - public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0}; + // The V1 Fetch Request body is the same as V0. 
+ // Only the version number is incremented to indicate a newer client + public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; + + public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(new Schema(new Field("partition", + INT32), + new Field("error_code", + INT16), + new Field("base_offset", + INT64))))))), + new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0)); + + public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1}; + public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1}; /* Offset commit api */ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -342,6 +356,9 @@ public class Protocol { new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); + // The V1 Fetch Request body is the same as V0. + // Only the version number is incremented to indicate a newer client + public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0; public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -357,9 +374,14 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - - public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0}; - public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0}; + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32, + "Amount of time in milliseconds the request was throttled if at all", + 0), + new Field("responses", + new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + + public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1}; + public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; /* Consumer metadata api */ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index df073a0..feb4109 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -132,7 +132,7 @@ public class FetchRequest extends AbstractRequest { switch (versionId) { case 0: - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index eb8951f..005ec08 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -37,6 +37,7 @@ public class FetchResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -59,6 +60,7 @@ public class FetchResponse extends AbstractRequestResponse { public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0); private final Map responseData; + private final int delayTime; public static final class PartitionData { public final short errorCode; @@ -72,8 +74,9 @@ public class FetchResponse extends AbstractRequestResponse { } } - public FetchResponse(Map responseData) { + public FetchResponse(Map responseData, int delayTime) { super(new Struct(CURRENT_SCHEMA)); + this.delayTime = delayTime; Map> topicsData = CollectionUtils.groupDataByTopic(responseData); List topicArray = new ArrayList(); @@ -94,11 +97,13 @@ public class FetchResponse extends AbstractRequestResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responseData = responseData; } public FetchResponse(Struct struct) { super(struct); + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); responseData = new HashMap(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; @@ -118,6 +123,7 @@ public class FetchResponse extends AbstractRequestResponse { public Map responseData() { return responseData; } + public int getDelayTime() { return this.delayTime; } public static FetchResponse parse(ByteBuffer buffer) { return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 715504b..5663f2c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -27,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ProduceRequest extends AbstractRequest { +public class ProduceRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); private static final String ACKS_KEY_NAME = "acks"; @@ -103,7 +103,7 @@ public class ProduceRequest extends AbstractRequest { switch (versionId) { case 0: - return new ProduceResponse(responseMap); + return new ProduceResponse(responseMap, 0); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index febfc70..8bb5bda 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -33,6 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; + private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -49,8 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private final Map responses; + private final int delayTime; - public ProduceResponse(Map responses) { + public ProduceResponse(Map responses, int delayTime) { super(new Struct(CURRENT_SCHEMA)); Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); List topicDatas = new ArrayList(responseByTopic.size()); @@ -70,7 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse { topicDatas.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + struct.set(QUOTA_DELAY_KEY_NAME, delayTime); this.responses = responses; + this.delayTime = delayTime; } public ProduceResponse(Struct struct) { @@ -88,11 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse { responses.put(tp, new PartitionResponse(errorCode, offset)); } } + this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); } public Map responses() { return this.responses; } + public int getDelayTime() { return this.delayTime; } public static final class PartitionResponse { public short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a7c83ca..c6b7feb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -22,9 +22,11 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -323,9 +325,7 @@ public class FetcherTest { } private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { - FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer))); + FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), 0); return response.toStruct(); } - - } diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..2cc9e50 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,7 +26,9 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -76,7 +78,7 @@ public class SenderTest { sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount()); sender.run(time.milliseconds()); @@ -84,6 +86,33 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + /* + * Send multiple request. Verify that the client side quota metrics have the right values + */ + @Test + public void testQuotaMetrics() throws Exception { + final long offset = 0; + for (int i = 1; i <= 3; i++) { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); + sender.run(time.milliseconds()); + } + long avg = 0; + long max = 0; + + Map allMetrics = metrics.metrics(); + for (MetricName m : allMetrics.keySet()) { + if (m.name().equals("throttle-time-avg")) { + avg = (long) allMetrics.get(m).value(); + } else if (m.name().equals("throttle-time-max")) { + max = (long) allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + @Test public void testRetries() throws Exception { // create a sender with retries = 1 @@ -110,7 +139,7 @@ public class SenderTest { sender.run(time.milliseconds()); // resend assertEquals(1, client.inFlightRequestCount()); long offset = 0; - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertTrue("Request should have retried and completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -138,10 +167,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error) { + private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset); Map partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp); + ProduceResponse response = new ProduceResponse(partResp, delayTimeMs); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
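The testQuotaMetrics test above drives the new sensors with throttle times of 100, 200 and 300 ms and then asserts that throttle-time-avg is 200 and throttle-time-max is 300. The same arithmetic can be reproduced against the client metrics API in isolation; the sketch below uses the exact calls the patch registers, while the group name and the empty tag map are illustrative choices, not values taken from the patch:

import java.util.Collections;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class ThrottleTimeSensorSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor throttleTime = metrics.sensor("produce-throttle-time");
        throttleTime.add(new MetricName("throttle-time-avg", "sketch-metrics",
                "The average throttle time in ms", Collections.<String, String>emptyMap()), new Avg());
        throttleTime.add(new MetricName("throttle-time-max", "sketch-metrics",
                "The maximum throttle time in ms", Collections.<String, String>emptyMap()), new Max());

        // The three throttled responses the test simulates: 100, 200 and 300 ms
        throttleTime.record(100);
        throttleTime.record(200);
        throttleTime.record(300);

        // avg = (100 + 200 + 300) / 3 = 200 and max = 300, the values the test asserts
        for (KafkaMetric metric : metrics.metrics().values())
            System.out.println(metric.metricName().name() + " = " + metric.value());
    }
}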
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 8b2aca8..0645bc8 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -103,7 +103,7 @@ public class RequestResponseTest { private AbstractRequestResponse createFetchResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } private AbstractRequest createHeartBeatRequest() { @@ -182,6 +182,6 @@ public class RequestResponseTest { private AbstractRequestResponse createProduceResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); - return new ProduceResponse(responseData); + return new ProduceResponse(responseData, 0); } } diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 5b38f85..36e288f 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -31,7 +31,7 @@ import scala.collection.immutable.Map case class PartitionFetchInfo(offset: Long, fetchSize: Int) object FetchRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val DefaultMaxWait = 0 val DefaultMinBytes = 0 val DefaultCorrelationId = 0 @@ -170,7 +170,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, @nonthreadsafe class FetchRequestBuilder() { private val correlationId = new AtomicInteger(0) - private val versionId = FetchRequest.CurrentVersion + private var versionId = FetchRequest.CurrentVersion private var clientId = ConsumerConfig.DefaultClientId private var replicaId = Request.OrdinaryConsumerId private var maxWait = FetchRequest.DefaultMaxWait @@ -205,6 +205,11 @@ class FetchRequestBuilder() { this } + def requestVersion(versionId: Short): FetchRequestBuilder = { + this.versionId = versionId + this + } + def build() = { val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) requestMap.clear() diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index b9efec2..eb63b46 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -156,8 +156,12 @@ object FetchResponse { 4 + /* correlationId */ 4 /* topic count */ - def readFrom(buffer: ByteBuffer): FetchResponse = { + + // The request version is used to determine which fields we can expect in the response + def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = { val correlationId = buffer.getInt + val delayTime = if (requestVersion > 0) buffer.getInt else 0 + val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topicData = TopicData.readFrom(buffer) @@ -166,20 +170,23 @@ object FetchResponse { (TopicAndPartition(topicData.topic, partitionId), partitionData) } }) - FetchResponse(correlationId, Map(pairs:_*)) + FetchResponse(correlationId, Map(pairs:_*), requestVersion, delayTime) } } -case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) +case class FetchResponse(correlationId: Int, + 
data: Map[TopicAndPartition, FetchResponsePartitionData], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** * Partitions the data into a map of maps (one for each topic). */ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - + val delayTimeSize = if(requestVersion > 0) 4 else 0 val sizeInBytes = - FetchResponse.headerSize + + FetchResponse.headerSize + delayTimeSize + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { val topicData = TopicData(curr._1, curr._2.map { case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) @@ -231,10 +238,17 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte override def destination = dest - private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) + // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request + private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize) buffer.putInt(payloadSize) buffer.putInt(fetchResponse.correlationId) + // Include the delayTime only if the client can read it + if(fetchResponse.requestVersion > 0) { + buffer.putInt(fetchResponse.delayTime) + } + buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count + buffer.rewind() private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index c866180..7fb143e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -26,7 +26,7 @@ import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response object ProducerRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 5d1fac4..0f40a65 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -37,13 +37,17 @@ object ProducerResponse { }) }) - ProducerResponse(correlationId, Map(statusPairs:_*)) + val delayTime = buffer.getInt + ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime) } } case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) +case class ProducerResponse(correlationId: Int, + status: Map[TopicAndPartition, ProducerResponseStatus], + requestVersion : Int = 0, + delayTime : Int = 0) extends RequestOrResponse() { /** @@ -54,6 +58,7 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P def hasError = status.values.exists(_.error != ErrorMapping.NoError) val sizeInBytes = { + val delayTimeSize = if(requestVersion > 0) 4 else 0 val groupedStatus = statusGroupedByTopic 4 + /* correlation id */ 4 + /* topic count */ @@ -66,7 +71,9 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P 2 + /* error code */ 8 /* offset */ } - }) + }) + + delayTimeSize + } def writeTo(buffer: ByteBuffer) { @@ -85,6 +92,10 @@ case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, P 
buffer.putLong(nextOffset) } }) + // Delay time is only supported on V1 style requests + if(requestVersion > 0) { + buffer.putInt(delayTime) + } } override def describe(details: Boolean):String = { toString } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 7ebc040..4e1833a 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -131,7 +131,7 @@ class SimpleConsumer(val host: String, response = sendRequest(request) } } - val fetchResponse = FetchResponse.readFrom(response.payload()) + val fetchResponse = FetchResponse.readFrom(response.payload(), request.versionId) val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f843061..dca975c 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -37,8 +37,17 @@ import com.yammer.metrics.core.Gauge /** * Abstract class for fetching data from multiple partitions from the same broker. */ -abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0, +abstract class AbstractFetcherThread(name: String, + clientId: String, + sourceBroker: BrokerEndPoint, + socketTimeout: Int, + socketBufferSize: Int, + fetchSize: Int, + fetcherBrokerId: Int = -1, + maxWait: Int = 0, + minBytes: Int = 1, + fetchBackOffMs: Int = 0, + fetchRequestVersion: Short = FetchRequest.CurrentVersion, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map @@ -52,7 +61,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke clientId(clientId). replicaId(fetcherBrokerId). maxWait(maxWait). - minBytes(minBytes) + minBytes(minBytes). 
+                                     requestVersion(fetchRequestVersion)

   /* callbacks to be defined in subclass */

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index de6cf5b..8e5cb39 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -55,7 +55,7 @@ case class FetchMetadata(fetchMinBytes: Int,
 class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
-                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
+                   responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit)
   extends DelayedOperation(delayMs) {

   /**
@@ -131,7 +131,8 @@ class DelayedFetch(delayMs: Long,
     val fetchPartitionData = logReadResults.mapValues(result =>
       FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))

-    responseCallback(fetchPartitionData)
+    // Zero delay time until quotas are enforced
+    responseCallback(fetchPartitionData, 0)
   }
 }

diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 05078b2..7a9c30b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -53,7 +53,7 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+                     responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit)
   extends DelayedOperation(delayMs) {

   // first update the acks pending variable according to the error code
@@ -126,7 +126,8 @@ class DelayedProduce(delayMs: Long,
    */
   override def onComplete() {
     val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
-    responseCallback(responseStatus)
+    // Zero delay time until quotas are enforced
+    responseCallback(responseStatus, 0)
   }
 }

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67f0cad..07896d0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -257,40 +257,41 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numBytesAppended = produceRequest.sizeInBytes

     // the callback for sending a produce response
-    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) {
       var errorInResponse = false
       responseStatus.foreach { case (topicAndPartition, status) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager
         if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s"
             .format(produceRequest.correlationId, produceRequest.clientId,
               topicAndPartition, ErrorMapping.exceptionNameFor(status.error)))
           errorInResponse = true
         }
       }

       def produceResponseCallback {
         if (produceRequest.requiredAcks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
           // the producer client will know that some error has happened and will refresh its metadata
           if (errorInResponse) {
             info(
               "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
                 produceRequest.correlationId,
                 produceRequest.clientId))
             requestChannel.closeConnection(request.processor, request)
           } else {
             requestChannel.noOperation(request.processor, request)
           }
         } else {
-          val response = ProducerResponse(produceRequest.correlationId, responseStatus)
+          val response = ProducerResponse(produceRequest.correlationId, responseStatus,
+                                          produceRequest.versionId, delayTime)
           requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
         }
       }

       quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
     }

@@ -318,7 +319,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]

     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
+    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData], delayTime: Int) {
       responsePartitionData.foreach { case (topicAndPartition, data) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager already and hence can be ignored here
@@ -332,9 +333,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }

-      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
+      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTime)
       def fetchResponseCallback {
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }

       // Do not throttle replication traffic
@@ -343,11 +344,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       } else {
         quotaManagers.get(RequestKeys.FetchKey) match {
           case Some(quotaManager) =>
             quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback)
           case None =>
             warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey)))
         }
       }
     }

     // call the replica manager to fetch messages from the local replica

diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e613e7..4c5f4be 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -231,7 +231,7 @@ class OffsetManager(val config: OffsetManagerConfig,
                                       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))

     // set the callback function to insert offsets into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fae22d2..711d749 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}

 class ReplicaFetcherThread(name:String,
@@ -38,6 +38,8 @@ class ReplicaFetcherThread(name:String,
                            maxWait = brokerConfig.replicaFetchWaitMaxMs,
                            minBytes = brokerConfig.replicaFetchMinBytes,
                            fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+                           fetchRequestVersion =
+                             if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0,
                            isInterruptible = false) {

   // process fetched data

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d829e18..0426f11 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -301,10 +301,9 @@ class ReplicaManager(val config: KafkaConfig,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                    responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+                    responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) {

     if (isValidRequiredAcks(requiredAcks)) {
-
       val sTime = SystemTime.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -332,7 +331,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         // we can respond immediately
         val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
-        responseCallback(produceResponseStatus)
+        // Zero delay time until quotas are enforced
+        responseCallback(produceResponseStatus, 0)
       }
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
@@ -343,7 +343,7 @@ class ReplicaManager(val config: 
KafkaConfig, ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } - responseCallback(responseStatus) + responseCallback(responseStatus, 0) } } @@ -440,7 +440,7 @@ class ReplicaManager(val config: KafkaConfig, replicaId: Int, fetchMinBytes: Int, fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo], - responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) @@ -465,7 +465,8 @@ class ReplicaManager(val config: KafkaConfig, if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - responseCallback(fetchPartitionData) + // Zero delay time until quotas are enforced + responseCallback(fetchPartitionData, 0) } else { // construct the fetch results from the read results val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index b4c2a22..adfd570 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -18,9 +18,12 @@ package kafka.api +import java.nio.channels.GatheringByteChannel + import kafka.cluster.{BrokerEndPoint, EndPoint, Broker} import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.common._ +import kafka.consumer.FetchRequestAndResponseStatsRegistry import kafka.message.{Message, ByteBufferMessageSet} import kafka.utils.SystemTime @@ -150,7 +153,7 @@ object SerializationTestUtils { ProducerResponse(1, Map( TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) - )) + ), ProducerRequest.CurrentVersion, 100) def createTestFetchRequest: FetchRequest = { new FetchRequest(requestInfo = requestInfos) @@ -304,4 +307,39 @@ class RequestResponseSerializationTest extends JUnitSuite { assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized) } } + + @Test + def testProduceResponseVersion() { + val oldClientResponse = ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + )) + + val newClientResponse = ProducerResponse(1, Map( + TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + + val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes) + newClientResponse.writeTo(buffer) + buffer.rewind() + assertEquals(ProducerResponse.readFrom(buffer).delayTime, 100) + } + + @Test + def testFetchResponseVersion() { + val oldClientResponse = FetchResponse(1, Map( + 
TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 0) + + val newClientResponse = FetchResponse(1, Map( + TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + } } diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index df8d5b1..b722ce6 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -67,6 +67,18 @@ class DelayedOperationTest { } @Test + def testTimeoutNoKeys() { + val expiration = 20L + val r1 = new MockDelayedOperation(expiration) + val start = System.currentTimeMillis + purgatory.tryCompleteElseWatch(r1, Seq("a1")) + r1.awaitExpiration() + val elapsed = System.currentTimeMillis - start + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) + } + + @Test def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3770cb4..2630e71 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -77,7 +77,7 @@ class ReplicaManagerTest { val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } -- 1.7.12.4 From 9d1eee57998c32c8d30a6dd52ffb5ecfe3c70390 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 8 Jun 2015 17:39:16 -0700 Subject: [PATCH 02/10] Addressing Joel's comments --- .../kafka/clients/producer/internals/Sender.java | 18 +++++------ .../org/apache/kafka/common/protocol/Protocol.java | 14 ++++++--- .../kafka/common/requests/FetchResponse.java | 14 ++++----- .../kafka/common/requests/ProduceResponse.java | 14 ++++----- .../clients/consumer/internals/FetcherTest.java | 4 +-- .../clients/producer/internals/SenderTest.java | 6 ++-- core/src/main/scala/kafka/api/FetchResponse.scala | 36 +++++++++++++++------- .../main/scala/kafka/api/ProducerResponse.scala | 17 +++++----- .../api/RequestResponseSerializationTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 12 -------- 10 files changed, 71 insertions(+), 66 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9c87879..49a4329 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); - private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -254,8 +254,8 @@ public class Sender implements Runnable { completeBatch(batch, error, partResp.baseOffset, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); - this.sensors.recordQuotaDelay(response.request().request().destination(), - response.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); + this.sensors.recordThrottleTime(response.request().request().destination(), + response.responseBody().getInt(THROTTLE_TIME_KEY_NAME)); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -355,7 +355,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; - public final Sensor quotaDelayTimeSensor; + public final Sensor throttleTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -385,11 +385,11 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); - this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time"); + this.throttleTimeSensor = metrics.sensor("produce-throttle-time"); m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); - this.quotaDelayTimeSensor.add(m, new Avg()); + this.throttleTimeSensor.add(m, new Avg()); m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); - this.quotaDelayTimeSensor.add(m, new Max()); + this.throttleTimeSensor.add(m, new Max()); this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); @@ -526,8 +526,8 @@ public class Sender implements Runnable { } } - public void recordQuotaDelay(int node, long delayTimeMs) { - this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds()); + public void recordThrottleTime(int node, long throttleTimeMs) { + this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 461388e..048d761 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -107,8 +107,6 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - // The V1 Fetch Request body is the same as V0. 
- // Only the version number is incremented to indicate a newer client public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", @@ -120,7 +118,11 @@ public class Protocol { INT16), new Field("base_offset", INT64))))))), - new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0)); + new Field("throttle_time_ms", + INT32, + "Duration in milliseconds for which the request was throttled" + + " due to quota violation. (Zero if the request did not violate any quota.)", + 0)); public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1}; public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1}; @@ -374,8 +376,10 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32, - "Amount of time in milliseconds the request was throttled if at all", + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", + INT32, + "Duration in milliseconds for which the request was throttled" + + " due to quota violation. (Zero if the request did not violate any quota.)", 0), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 005ec08..1ef3db7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -37,7 +37,7 @@ public class FetchResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; - private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -60,7 +60,7 @@ public class FetchResponse extends AbstractRequestResponse { public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0); private final Map responseData; - private final int delayTime; + private final int throttleTime; public static final class PartitionData { public final short errorCode; @@ -74,9 +74,9 @@ public class FetchResponse extends AbstractRequestResponse { } } - public FetchResponse(Map responseData, int delayTime) { + public FetchResponse(Map responseData, int throttleTime) { super(new Struct(CURRENT_SCHEMA)); - this.delayTime = delayTime; + this.throttleTime = throttleTime; Map> topicsData = CollectionUtils.groupDataByTopic(responseData); List topicArray = new ArrayList(); @@ -97,13 +97,13 @@ public class FetchResponse extends AbstractRequestResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); - struct.set(QUOTA_DELAY_KEY_NAME, delayTime); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); this.responseData = responseData; } public FetchResponse(Struct struct) { super(struct); - this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); + this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME); responseData = new HashMap(); for (Object 
topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; @@ -123,7 +123,7 @@ public class FetchResponse extends AbstractRequestResponse { public Map responseData() { return responseData; } - public int getDelayTime() { return this.delayTime; } + public int getThrottleTime() { return this.throttleTime; } public static FetchResponse parse(ByteBuffer buffer) { return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 8bb5bda..d4a3e82 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -33,7 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; - private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; @@ -50,9 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private final Map responses; - private final int delayTime; + private final int throttleTime; - public ProduceResponse(Map responses, int delayTime) { + public ProduceResponse(Map responses, int throttleTime) { super(new Struct(CURRENT_SCHEMA)); Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); List topicDatas = new ArrayList(responseByTopic.size()); @@ -72,9 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse { topicDatas.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); - struct.set(QUOTA_DELAY_KEY_NAME, delayTime); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); this.responses = responses; - this.delayTime = delayTime; + this.throttleTime = throttleTime; } public ProduceResponse(Struct struct) { @@ -92,13 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse { responses.put(tp, new PartitionResponse(errorCode, offset)); } } - this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME); + this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME); } public Map responses() { return this.responses; } - public int getDelayTime() { return this.delayTime; } + public int getThrottleTime() { return this.throttleTime; } public static final class PartitionResponse { public short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index c6b7feb..66759a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -324,8 +324,8 @@ public class FetcherTest { return response.toStruct(); } - private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { - FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), 0); + private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) { + FetchResponse response = new 
FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime); return response.toStruct(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2cc9e50..2d4dac5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -87,7 +87,7 @@ public class SenderTest { } /* - * Send multiple request. Verify that the client side quota metrics have the right values + * Send multiple requests. Verify that the client side quota metrics have the right values */ @Test public void testQuotaMetrics() throws Exception { @@ -167,10 +167,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) { + private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset); Map partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp, delayTimeMs); + ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs); return response.toStruct(); } diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index eb63b46..613f428 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -67,13 +67,19 @@ class PartitionDataSend(val partitionId: Int, override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize && !pending +<<<<<<< HEAD override def destination: String = "" override def writeTo(channel: GatheringByteChannel): Long = { var written = 0L if(buffer.hasRemaining) +======= + override def writeTo(channel: GatheringByteChannel): Int = { + var written = 0 + if (buffer.hasRemaining) +>>>>>>> Addressing Joel's comments written += channel.write(buffer) - if(!buffer.hasRemaining && messagesSentSize < messageSize) { + if (!buffer.hasRemaining && messagesSentSize < messageSize) { val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize) messagesSentSize += bytesSent written += bytesSent @@ -130,6 +136,7 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2)))) +<<<<<<< HEAD override def writeTo(channel: GatheringByteChannel): Long = { if (completed) throw new KafkaException("This operation cannot be completed on a complete request.") @@ -138,6 +145,14 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { if(buffer.hasRemaining) written += channel.write(buffer) if(!buffer.hasRemaining && !sends.completed) { +======= + def writeTo(channel: GatheringByteChannel): Int = { + expectIncomplete() + var written = 0 + if (buffer.hasRemaining) + written += channel.write(buffer) + if (!buffer.hasRemaining && !sends.complete) { +>>>>>>> Addressing Joel's comments written += sends.writeTo(channel) } @@ -160,8 +175,7 @@ object FetchResponse { // The request version is used to determine which fields we can expect in the response def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = { 
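// On the wire, a V1 fetch response differs from V0 only by a single INT32
// throttle_time_ms field between the correlation id and the topic count,
// so the field is consumed only when requestVersion > 0: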
val correlationId = buffer.getInt - val delayTime = if (requestVersion > 0) buffer.getInt else 0 - + val throttleTime = if (requestVersion > 0) buffer.getInt else 0 val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topicData = TopicData.readFrom(buffer) @@ -170,23 +184,23 @@ object FetchResponse { (TopicAndPartition(topicData.topic, partitionId), partitionData) } }) - FetchResponse(correlationId, Map(pairs:_*), requestVersion, delayTime) + FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime) } } case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData], requestVersion : Int = 0, - delayTime : Int = 0) + throttleTime : Int = 0) extends RequestOrResponse() { /** * Partitions the data into a map of maps (one for each topic). */ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - val delayTimeSize = if(requestVersion > 0) 4 else 0 + val throttleTimeSize = if (requestVersion > 0) 4 else 0 val sizeInBytes = - FetchResponse.headerSize + delayTimeSize + + FetchResponse.headerSize + throttleTimeSize + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { val topicData = TopicData(curr._1, curr._2.map { case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) @@ -239,12 +253,12 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte override def destination = dest // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request - private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize) + private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.throttleTimeSize) buffer.putInt(payloadSize) buffer.putInt(fetchResponse.correlationId) - // Include the delayTime only if the client can read it - if(fetchResponse.requestVersion > 0) { - buffer.putInt(fetchResponse.delayTime) + // Include the throttleTime only if the client can read it + if (fetchResponse.requestVersion > 0) { + buffer.putInt(fetchResponse.throttleTime) } buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 0f40a65..2899f20 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -37,8 +37,8 @@ object ProducerResponse { }) }) - val delayTime = buffer.getInt - ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime) + val throttleTime = buffer.getInt + ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime) } } @@ -47,7 +47,7 @@ case class ProducerResponseStatus(var error: Short, offset: Long) case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus], requestVersion : Int = 0, - delayTime : Int = 0) + throttleTime : Int = 0) extends RequestOrResponse() { /** @@ -58,7 +58,7 @@ case class ProducerResponse(correlationId: Int, def hasError = status.values.exists(_.error != ErrorMapping.NoError) val sizeInBytes = { - val delayTimeSize = if(requestVersion > 0) 4 else 0 + val throttleTimeSize = if (requestVersion > 0) 4 else 0 val groupedStatus = statusGroupedByTopic 4 + /* correlation id */ 4 + /* topic count */ @@ -72,8 +72,7 @@ case class ProducerResponse(correlationId: Int, 8 /* offset */ } }) + - delayTimeSize - + 
throttleTimeSize } def writeTo(buffer: ByteBuffer) { @@ -92,9 +91,9 @@ case class ProducerResponse(correlationId: Int, buffer.putLong(nextOffset) } }) - // Delay time is only supported on V1 style requests - if(requestVersion > 0) { - buffer.putInt(delayTime) + // Throttle time is only supported on V1 style requests + if (requestVersion > 0) { + buffer.putInt(throttleTime) } } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index adfd570..b7e7967 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -326,7 +326,7 @@ class RequestResponseSerializationTest extends JUnitSuite { val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes) newClientResponse.writeTo(buffer) buffer.rewind() - assertEquals(ProducerResponse.readFrom(buffer).delayTime, 100) + assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100) } @Test diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index b722ce6..df8d5b1 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -67,18 +67,6 @@ class DelayedOperationTest { } @Test - def testTimeoutNoKeys() { - val expiration = 20L - val r1 = new MockDelayedOperation(expiration) - val start = System.currentTimeMillis - purgatory.tryCompleteElseWatch(r1, Seq("a1")) - r1.awaitExpiration() - val elapsed = System.currentTimeMillis - start - assertTrue("r1 completed due to expiration", r1.isCompleted()) - assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) - } - - @Test def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) -- 1.7.12.4 From 6cd08a16dd0e39eddfeae0489e29c72633e2a26e Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 9 Jun 2015 10:06:56 -0700 Subject: [PATCH 03/10] Merging --- .../apache/kafka/clients/producer/internals/Sender.java | 2 +- core/src/main/scala/kafka/api/FetchResponse.scala | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 49a4329..4e13c21 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -526,7 +526,7 @@ public class Sender implements Runnable { } } - public void recordThrottleTime(int node, long throttleTimeMs) { + public void recordThrottleTime(String node, long throttleTimeMs) { this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds()); } diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 613f428..56b6d8c 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -67,17 +67,11 @@ class PartitionDataSend(val partitionId: Int, override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize && !pending -<<<<<<< HEAD override def destination: String = "" override def writeTo(channel: GatheringByteChannel): Long = { var written = 0L 
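// Drain any remaining buffered header bytes first; the message set that
// follows is streamed separately so file-backed sets can use sendfile.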
if(buffer.hasRemaining) -======= - override def writeTo(channel: GatheringByteChannel): Int = { - var written = 0 - if (buffer.hasRemaining) ->>>>>>> Addressing Joel's comments written += channel.write(buffer) if (!buffer.hasRemaining && messagesSentSize < messageSize) { val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize) @@ -136,7 +130,6 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2)))) -<<<<<<< HEAD override def writeTo(channel: GatheringByteChannel): Long = { if (completed) throw new KafkaException("This operation cannot be completed on a complete request.") @@ -145,14 +138,6 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { if(buffer.hasRemaining) written += channel.write(buffer) if(!buffer.hasRemaining && !sends.completed) { -======= - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - if (buffer.hasRemaining) - written += channel.write(buffer) - if (!buffer.hasRemaining && !sends.complete) { ->>>>>>> Addressing Joel's comments written += sends.writeTo(channel) } -- 1.7.12.4 From 4f287e7360bac6474ea6422430d6ad58b0050a68 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 9 Jun 2015 10:10:06 -0700 Subject: [PATCH 04/10] Changing variable name --- .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 3c3a84b..d3dbabc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -61,9 +61,9 @@ import java.util.Set; * This class manages the fetching process with the brokers.
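* It also records client-side quota metrics, including the throttle time
* that quota-enabled brokers return in each fetch response.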
*/ public class Fetcher { - public static final long EARLIEST_OFFSET_TIMESTAMP = -2L; - public static final long LATEST_OFFSET_TIMESTAMP = -1L; - private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms"; + private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; + private static final long LATEST_OFFSET_TIMESTAMP = -1L; + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final Logger log = LoggerFactory.getLogger(Fetcher.class); @@ -456,7 +456,7 @@ public class Fetcher { this.sensors.recordsFetched.record(totalCount); } this.sensors.fetchLatency.record(resp.requestLatencyMs()); - this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME)); + this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(THROTTLE_TIME_KEY_NAME)); } /** -- 1.7.12.4 From c53e1d475b771d61cabd8ffcf2f161f28967ec5c Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 30 Jun 2015 19:43:25 -0700 Subject: [PATCH 05/10] Addressing Joel's comments --- .../kafka/clients/consumer/internals/Fetcher.java | 17 ++++++++--------- .../kafka/clients/producer/internals/Sender.java | 17 ++++++++--------- .../clients/consumer/internals/FetcherTest.java | 7 +++++++ .../clients/producer/internals/SenderTest.java | 22 +++++++++------------- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d3dbabc..f02041a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -63,7 +63,6 @@ import java.util.Set; public class Fetcher { private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; private static final long LATEST_OFFSET_TIMESTAMP = -1L; - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final Logger log = LoggerFactory.getLogger(Fetcher.class); @@ -454,9 +453,9 @@ public class Fetcher { } this.sensors.bytesFetched.record(totalBytes); this.sensors.recordsFetched.record(totalCount); + this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); } this.sensors.fetchLatency.record(resp.requestLatencyMs()); - this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(THROTTLE_TIME_KEY_NAME)); } /** @@ -495,7 +494,7 @@ public class Fetcher { public final Sensor recordsFetched; public final Sensor fetchLatency; public final Sensor recordsFetchLag; - public final Sensor quotaDelayTimeSensor; + public final Sensor fetchThrottleTimeSensor; public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { @@ -546,16 +545,16 @@ public class Fetcher { "The maximum lag in terms of number of records for any partition in this window", tags), new Max()); - this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time"); - this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg", + this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); + this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg", this.metricGrpName, "The average throttle time in ms", tags), new Avg()); - this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max", - this.metricGrpName, - "The maximum throttle time in ms", - tags), new Max()); + this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max", + this.metricGrpName, + "The maximum throttle time in ms", + 
tags), new Max()); } public void recordTopicFetchMetrics(String topic, int bytes, int records) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 4e13c21..d2e64f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory; public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; /* the state of each nodes connection */ private final KafkaClient client; @@ -255,7 +254,7 @@ public class Sender implements Runnable { } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); this.sensors.recordThrottleTime(response.request().request().destination(), - response.responseBody().getInt(THROTTLE_TIME_KEY_NAME)); + produceResponse.getThrottleTime()); } else { // this is the acks = 0 case, just complete all requests for (RecordBatch batch : batches.values()) @@ -355,7 +354,7 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; - public final Sensor throttleTimeSensor; + public final Sensor produceThrottleTimeSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -385,11 +384,11 @@ public class Sender implements Runnable { m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); this.requestTimeSensor.add(m, new Max()); - this.throttleTimeSensor = metrics.sensor("produce-throttle-time"); - m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); - this.throttleTimeSensor.add(m, new Avg()); - m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); - this.throttleTimeSensor.add(m, new Max()); + this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); + m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); + this.produceThrottleTimeSensor.add(m, new Avg()); + m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); + this.produceThrottleTimeSensor.add(m, new Max()); this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); @@ -527,7 +526,7 @@ public class Sender implements Runnable { } public void recordThrottleTime(String node, long throttleTimeMs) { - this.throttleTimeSensor.record(throttleTimeMs, time.milliseconds()); + this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 66759a9..06ec707 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -59,6 +59,7 @@ public class FetcherTest { private String topicName = "test"; private String groupId = "test-group"; + private final 
String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; private TopicPartition tp = new TopicPartition(topicName, 0); private int minBytes = 1; private int maxWaitMs = 0; @@ -73,6 +74,7 @@ public class FetcherTest { private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + private static final double EPS = 0.0001; private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); @@ -314,6 +316,11 @@ public class FetcherTest { return partitionData != null && partitionData.timestamp == timestamp; } }; + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags)); + KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags)); + assertEquals(200, avgMetric.value(), EPS); + assertEquals(300, maxMetric.value(), EPS); } private Struct listOffsetResponse(Errors error, List offsets) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2d4dac5..aa44991 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -45,6 +45,9 @@ public class SenderTest { private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; private static final int REQUEST_TIMEOUT_MS = 10000; + private static final String CLIENT_ID = "clientId"; + private static final String METRIC_GROUP = "producer-metrics"; + private static final double EPS = 0.0001; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); @@ -64,11 +67,12 @@ public class SenderTest { REQUEST_TIMEOUT_MS, metrics, time, - "clientId"); + CLIENT_ID); @Before public void setup() { metadata.update(cluster, time.milliseconds()); + metricTags.put("client-id", CLIENT_ID); } @Test @@ -98,19 +102,11 @@ public class SenderTest { client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); sender.run(time.milliseconds()); } - long avg = 0; - long max = 0; - Map allMetrics = metrics.metrics(); - for (MetricName m : allMetrics.keySet()) { - if (m.name().equals("throttle-time-avg")) { - avg = (long) allMetrics.get(m).value(); - } else if (m.name().equals("throttle-time-max")) { - max = (long) allMetrics.get(m).value(); - } - } - assertEquals(200, avg); - assertEquals(300, max); + KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags)); + KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags)); + assertEquals(200, avgMetric.value(), EPS); + assertEquals(300, maxMetric.value(), EPS); } @Test -- 1.7.12.4 From 75bdf1a008c52789ab3186f7801fd9ef00a8ae95 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 13 Jul 2015 13:29:45 -0700 Subject: [PATCH 06/10] Addressing Joel's comments --- .../kafka/common/requests/ProduceResponse.java | 5 ++- .../clients/consumer/internals/FetcherTest.java | 8 +++-- core/src/main/scala/kafka/api/FetchResponse.scala | 41 ++++++++++++---------- .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index d4a3e82..24c4f33 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -98,7 +98,10 @@ public class ProduceResponse extends AbstractRequestResponse { public Map responses() { return this.responses; } - public int getThrottleTime() { return this.throttleTime; } + + public int getThrottleTime() { + return this.throttleTime; + } public static final class PartitionResponse { public short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 06ec707..d480c97 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -73,8 +73,12 @@ public class FetcherTest { private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); +<<<<<<< HEAD private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); private static final double EPS = 0.0001; +======= + private static final double EPSILON = 0.0001; +>>>>>>> Addressing Joel's comments private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); @@ -319,8 +323,8 @@ public class FetcherTest { Map allMetrics = metrics.metrics(); KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags)); KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags)); - assertEquals(200, avgMetric.value(), EPS); - assertEquals(300, maxMetric.value(), EPS); + assertEquals(200, avgMetric.value(), EPSILON); + assertEquals(300, maxMetric.value(), EPSILON); } private Struct listOffsetResponse(Errors error, List offsets) { diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 56b6d8c..eeba9a8 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -152,11 +152,6 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { object FetchResponse { - val headerSize = - 4 + /* correlationId */ - 4 /* topic count */ - - // The request version is used to determine which fields we can expect in the response def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = { val correlationId = buffer.getInt @@ -175,8 +170,8 @@ object FetchResponse { case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData], - requestVersion : Int = 0, - throttleTime : Int = 0) + requestVersion: Int = 0, + throttleTimeMs: Int = 0) extends RequestOrResponse() { /** @@ -184,8 +179,12 @@ case class FetchResponse(correlationId: Int, */ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) val throttleTimeSize = if (requestVersion > 0) 4 else 0 + val headerSize = 4 + /* correlationId */ + 4 + /* topic count */ + throttleTimeSize + val sizeInBytes = - FetchResponse.headerSize + throttleTimeSize + + headerSize + dataGroupedByTopic.foldLeft(0) ((folded, curr) => 
{ val topicData = TopicData(curr._1, curr._2.map { case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) @@ -194,6 +193,18 @@ case class FetchResponse(correlationId: Int, }) /* + * Writes the header of the FetchResponse to the input buffer + */ + def writeHeaderTo(buffer: ByteBuffer) = { + buffer.putInt(sizeInBytes) + buffer.putInt(correlationId) + // Include the throttleTime only if the client can read it + if (requestVersion > 0) { + buffer.putInt(throttleTimeMs) + } + buffer.putInt(dataGroupedByTopic.size) // topic count + } + /* * FetchResponse uses [sendfile](http://man7.org/linux/man-pages/man2/sendfile.2.html) * api for data transfer through the FetchResponseSend, so `writeTo` aren't actually being used. * It is implemented as an empty function to conform to `RequestOrResponse.writeTo` @@ -237,17 +248,9 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte override def destination = dest - // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request - private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.throttleTimeSize) - buffer.putInt(payloadSize) - buffer.putInt(fetchResponse.correlationId) - // Include the throttleTime only if the client can read it - if (fetchResponse.requestVersion > 0) { - buffer.putInt(fetchResponse.throttleTime) - } - - buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count - + // The throttleTimeSize will be 0 if the request was made from a client sending a V0 style request + private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSize) + fetchResponse.writeHeaderTo(buffer) buffer.rewind() private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 2630e71..b76b31a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -77,7 +77,7 @@ class ReplicaManagerTest { val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } -- 1.7.12.4 From aaf262aab0bfdd6ec644d6326d09f1b70a107ab7 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 18 Aug 2015 13:18:56 -0700 Subject: [PATCH 07/10] Addressing comments --- .../kafka/clients/consumer/internals/Fetcher.java | 4 +- .../kafka/common/requests/FetchResponse.java | 10 ++- .../kafka/common/requests/ProduceResponse.java | 56 +++++++++----- .../clients/consumer/internals/FetcherTest.java | 88 +++++++++++++--------- core/src/main/scala/kafka/api/FetchResponse.scala | 42 +++++++---- .../main/scala/kafka/api/ProducerResponse.scala | 8 +- .../scala/kafka/server/ClientQuotaManager.scala | 11 ++- .../src/main/scala/kafka/server/DelayedFetch.scala | 5 +- .../main/scala/kafka/server/DelayedProduce.scala | 5 +- core/src/main/scala/kafka/server/KafkaApis.scala | 55 ++++++-------- 
.../main/scala/kafka/server/OffsetManager.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 13 ++-- .../scala/kafka/server/ThrottledResponse.scala | 4 +- .../unit/kafka/server/ClientQuotaManagerTest.scala | 2 +- .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- .../server/ThrottledResponseExpirationTest.scala | 2 +- 16 files changed, 172 insertions(+), 137 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index f02041a..1ae6d03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -61,8 +61,8 @@ import java.util.Set; * This class manages the fetching process with the brokers. */ public class Fetcher { - private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; - private static final long LATEST_OFFSET_TIMESTAMP = -1L; + public static final long EARLIEST_OFFSET_TIMESTAMP = -2L; + public static final long LATEST_OFFSET_TIMESTAMP = -1L; private static final Logger log = LoggerFactory.getLogger(Fetcher.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 1ef3db7..bcb6e00 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -74,6 +74,11 @@ public class FetchResponse extends AbstractRequestResponse { } } + /** + * Constructor for Version 1 + * @param responseData fetched data grouped by topic-partition + * @param throttleTime Time
in milliseconds the response was throttled + */ public ProduceResponse(Map responses, int throttleTime) { super(new Struct(CURRENT_SCHEMA)); - Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); - List topicDatas = new ArrayList(responseByTopic.size()); - for (Map.Entry> entry : responseByTopic.entrySet()) { - Struct topicData = struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, entry.getKey()); - List partitionArray = new ArrayList(); - for (Map.Entry partitionEntry : entry.getValue().entrySet()) { - PartitionResponse part = partitionEntry.getValue(); - Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) - .set(PARTITION_KEY_NAME, partitionEntry.getKey()) - .set(ERROR_CODE_KEY_NAME, part.errorCode) - .set(BASE_OFFSET_KEY_NAME, part.baseOffset); - partitionArray.add(partStruct); - } - topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); - topicDatas.add(topicData); - } - struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + initCommonFields(responses); struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); this.responses = responses; this.throttleTime = throttleTime; @@ -95,6 +95,26 @@ public class ProduceResponse extends AbstractRequestResponse { this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME); } + private void initCommonFields(Map responses) { + Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); + List topicDatas = new ArrayList(responseByTopic.size()); + for (Map.Entry> entry : responseByTopic.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : entry.getValue().entrySet()) { + PartitionResponse part = partitionEntry.getValue(); + Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME).set(PARTITION_KEY_NAME, + partitionEntry.getKey()).set( + ERROR_CODE_KEY_NAME, part.errorCode).set(BASE_OFFSET_KEY_NAME, part.baseOffset); + partitionArray.add(partStruct); + } + topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); + topicDatas.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + } + public Map responses() { return this.responses; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index d480c97..22712bb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -56,7 +56,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class FetcherTest { - private String topicName = "test"; private String groupId = "test-group"; private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; @@ -73,29 +72,25 @@ public class FetcherTest { private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); -<<<<<<< HEAD - private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); - private static final double EPS = 0.0001; -======= private static final double EPSILON = 0.0001; ->>>>>>> Addressing Joel's comments + private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); private MemoryRecords 
records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); private Fetcher fetcher = new Fetcher(consumerClient, - minBytes, - maxWaitMs, - fetchSize, - true, // check crc - new ByteArrayDeserializer(), - new ByteArrayDeserializer(), - metadata, - subscriptions, - metrics, - "consumer" + groupId, - metricTags, - time, - retryBackoffMs); + minBytes, + maxWaitMs, + fetchSize, + true, // check crc + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), + metadata, + subscriptions, + metrics, + "consumer" + groupId, + metricTags, + time, + retryBackoffMs); @Before public void setup() throws Exception { @@ -117,7 +112,7 @@ public class FetcherTest { // normal fetch fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(3, records.size()); @@ -140,7 +135,7 @@ public class FetcherTest { // Now the rebalance happens and fetch positions are cleared subscriptions.changePartitionAssignment(Arrays.asList(tp)); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); // The active fetch should be ignored since its position is no longer valid @@ -155,7 +150,7 @@ public class FetcherTest { fetcher.initFetches(cluster); subscriptions.pause(tp); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); assertNull(fetcher.fetchedRecords().get(tp)); } @@ -176,7 +171,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -188,7 +183,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -200,7 +195,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L)); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); assertTrue(subscriptions.isOffsetResetNeeded(tp)); assertEquals(0, fetcher.fetchedRecords().size()); @@ -214,7 +209,7 @@ public class FetcherTest { subscriptions.seek(tp, 0); fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true); consumerClient.poll(0); assertEquals(0, 
fetcher.fetchedRecords().size()); @@ -244,7 +239,7 @@ public class FetcherTest { // with no commit position, we should reset using the default strategy defined above (EARLIEST) client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -258,7 +253,7 @@ public class FetcherTest { subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -272,7 +267,7 @@ public class FetcherTest { subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -287,11 +282,11 @@ public class FetcherTest { // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true); + listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true); // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), - listOffsetResponse(Errors.NONE, Arrays.asList(5L))); + listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); @@ -310,6 +305,32 @@ public class FetcherTest { assertEquals(cluster.topics().size(), allTopics.size()); } + /* + * Send multiple requests. 
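Each response is throttled for 100 * i ms, i.e. 100, 200 and 300 ms across the three iterations, so the expected average is 200 ms and the expected maximum is 300 ms.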
Verify that the client side quota metrics have the right values + */ + @Test + public void testQuotaMetrics() throws Exception { + List> records; + subscriptions.subscribe(tp); + subscriptions.seek(tp, 0); + + // normal fetch + for (int i = 1; i < 4; i++) { + fetcher.initFetches(cluster); + + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); + consumerClient.poll(0); + records = fetcher.fetchedRecords().get(tp); + assertEquals(3, records.size()); + } + + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags)); + KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags)); + assertEquals(200, avgMetric.value(), EPSILON); + assertEquals(300, maxMetric.value(), EPSILON); + } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { // matches any list offset request with the provided timestamp return new MockClient.RequestMatcher() { @@ -320,11 +341,6 @@ public class FetcherTest { return partitionData != null && partitionData.timestamp == timestamp; } }; - Map allMetrics = metrics.metrics(); - KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags)); - KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags)); - assertEquals(200, avgMetric.value(), EPSILON); - assertEquals(300, maxMetric.value(), EPSILON); } private Struct listOffsetResponse(Errors error, List offsets) { diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index eeba9a8..b071c8e 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -166,6 +166,27 @@ object FetchResponse { }) FetchResponse(correlationId, Map(pairs:_*), requestVersion, throttleTime) } + + // Returns the size of the response header + def headerSize(requestVersion: Int): Int = { + val throttleTimeSize = if (requestVersion > 0) 4 else 0 + 4 + /* correlationId */ + 4 + /* topic count */ + throttleTimeSize + } + + // Returns the size of entire fetch response in bytes (including the header size) + def responseSize(data: Map[TopicAndPartition, FetchResponsePartitionData], + requestVersion: Int): Int = { + val dataGroupedByTopic = data.groupBy(_._1.topic) + headerSize(requestVersion) + + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { + val topicData = TopicData(curr._1, curr._2.map { + case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) + }) + folded + topicData.sizeInBytes + }) + } } case class FetchResponse(correlationId: Int, @@ -178,19 +199,8 @@ case class FetchResponse(correlationId: Int, * Partitions the data into a map of maps (one for each topic). 
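* e.g. Map((t1, p0) -> d0, (t1, p1) -> d1, (t2, p0) -> d2) becomes
* Map(t1 -> Map((t1, p0) -> d0, (t1, p1) -> d1), t2 -> Map((t2, p0) -> d2)).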
*/ lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - val throttleTimeSize = if (requestVersion > 0) 4 else 0 - val headerSize = 4 + /* correlationId */ - 4 + /* topic count */ - throttleTimeSize - - val sizeInBytes = - headerSize + - dataGroupedByTopic.foldLeft(0) ((folded, curr) => { - val topicData = TopicData(curr._1, curr._2.map { - case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData) - }) - folded + topicData.sizeInBytes - }) + val headerSizeInBytes = FetchResponse.headerSize(requestVersion) + val sizeInBytes = FetchResponse.responseSize(data, requestVersion) /* * Writes the header of the FetchResponse to the input buffer @@ -199,9 +209,9 @@ case class FetchResponse(correlationId: Int, buffer.putInt(sizeInBytes) buffer.putInt(correlationId) // Include the throttleTime only if the client can read it - if (requestVersion > 0) { + if (requestVersion > 0) buffer.putInt(throttleTimeMs) - } + buffer.putInt(dataGroupedByTopic.size) // topic count } /* @@ -249,7 +259,7 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte override def destination = dest // The throttleTimeSize will be 0 if the request was made from a client sending a V0 style request - private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSize) + private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSizeInBytes) fetchResponse.writeHeaderTo(buffer) buffer.rewind() diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 2899f20..7719f30 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -23,6 +23,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.api.ApiUtils._ object ProducerResponse { + // readFrom assumes that the response is written using V1 format def readFrom(buffer: ByteBuffer): ProducerResponse = { val correlationId = buffer.getInt val topicCount = buffer.getInt @@ -46,8 +47,8 @@ case class ProducerResponseStatus(var error: Short, offset: Long) case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus], - requestVersion : Int = 0, - throttleTime : Int = 0) + requestVersion: Int = 0, + throttleTime: Int = 0) extends RequestOrResponse() { /** @@ -92,9 +93,8 @@ case class ProducerResponse(correlationId: Int, } }) // Throttle time is only supported on V1 style requests - if (requestVersion > 0) { + if (requestVersion > 0) buffer.putInt(throttleTime) - } } override def describe(details: Boolean):String = { toString } diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 9f8473f..de7f68d 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -110,13 +110,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * @return Number of milliseconds to delay the response in case of Quota violation. 
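* For example, with a target rate of T = 10 MB/s measured over W = 10s of
* sample windows, a client observed at O = 12.5 MB/s is delayed by
* X = (O - T) / T * W = (12.5 - 10) / 10 * 10s = 2.5s, bringing its rate
* over the stretched window back to O * W / (W + X) = 10 MB/s.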
* Zero otherwise */ - def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = { + def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = { val clientSensors = getOrCreateQuotaSensors(clientId) - var delayTimeMs = 0L + var delayTimeMs = 0 try { clientSensors.quotaSensor.record(value) // trigger the callback immediately if quota is not violated - callback + callback(delayTimeMs) } catch { case qve: QuotaViolationException => // Compute the delay @@ -139,12 +139,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * we need to add a delay of X to W such that O * W / (W + X) = T. * Solving for X, we get X = (O - T)/T * W. */ - private def delayTime(metricValue: Double, config: MetricConfig): Long = - { + private def delayTime(metricValue: Double, config: MetricConfig): Int = { val quota = config.quota() val difference = metricValue - quota.bound val time = difference / quota.bound * config.timeWindowMs() * config.samples() - time.round + time.round.toInt } /** diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 8e5cb39..de6cf5b 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -55,7 +55,7 @@ case class FetchMetadata(fetchMinBytes: Int, class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, - responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) + responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) extends DelayedOperation(delayMs) { /** @@ -131,8 +131,7 @@ class DelayedFetch(delayMs: Long, val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) - // Zero delay time until quotas are enforced - responseCallback(fetchPartitionData, 0) + responseCallback(fetchPartitionData) } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 7a9c30b..05078b2 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -53,7 +53,7 @@ case class ProduceMetadata(produceRequiredAcks: Short, class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, - responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) + responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) extends DelayedOperation(delayMs) { // first update the acks pending variable according to the error code @@ -126,8 +126,7 @@ class DelayedProduce(delayMs: Long, */ override def onComplete() { val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) - // Zero delay time until quotas are enforced - responseCallback(responseStatus, 0) + responseCallback(responseStatus) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 07896d0..c838ca7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -257,15 +257,13 @@ class KafkaApis(val requestChannel: RequestChannel, val numBytesAppended = produceRequest.sizeInBytes // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime 
: Int) - { + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { var errorInResponse = false responseStatus.foreach { case (topicAndPartition, status) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) - { + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( produceRequest.correlationId, produceRequest.clientId, @@ -275,41 +273,34 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def produceResponseCallback - { - if (produceRequest.requiredAcks == 0) - { + def produceResponseCallback(delayTimeMs: Int) { + if (produceRequest.requiredAcks == 0) { // no operation needed if producer request.required.acks = 0; however, if there is any error in handling // the request, since no response is expected by the producer, the server will close socket server so that // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) - { + if (errorInResponse) { info( "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format( produceRequest.correlationId, produceRequest.clientId)) requestChannel.closeConnection(request.processor, request) - } - else - { + } else { requestChannel.noOperation(request.processor, request) } - } - else - { + } else { val response = ProducerResponse(produceRequest.correlationId, responseStatus, produceRequest.versionId, - delayTime) + delayTimeMs) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } - - quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, - numBytesAppended, - produceResponseCallback) } + + quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, + numBytesAppended, + produceResponseCallback) } // only allow appending to internal topic partitions @@ -337,7 +328,7 @@ class KafkaApis(val requestChannel: RequestChannel, val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] // the callback for sending a fetch response - def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData], delayTime : Int) { + def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { responsePartitionData.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here @@ -351,24 +342,20 @@ class KafkaApis(val requestChannel: RequestChannel, BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } - val rr = FetchResponse(fetchRequest.correlationId, responsePartitionData) - def fetchResponseCallback { - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, rr))) + def fetchResponseCallback(delayTimeMs: Int) { + val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } // Do not throttle replication traffic if 
(fetchRequest.isFromFollower) { - fetchResponseCallback + fetchResponseCallback(0) } else { - quotaManagers.get(RequestKeys.FetchKey) match { - case Some(quotaManager) => - quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, rr.sizeInBytes, fetchResponseCallback) - case None => - warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey))) - } + quotaManagers(RequestKeys.FetchKey).recordAndMaybeThrottle(fetchRequest.clientId, + FetchResponse.responseSize(responsePartitionData, + fetchRequest.versionId), + fetchResponseCallback) } - val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTime) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } // call the replica manager to fetch messages from the local replica diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 4c5f4be..0e613e7 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -231,7 +231,7 @@ class OffsetManager(val config: OffsetManagerConfig, new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) { + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { // the append response should only contain the topics partition if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0426f11..c195536 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -31,7 +31,6 @@ import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.metrics.Metrics import scala.collection._ @@ -301,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], - responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) { + responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { if (isValidRequiredAcks(requiredAcks)) { val sTime = SystemTime.milliseconds @@ -331,8 +330,7 @@ class ReplicaManager(val config: KafkaConfig, } else { // we can respond immediately val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - // Zero delay time until quotas are enforced - responseCallback(produceResponseStatus, 0) + responseCallback(produceResponseStatus) } } else { // If required.acks is outside accepted range, something is wrong with the client @@ -343,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig, ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } - responseCallback(responseStatus, 0) + responseCallback(responseStatus) } } @@ -440,7 +438,7 @@ class ReplicaManager(val config: KafkaConfig, replicaId: Int, fetchMinBytes: Int, fetchInfo: immutable.Map[TopicAndPartition, 
-                    responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) {
+                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -465,8 +463,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.mapValues(result =>
         FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
-      // Zero delay time until quotas are enforced
-      responseCallback(fetchPartitionData, 0)
+      responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
index 1f80d54..b8b70cd 100644
--- a/core/src/main/scala/kafka/server/ThrottledResponse.scala
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -28,10 +28,10 @@ import org.apache.kafka.common.utils.Time
  * @param delayTimeMs delay associated with this request
  * @param callback Callback to trigger after delayTimeMs milliseconds
  */
-private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
+private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Int, callback: Int => Unit) extends Delayed {
   val endTime = time.milliseconds + delayTimeMs

-  def execute() = callback
+  def execute() = callback(delayTimeMs)

   override def getDelay(unit: TimeUnit): Long = {
     unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index caf98e8..997928c 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -30,7 +30,7 @@ class ClientQuotaManagerTest {
                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000")

   var numCallbacks: Int = 0
-  def callback {
+  def callback(delayTimeMs: Int) {
     numCallbacks += 1
   }

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b76b31a..3770cb4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ class ReplicaManagerTest {
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
-    def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) = {
+    def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
       assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
     }

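The signature change above is the heart of the cleanup: ReplicaManager now reports only per-partition status, and throttling is decided entirely in KafkaApis. A toy sketch of the simplified contract (PartitionKey and AppendStatus are invented stand-ins for TopicAndPartition and ProducerResponseStatus):

    object CallbackSignatureSketch extends App {
      case class PartitionKey(topic: String, partition: Int)
      case class AppendStatus(errorCode: Short, baseOffset: Long)

      // After this change the storage layer knows nothing about throttling: it
      // reports per-partition status and the API layer decides when to respond.
      def appendMessages(data: Map[PartitionKey, Array[Byte]],
                         responseCallback: Map[PartitionKey, AppendStatus] => Unit): Unit = {
        val status = data.map { case (tp, bytes) => tp -> AppendStatus(0, bytes.length.toLong) }
        responseCallback(status)
      }

      appendMessages(Map(PartitionKey("test", 0) -> Array[Byte](1, 2, 3)),
                     status => println(s"append finished: $status"))
    }
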
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index c4b5803..184e482 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -32,7 +32,7 @@ class ThrottledResponseExpirationTest {
                                                               Collections.emptyList(),
                                                               time)

-  def callback {
+  def callback(delayTimeMs: Int) {
     numCallbacks += 1
   }

-- 
1.7.12.4


From 0f873746f2defd33edab67aca885bd17d64d227f Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Fri, 21 Aug 2015 16:28:58 -0700
Subject: [PATCH 08/10] Addressing Joel's comments

---
 .../kafka/common/requests/FetchResponse.java       | 64 +++++++++++++++-------
 .../kafka/common/requests/ProduceResponse.java     |  5 +-
 .../kafka/common/requests/RequestResponseTest.java | 33 +++++++++++
 .../scala/kafka/server/ClientQuotaManager.scala    | 18 +++---
 .../scala/kafka/server/ThrottledResponse.scala     |  8 +--
 .../server/ThrottledResponseExpirationTest.scala   |  6 +-
 6 files changed, 96 insertions(+), 38 deletions(-)
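This patch's theme is response wrappers that can serialize under either the v0 or the v1 schema, with throttle_time_ms existing only in v1. A schematic Scala model of that split (the field name follows the protocol; the Response types themselves are invented for illustration):

    object VersionedResponseSketch extends App {
      val DefaultThrottleTime = 0

      sealed trait Response { def throttleTimeMs: Int }
      case class ResponseV0(payload: String) extends Response {
        val throttleTimeMs: Int = DefaultThrottleTime // field is absent on the wire
      }
      case class ResponseV1(payload: String, throttleTimeMs: Int) extends Response

      // Only the v1 shape serializes the throttle time.
      def wireFields(r: Response): Seq[(String, Any)] = r match {
        case ResponseV0(p)    => Seq("payload" -> p)
        case ResponseV1(p, t) => Seq("payload" -> p, "throttle_time_ms" -> t)
      }

      println(wireFields(ResponseV0("data")))     // no throttle_time_ms
      println(wireFields(ResponseV1("data", 10))) // throttle_time_ms = 10
    }
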
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index bcb6e00..ccf1797 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,6 +29,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+/**
+ * This wrapper supports both v0 and v1 of FetchRequest.
+ */
 public class FetchResponse extends AbstractRequestResponse {

     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
@@ -43,7 +46,10 @@ public class FetchResponse extends AbstractRequestResponse {
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";

-    /**
+    // Default throttle time
+    private static final int DEFAULT_THROTTLE_TIME = 0;
+
+    /**
      * Possible error code:
      *
      * OFFSET_OUT_OF_RANGE (1)
@@ -74,6 +80,17 @@ public class FetchResponse extends AbstractRequestResponse {
         }
     }

+    /**
+     * Constructor for Version 0
+     * @param responseData fetched data grouped by topic-partition
+     */
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
+        initCommonFields(responseData);
+        this.responseData = responseData;
+        this.throttleTime = DEFAULT_THROTTLE_TIME;
+    }
+
     /**
      * Constructor for Version 1
      * @param responseData fetched data grouped by topic-partition
      */
     public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
         super(new Struct(CURRENT_SCHEMA));
+        initCommonFields(responseData);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        this.responseData = responseData;
         this.throttleTime = throttleTime;
+    }
+
+    public FetchResponse(Struct struct) {
+        super(struct);
+        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -102,28 +144,8 @@ public class FetchResponse extends AbstractRequestResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
-        this.responseData = responseData;
     }

-    public FetchResponse(Struct struct) {
-        super(struct);
-        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
-                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }

     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
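initCommonFields relies on CollectionUtils.groupDataByTopic to turn the flat TopicPartition-keyed map into a topic -> partition -> data nesting before building the struct array. Roughly equivalent standalone Scala, assuming only that grouping semantics:

    object GroupByTopicSketch extends App {
      case class TopicPartition(topic: String, partition: Int)

      // Flatten Map[(topic, partition) -> data] into Map[topic -> Map[partition -> data]].
      def groupDataByTopic[T](data: Map[TopicPartition, T]): Map[String, Map[Int, T]] =
        data.groupBy(_._1.topic).map { case (topic, entries) =>
          topic -> entries.map { case (tp, v) => tp.partition -> v }
        }

      val data = Map(TopicPartition("a", 0) -> "x", TopicPartition("a", 1) -> "y",
                     TopicPartition("b", 0) -> "z")
      println(groupDataByTopic(data)) // Map(a -> Map(0 -> x, 1 -> y), b -> Map(0 -> z))
    }
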
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 60847b1..2500e3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -25,6 +25,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+/**
+ * This wrapper supports both v0 and v1 of ProduceRequest.
+ */
 public class ProduceResponse extends AbstractRequestResponse {

     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
@@ -58,7 +61,7 @@ public class ProduceResponse extends AbstractRequestResponse {
      * @param responses Produced data grouped by topic-partition
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, 0)));
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0)));
         initCommonFields(responses);
         this.responses = responses;
         this.throttleTime = DEFAULT_THROTTLE_TIME;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0645bc8..9e92da6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,7 +18,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
 import org.junit.Test;

 import java.lang.reflect.Method;
@@ -77,6 +79,37 @@ public class RequestResponseTest {
         }
     }

+    @Test
+    public void produceResponseVersionTest() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
+
+        ProduceResponse v0Response = new ProduceResponse(responseData);
+        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
+        assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+        assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+        assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0), v0Response.toStruct().schema());
+        assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1), v1Response.toStruct().schema());
+        assertEquals("Response data does not match", responseData, v0Response.responses());
+        assertEquals("Response data does not match", responseData, v1Response.responses());
+    }
+
+    @Test
+    public void fetchResponseVersionTest() {
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
+
+        FetchResponse v0Response = new FetchResponse(responseData);
+        FetchResponse v1Response = new FetchResponse(responseData, 10);
+        assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
+        assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
+        assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0), v0Response.toStruct().schema());
+        assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1), v1Response.toStruct().schema());
+        assertEquals("Response data does not match", responseData, v0Response.responseData());
+        assertEquals("Response data does not match", responseData, v1Response.responseData());
+    }
+
     private AbstractRequestResponse createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }
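The one-word fix above (requestSchema -> responseSchema) matters because a struct built against the request schema cannot carry response fields. A toy illustration of why the wrong schema fails fast (the Schema class and the field sets here are simplified stand-ins for the real protocol definitions, not Kafka's API):

    object SchemaMixupSketch extends App {
      case class Schema(name: String, fields: Set[String]) {
        def write(values: Map[String, Any]): Unit = {
          val unknown = values.keySet.diff(fields)
          // Mimic the fail-fast behavior of writing an undefined field.
          require(unknown.isEmpty, s"$name has no fields $unknown")
          println(s"wrote ${values.size} fields with $name")
        }
      }

      val produceRequestV0  = Schema("PRODUCE_REQUEST_V0", Set("acks", "timeout", "topic_data"))
      val produceResponseV0 = Schema("PRODUCE_RESPONSE_V0", Set("responses"))

      produceResponseV0.write(Map("responses" -> Seq.empty))    // ok
      try produceRequestV0.write(Map("responses" -> Seq.empty)) // the old bug
      catch { case e: IllegalArgumentException => println(e.getMessage) }
    }
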
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index de7f68d..016caaf 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -95,7 +95,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       if (response != null) {
         // Decrement the size of the delay queue
         delayQueueSensor.record(-1)
-        trace("Response throttled for: " + response.delayTimeMs + " ms")
+        trace("Response throttled for: " + response.throttleTimeMs + " ms")
         response.execute()
       }
     }
@@ -112,23 +112,23 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    */
   def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
     val clientSensors = getOrCreateQuotaSensors(clientId)
-    var delayTimeMs = 0
+    var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
       // trigger the callback immediately if quota is not violated
-      callback(delayTimeMs)
+      callback(0)
     } catch {
       case qve: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
-        delayTimeMs = delayTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
-        delayQueue.add(new ThrottledResponse(time, delayTimeMs, callback))
+        throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+        delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        clientSensors.throttleTimeSensor.record(delayTimeMs)
+        clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
-        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), delayTimeMs))
+        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
-    delayTimeMs.toInt
+    throttleTimeMs
   }

   /*
@@ -139,7 +139,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  private def delayTime(metricValue: Double, config: MetricConfig): Int = {
+  private def throttleTime(metricValue: Double, config: MetricConfig): Int = {
     val quota = config.quota()
     val difference = metricValue - quota.bound
     val time = difference / quota.bound * config.timeWindowMs() * config.samples()
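Plugging numbers into the comment's formula X = (O - T)/T * W: with an observed rate O of 3000 bytes/s against a quota T of 2000 bytes/s measured over W = 10 samples x 1000 ms, the delay comes out to 5000 ms. A runnable transcription of throttleTime's arithmetic (the rounding step is illustrative; the method above converts to Int):

    object ThrottleTimeExample extends App {
      def throttleTimeMs(observedRate: Double, quotaBound: Double,
                         timeWindowMs: Long, samples: Int): Int = {
        // X = (O - T) / T * W, where W = timeWindowMs * samples
        val difference = observedRate - quotaBound
        (difference / quotaBound * timeWindowMs * samples).round.toInt
      }

      println(throttleTimeMs(3000.0, 2000.0, 1000, 10)) // 5000 ms
      println(throttleTimeMs(2000.0, 2000.0, 1000, 10)) // 0 ms: exactly at quota
    }
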
diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala
index b8b70cd..214fa1f 100644
--- a/core/src/main/scala/kafka/server/ThrottledResponse.scala
+++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala
@@ -25,13 +25,13 @@ import org.apache.kafka.common.utils.Time

 /**
  * Represents a request whose response has been delayed.
  * @param time @Time instance to use
- * @param delayTimeMs delay associated with this request
+ * @param throttleTimeMs delay associated with this request
  * @param callback Callback to trigger after delayTimeMs milliseconds
  */
-private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Int, callback: Int => Unit) extends Delayed {
-  val endTime = time.milliseconds + delayTimeMs
+private[server] class ThrottledResponse(val time: Time, val throttleTimeMs: Int, callback: Int => Unit) extends Delayed {
+  val endTime = time.milliseconds + throttleTimeMs

-  def execute() = callback(delayTimeMs)
+  def execute() = callback(throttleTimeMs)

   override def getDelay(unit: TimeUnit): Long = {
     unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index 184e482..778f3f8 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -75,9 +75,9 @@ class ThrottledResponseExpirationTest {
     val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
     val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
     val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
-    Assert.assertEquals(10, t1.delayTimeMs)
-    Assert.assertEquals(20, t2.delayTimeMs)
-    Assert.assertEquals(20, t3.delayTimeMs)
+    Assert.assertEquals(10, t1.throttleTimeMs)
+    Assert.assertEquals(20, t2.throttleTimeMs)
+    Assert.assertEquals(20, t3.throttleTimeMs)

     for(itr <- 0 to 2) {
       Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
-- 
1.7.12.4
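ThrottledResponse works because java.util.concurrent.DelayQueue only hands out elements whose getDelay has counted down to zero, which is exactly what the expiration test's loop exercises. A self-contained model of that mechanism (ThrottledOp mirrors ThrottledResponse but uses wall-clock time instead of Kafka's Time abstraction):

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

    object DelayQueueSketch extends App {
      class ThrottledOp(val throttleTimeMs: Int, callback: Int => Unit) extends Delayed {
        private val endTime = System.currentTimeMillis + throttleTimeMs
        def execute(): Unit = callback(throttleTimeMs)
        // Remaining delay; DelayQueue releases the element once this reaches zero.
        override def getDelay(unit: TimeUnit): Long =
          unit.convert(endTime - System.currentTimeMillis, TimeUnit.MILLISECONDS)
        override def compareTo(other: Delayed): Int =
          java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
      }

      val queue = new DelayQueue[ThrottledOp]()
      queue.add(new ThrottledOp(50, t => println(s"responded after $t ms")))
      println(queue.poll()) // null: 50 ms have not elapsed yet
      queue.take().execute() // blocks roughly 50 ms, then runs the callback
    }
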
From c08570fab6a229e628588c0e3b58e0a996d2ef63 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Mon, 24 Aug 2015 10:32:58 -0700
Subject: [PATCH 09/10] Addressing Joel's comments

---
 .../main/java/org/apache/kafka/common/requests/FetchResponse.java   | 2 +-
 .../main/java/org/apache/kafka/common/requests/ProduceResponse.java | 2 +-
 core/src/main/scala/kafka/api/FetchResponse.scala                   | 5 ++---
 core/src/main/scala/kafka/server/KafkaApis.scala                    | 3 ++-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index ccf1797..343be67 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;

 /**
- * This wrapper supports both v0 and v1 of FetchRequest.
+ * This wrapper supports both v0 and v1 of FetchResponse.
  */
 public class FetchResponse extends AbstractRequestResponse {

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 2500e3b..2868550 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;

 /**
- * This wrapper supports both v0 and v1 of ProduceRequest.
+ * This wrapper supports both v0 and v1 of ProduceResponse.
  */
 public class ProduceResponse extends AbstractRequestResponse {

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index b071c8e..c68db87 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -176,9 +176,8 @@ object FetchResponse {
   }

   // Returns the size of entire fetch response in bytes (including the header size)
-  def responseSize(data: Map[TopicAndPartition, FetchResponsePartitionData],
+  def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, FetchResponsePartitionData]],
                    requestVersion: Int): Int = {
-    val dataGroupedByTopic = data.groupBy(_._1.topic)
     headerSize(requestVersion) +
     dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
       val topicData = TopicData(curr._1, curr._2.map {
@@ -200,7 +199,7 @@ case class FetchResponse(correlationId: Int,
   */
   lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
   val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
-  val sizeInBytes = FetchResponse.responseSize(data, requestVersion)
+  lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, requestVersion)

   /*
    * Writes the header of the FetchResponse to the input buffer
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c838ca7..e727a6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -352,7 +352,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         quotaManagers(RequestKeys.FetchKey).recordAndMaybeThrottle(fetchRequest.clientId,
-                                                                   FetchResponse.responseSize(responsePartitionData,
+                                                                   FetchResponse.responseSize(responsePartitionData
+                                                                                                .groupBy(_._1.topic),
                                                                    fetchRequest.versionId),
                                                                    fetchResponseCallback)
       }
-- 
1.7.12.4
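Patch 09 computes the topic grouping once and threads it through, and makes sizeInBytes lazy so the size walk only happens when a caller, such as quota accounting, actually needs it. A compact demonstration of the group-once/lazy pattern (Response and PartitionData are invented types, not the classes above):

    object GroupOnceSketch extends App {
      case class PartitionData(bytes: Int)

      class Response(data: Map[(String, Int), PartitionData]) {
        // Computed at most once, and only if something forces it.
        lazy val dataGroupedByTopic: Map[String, Map[(String, Int), PartitionData]] = {
          println("grouping...")
          data.groupBy { case ((topic, _), _) => topic }
        }

        lazy val sizeInBytes: Int =
          dataGroupedByTopic.valuesIterator.flatMap(_.valuesIterator).map(_.bytes).sum
      }

      val r = new Response(Map(("a", 0) -> PartitionData(10), ("b", 0) -> PartitionData(20)))
      println(r.sizeInBytes) // prints "grouping..." once, then 30
      println(r.sizeInBytes) // cached: just 30
    }
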
From 01dc9d85dd3401d937d3ab63b530e02a280a954b Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Tue, 25 Aug 2015 11:29:18 -0700
Subject: [PATCH 10/10] Addressed Joel's comments

---
 .../main/java/org/apache/kafka/common/requests/FetchResponse.java | 6 +++++-
 core/src/main/scala/kafka/api/FetchResponse.scala                 | 8 ++++----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 343be67..7b78415 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -106,7 +106,6 @@ public class FetchResponse extends AbstractRequestResponse {

     public FetchResponse(Struct struct) {
         super(struct);
-        this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
         responseData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -121,6 +120,7 @@ public class FetchResponse extends AbstractRequestResponse {
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
+        this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
     }

     private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
@@ -158,4 +158,8 @@ public class FetchResponse extends AbstractRequestResponse {
     public static FetchResponse parse(ByteBuffer buffer) {
         return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    public static FetchResponse parse(ByteBuffer buffer, int version) {
+        return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
+    }
 }
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index c68db87..b0896cf 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -179,12 +179,12 @@ object FetchResponse {
   def responseSize(dataGroupedByTopic: Map[String, Map[TopicAndPartition, FetchResponsePartitionData]],
                    requestVersion: Int): Int = {
     headerSize(requestVersion) +
-    dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
-      val topicData = TopicData(curr._1, curr._2.map {
+    dataGroupedByTopic.foldLeft(0) {case (folded, (topic, partitionDataMap)) => {
+      val topicData = TopicData(topic, partitionDataMap.map {
       case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
     })
     folded + topicData.sizeInBytes
-    })
+    }}
   }
 }

@@ -197,7 +197,7 @@ case class FetchResponse(correlationId: Int,
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
-  lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+  lazy val dataGroupedByTopic = data.groupBy{ case (topicAndPartition, fetchData) => topicAndPartition.topic }
   val headerSizeInBytes = FetchResponse.headerSize(requestVersion)
   lazy val sizeInBytes = FetchResponse.responseSize(dataGroupedByTopic, requestVersion)

-- 
1.7.12.4
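Patch 10's hasField guard is the backward-compatibility piece: a struct decoded with the v0 schema simply has no throttle_time_ms field, so the reader falls back to the default instead of throwing. A minimal model of that read path (Struct here is a toy field map, not the real protocol Struct):

    object OptionalFieldSketch extends App {
      val DefaultThrottleTime = 0

      case class Struct(fields: Map[String, Int]) {
        def hasField(name: String): Boolean = fields.contains(name)
        def getInt(name: String): Int = fields(name)
      }

      // Read throttle_time_ms only if the decoding schema defined it.
      def throttleTime(struct: Struct): Int =
        if (struct.hasField("throttle_time_ms")) struct.getInt("throttle_time_ms")
        else DefaultThrottleTime

      println(throttleTime(Struct(Map("throttle_time_ms" -> 25)))) // v1: 25
      println(throttleTime(Struct(Map.empty)))                     // v0: falls back to 0
    }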