From b47bbea8ec4b5da1e01fbfd9edd4fcafa51f7f74 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Mon, 11 May 2015 14:50:19 -0700
Subject: [PATCH] Return quota throttle time in fetch and produce responses
 (protocol V1) and add client-side throttle-time metrics

Bumps the fetch and produce wire protocols to version 1 and adds a
throttle_time_ms field to both responses. The new consumer and producer
record the value under fetch-throttle-time / produce-throttle-time
sensors; all server-side response callbacks now carry a delay-time
argument, passed as 0 until quota enforcement is wired in.
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 15 ++++++-
 .../kafka/clients/producer/internals/Sender.java   | 15 +++++++
 .../org/apache/kafka/common/protocol/Protocol.java | 34 +++++++++++++---
 .../apache/kafka/common/requests/FetchRequest.java |  2 +-
 .../kafka/common/requests/FetchResponse.java       |  8 +++-
 .../kafka/common/requests/ProduceRequest.java      |  4 +-
 .../kafka/common/requests/ProduceResponse.java     |  8 +++-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++----
 .../clients/producer/internals/SenderTest.java     | 37 +++++++++++++++--
 .../kafka/common/requests/RequestResponseTest.java |  4 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |  9 ++++-
 core/src/main/scala/kafka/api/FetchResponse.scala  | 26 +++++++++---
 .../src/main/scala/kafka/api/ProducerRequest.scala |  2 +-
 .../main/scala/kafka/api/ProducerResponse.scala    | 17 ++++++--
 .../main/scala/kafka/consumer/SimpleConsumer.scala |  2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 16 ++++++--
 .../src/main/scala/kafka/server/DelayedFetch.scala |  5 ++-
 .../main/scala/kafka/server/DelayedProduce.scala   |  5 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 ++--
 .../main/scala/kafka/server/OffsetManager.scala    |  2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 13 +++---
 .../api/RequestResponseSerializationTest.scala     | 40 ++++++++++++++++++-
 .../unit/kafka/server/DelayedOperationTest.scala   | 14 ++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 25 files changed, 277 insertions(+), 61 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ef9dd52..321b9de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -63,7 +63,7 @@ public class Fetcher<K, V> {
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
     private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
+    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
 
     private final KafkaClient client;
 
@@ -337,6 +337,7 @@ public class Fetcher<K, V> {
             this.sensors.recordsFetched.record(totalCount);
         }
         this.sensors.fetchLatency.record(resp.requestLatencyMs());
+        this.sensors.quotaDelayTimeSensor.record(resp.responseBody().getInt(QUOTA_DELAY_KEY_NAME));
     }
 
     /**
@@ -390,6 +391,7 @@ public class Fetcher<K, V> {
         public final Sensor recordsFetched;
         public final Sensor fetchLatency;
         public final Sensor recordsFetchLag;
+        public final Sensor quotaDelayTimeSensor;
 
         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
 
@@ -439,6 +441,17 @@ public class Fetcher<K, V> {
                 this.metricGrpName,
                 "The maximum lag in terms of number of records for any partition in this window",
                 tags), new Max());
+
+            this.quotaDelayTimeSensor = metrics.sensor("fetch-throttle-time");
+            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-avg",
+                this.metricGrpName,
+                "The average throttle time in ms",
+                tags), new Avg());
+
+            this.quotaDelayTimeSensor.add(new MetricName("throttle-time-max",
+                this.metricGrpName,
+                "The maximum throttle time in ms",
+                tags), new Max());
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {
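Note: the two sensors registered above surface as "throttle-time-avg" and "throttle-time-max" in the consumer's metric group. A minimal sketch of how a caller might read them back out of a Metrics registry (the lookup helper itself is hypothetical; only the metric names come from the patch):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.metrics.Metrics

    // Scan the registry for a metric by name and return its current value, if present.
    def throttleTime(metrics: Metrics, name: String): Option[Double] =
      metrics.metrics.asScala.collectFirst {
        case (metricName, metric) if metricName.name == name => metric.value
      }

    // throttleTime(metrics, "throttle-time-avg")
    // throttleTime(metrics, "throttle-time-max")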
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b2db91c..8b84c28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 public class Sender implements Runnable {
 
     private static final Logger log = LoggerFactory.getLogger(Sender.class);
+    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
 
     /* the state of each nodes connection */
     private final KafkaClient client;
@@ -238,6 +239,8 @@ public class Sender implements Runnable {
                     completeBatch(batch, error, partResp.baseOffset, correlationId, now);
                 }
                 this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+                this.sensors.recordQuotaDelay(response.request().request().destination(),
+                    response.responseBody().getInt(QUOTA_DELAY_KEY_NAME));
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (RecordBatch batch : batches.values())
@@ -337,6 +340,7 @@ public class Sender implements Runnable {
         public final Sensor batchSizeSensor;
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
+        public final Sensor quotaDelayTimeSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
@@ -366,6 +370,12 @@ public class Sender implements Runnable {
             m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
             this.requestTimeSensor.add(m, new Max());
 
+            this.quotaDelayTimeSensor = metrics.sensor("produce-throttle-time");
+            m = new MetricName("throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
+            this.quotaDelayTimeSensor.add(m, new Avg());
+            m = new MetricName("throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
+            this.quotaDelayTimeSensor.add(m, new Max());
+
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
             m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
             this.recordsPerRequestSensor.add(m, new Rate());
@@ -500,6 +510,11 @@ public class Sender implements Runnable {
                 nodeRequestTime.record(latency, now);
             }
         }
+
+        public void recordQuotaDelay(int node, long delayTimeMs) {
+            this.quotaDelayTimeSensor.record(delayTimeMs, time.milliseconds());
+        }
+
     }
 }
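Note: recordQuotaDelay currently ignores its node argument and records into the single quotaDelayTimeSensor; per-node throttle metrics would need one sensor per node, as recordLatency does above. The Avg/Max pair follows the same pattern as the request-latency metrics. A self-contained sketch of that pattern (the group name and empty tag map are placeholders, not values from the patch):

    import org.apache.kafka.common.MetricName
    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.{Avg, Max}

    val metrics = new Metrics()
    val tags = new java.util.HashMap[String, String]()
    // One sensor, two derived stats, exactly as the patch wires it up.
    val sensor = metrics.sensor("produce-throttle-time")
    sensor.add(new MetricName("throttle-time-avg", "producer-metrics", "The average throttle time in ms", tags), new Avg())
    sensor.add(new MetricName("throttle-time-max", "producer-metrics", "The maximum throttle time in ms", tags), new Max())
    Seq(100.0, 200.0, 300.0).foreach(v => sensor.record(v))   // avg 200.0, max 300.0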
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3dc8b01..461388e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -107,9 +107,23 @@ public class Protocol {
                                                                  INT16),
                                                        new Field("base_offset",
                                                                  INT64))))))));
-
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
+    // The V1 Produce Request body is the same as V0.
+    // Only the version number is incremented to indicate a newer client
+    public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
+
+    public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
+                                                                          new ArrayOf(new Schema(new Field("topic", STRING),
+                                                                                                 new Field("partition_responses",
+                                                                                                           new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                            INT32),
+                                                                                                                                  new Field("error_code",
+                                                                                                                                            INT16),
+                                                                                                                                  new Field("base_offset",
+                                                                                                                                            INT64))))))),
+                                                                new Field("throttle_time_ms", INT32, "Amount of time in milliseconds the request was throttled if at all", 0));
+
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -342,6 +356,9 @@ public class Protocol {
                                                      new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                                                      "Topics to fetch."));
 
+    // The V1 Fetch Request body is the same as V0.
+    // Only the version number is incremented to indicate a newer client
+    public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
 
     public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                   INT32,
                                                                                   "Topic partition id."),
@@ -357,9 +374,14 @@ public class Protocol {
 
     public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
+    public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32,
+                                                                        "Amount of time in milliseconds the request was throttled if at all",
+                                                                        0),
+                                                              new Field("responses",
+                                                                        new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
 
     /* Consumer metadata api */
     public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 8686d83..aaec66d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -130,7 +130,7 @@ public class FetchRequest extends AbstractRequest {
             responseData.put(entry.getKey(), partitionResponse);
         }
 
-        return new FetchResponse(responseData);
+        return new FetchResponse(responseData, 0);
     }
 
     public int replicaId() {
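Note: the per-API schema arrays above are indexed by version, which is how throttle_time_ms stays invisible to V0 clients, and the field carries a schema default of 0 (the value used when the broker did not throttle). A sketch, assuming the 0.8.3-era ProtoUtils/Struct API:

    import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils}
    import org.apache.kafka.common.protocol.types.Struct

    // Look up the fetch response schema for protocol version 1 by array index.
    val v1Schema = ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1)
    val struct = new Struct(v1Schema)
      .set("throttle_time_ms", 25)
      .set("responses", Array[AnyRef]())
    assert(struct.getInt("throttle_time_ms") == 25)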
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index eb8951f..005ec08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -37,6 +37,7 @@ public class FetchResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
@@ -59,6 +60,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
     private final Map<TopicPartition, PartitionData> responseData;
+    private final int delayTime;
 
     public static final class PartitionData {
         public final short errorCode;
@@ -72,8 +74,9 @@ public class FetchResponse extends AbstractRequestResponse {
         }
     }
 
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData, int delayTime) {
         super(new Struct(CURRENT_SCHEMA));
+        this.delayTime = delayTime;
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -94,11 +97,13 @@ public class FetchResponse extends AbstractRequestResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        struct.set(QUOTA_DELAY_KEY_NAME, delayTime);
         this.responseData = responseData;
     }
 
     public FetchResponse(Struct struct) {
         super(struct);
+        this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME);
         responseData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -118,6 +123,7 @@ public class FetchResponse extends AbstractRequestResponse {
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
+    public int getDelayTime() { return this.delayTime; }
 
     public static FetchResponse parse(ByteBuffer buffer) {
         return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index fabeae3..ea69c6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ProduceRequest extends AbstractRequest {
+public class ProduceRequest extends AbstractRequest {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
     private static final String ACKS_KEY_NAME = "acks";
@@ -102,7 +102,7 @@ public class ProduceRequest extends AbstractRequest {
             responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
         }
 
-        return new ProduceResponse(responseMap);
+        return new ProduceResponse(responseMap, 0);
     }
 
     public short acks() {
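Note: the extra constructor argument threads the broker-computed throttle time into the serialized struct, and getDelayTime() exposes it after parsing. A usage sketch against the patched classes, mirroring what the tests in this patch do:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.FetchResponse

    val data = java.util.Collections.singletonMap(
      new TopicPartition("test", 0),
      new FetchResponse.PartitionData(0.toShort, 100L, ByteBuffer.allocate(0)))
    val response = new FetchResponse(data, 25)          // throttled for 25 ms
    // Serialize and parse back; the throttle time survives the round trip.
    val buffer = ByteBuffer.allocate(response.sizeOf())
    response.writeTo(buffer)
    buffer.rewind()
    assert(FetchResponse.parse(buffer).getDelayTime == 25)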
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 37ec0b7..46fd87d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -33,6 +33,7 @@ public class ProduceResponse extends AbstractRequestResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+    private static final String QUOTA_DELAY_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
@@ -49,8 +50,9 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;
+    private final int delayTime;
 
-    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int delayTime) {
         super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
@@ -70,7 +72,9 @@ public class ProduceResponse extends AbstractRequestResponse {
             topicDatas.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+        struct.set(QUOTA_DELAY_KEY_NAME, delayTime);
         this.responses = responses;
+        this.delayTime = delayTime;
     }
 
     public ProduceResponse(Struct struct) {
@@ -88,11 +92,13 @@ public class ProduceResponse extends AbstractRequestResponse {
                 responses.put(tp, new PartitionResponse(errorCode, offset));
             }
         }
+        this.delayTime = struct.getInt(QUOTA_DELAY_KEY_NAME);
     }
 
     public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }
+    public int getDelayTime() { return this.delayTime; }
 
     public static final class PartitionResponse {
         public short errorCode;
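Note: the two V1 schemas place the new field differently: FETCH_RESPONSE_V1 puts throttle_time_ms before the responses array (so FetchResponseSend can write it into the fixed-size header buffer ahead of the MultiSend of message data), while PRODUCE_RESPONSE_V1 appends it after. A sketch of the produce-side accessor, analogous to the fetch example above:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.ProduceResponse

    val responses = java.util.Collections.singletonMap(
      new TopicPartition("test", 0),
      new ProduceResponse.PartitionResponse(0.toShort, 10L))
    val response = new ProduceResponse(responses, 50)   // throttled for 50 ms
    // Rebuilding from the underlying struct re-reads throttle_time_ms.
    val reparsed = new ProduceResponse(response.toStruct)
    assert(reparsed.getDelayTime == 50)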
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4195410..faefe0c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,8 +22,10 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -102,7 +104,7 @@ public class FetcherTest {
 
         // normal fetch
         fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         client.poll(0, time.milliseconds());
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
@@ -124,14 +126,14 @@ public class FetcherTest {
 
         // fetch with not leader
         fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+        client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         client.poll(0, time.milliseconds());
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
 
         // fetch with unknown topic partition
         fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+        client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         client.poll(0, time.milliseconds());
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -139,7 +141,7 @@ public class FetcherTest {
         // fetch with out of range
         subscriptions.fetched(tp, 5);
         fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -156,7 +158,7 @@ public class FetcherTest {
 
         // fetch with out of range
         fetcher.initFetches(cluster, time.milliseconds());
-        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -164,8 +166,37 @@ public class FetcherTest {
         assertEquals(0L, (long) subscriptions.consumed(tp));
     }
 
-    private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
-        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
+    /*
+     * Send multiple requests. Verify that the client-side quota metrics have the right values
+     */
+    @Test
+    public void testQuotaMetrics() throws Exception {
+        subscriptions.subscribe(tp);
+        subscriptions.fetched(tp, 0);
+        subscriptions.consumed(tp, 0);
+        for (int i = 1; i <= 3; i++) {
+            // normal fetch
+            fetcher.initFetches(cluster, time.milliseconds());
+            client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
+            client.poll(0, time.milliseconds());
+        }
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        long avg = 0;
+        long max = 0;
+        for (MetricName m : allMetrics.keySet()) {
+            if (m.name().equals("throttle-time-avg")) {
+                avg = (long) allMetrics.get(m).value();
+            } else if (m.name().equals("throttle-time-max")) {
+                max = (long) allMetrics.get(m).value();
+            }
+        }
+        assertEquals(200, avg);
+        assertEquals(300, max);
+    }
+
+    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int delayTime) {
+        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), delayTime);
         return response.toStruct();
     }
 
@@ -173,5 +204,4 @@ public class FetcherTest {
         ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
         return response.toStruct();
     }
-
 }
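Note: the expected values in testQuotaMetrics follow directly from the loop: throttle times of 100, 200 and 300 ms give an average of (100 + 200 + 300) / 3 = 200 and a maximum of 300. The analogous SenderTest case below asserts the same numbers.

    val delays = Seq(100, 200, 300)
    assert(delays.sum / delays.size == 200)
    assert(delays.max == 300)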
requests completed.", offset, (long) client.inFlightRequestCount()); sender.run(time.milliseconds()); @@ -84,6 +86,33 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + /* + * Send multiple request. Verify that the client side quota metrics have the right values + */ + @Test + public void testQuotaMetrics() throws Exception { + final long offset = 0; + for (int i = 1; i <= 3; i++) { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); + sender.run(time.milliseconds()); + } + long avg = 0; + long max = 0; + + Map allMetrics = metrics.metrics(); + for (MetricName m : allMetrics.keySet()) { + if (m.name().equals("throttle-time-avg")) { + avg = (long) allMetrics.get(m).value(); + } else if (m.name().equals("throttle-time-max")) { + max = (long) allMetrics.get(m).value(); + } + } + assertEquals(200, avg); + assertEquals(300, max); + } + @Test public void testRetries() throws Exception { // create a sender with retries = 1 @@ -110,7 +139,7 @@ public class SenderTest { sender.run(time.milliseconds()); // resend assertEquals(1, client.inFlightRequestCount()); long offset = 0; - client.respond(produceResponse(tp, offset, Errors.NONE.code())); + client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); assertTrue("Request should have retried and completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -138,10 +167,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error) { + private Struct produceResponse(TopicPartition tp, long offset, int error, int delayTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset); Map partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp); + ProduceResponse response = new ProduceResponse(partResp, delayTimeMs); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e3cc196..7b1c9ea 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -103,7 +103,7 @@ public class RequestResponseTest { private AbstractRequestResponse createFetchResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); - return new FetchResponse(responseData); + return new FetchResponse(responseData, 0); } private AbstractRequest createHeartBeatRequest() { @@ -179,6 +179,6 @@ public class RequestResponseTest { private AbstractRequestResponse createProduceResponse() { Map responseData = new HashMap(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); - return new ProduceResponse(responseData); + return new ProduceResponse(responseData, 0); } } diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b038c15..e90c2d0 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -31,7 +31,7 @@ import 
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b038c15..e90c2d0 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -31,7 +31,7 @@ import scala.collection.immutable.Map
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 object FetchRequest {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
   val DefaultCorrelationId = 0
@@ -170,7 +170,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
 @nonthreadsafe
 class FetchRequestBuilder() {
   private val correlationId = new AtomicInteger(0)
-  private val versionId = FetchRequest.CurrentVersion
+  private var versionId = FetchRequest.CurrentVersion
   private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
   private var maxWait = FetchRequest.DefaultMaxWait
@@ -205,6 +205,11 @@ class FetchRequestBuilder() {
     this
   }
 
+  def requestVersion(versionId: Short): FetchRequestBuilder = {
+    this.versionId = versionId
+    this
+  }
+
   def build() = {
     val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
     requestMap.clear()
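Note: turning versionId into a var plus the new requestVersion setter lets callers pin an explicit protocol version instead of the new default of 1. A sketch (topic, client id and fetch size are illustrative values):

    import kafka.api.FetchRequestBuilder

    val request = new FetchRequestBuilder()
      .clientId("demo-client")
      .addFetch("test", 0, 0L, 1024 * 1024)
      .requestVersion(0.toShort)   // fall back to the pre-throttle-time V0 wire format
      .build()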
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 75aaf57..dc4b1af 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -138,8 +138,12 @@ object FetchResponse {
     4 + /* correlationId */
     4 /* topic count */
 
-  def readFrom(buffer: ByteBuffer): FetchResponse = {
+
+  // The request version is used to determine which fields we can expect in the response
+  def readFrom(buffer: ByteBuffer, requestVersion: Int): FetchResponse = {
     val correlationId = buffer.getInt
+    val delayTime = if (requestVersion > 0) buffer.getInt else 0
+
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
@@ -148,20 +152,23 @@ object FetchResponse {
         (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(correlationId, Map(pairs:_*))
+    FetchResponse(correlationId, Map(pairs:_*), requestVersion, delayTime)
   }
 }
 
 
-case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData])
+case class FetchResponse(correlationId: Int,
+                         data: Map[TopicAndPartition, FetchResponsePartitionData],
+                         requestVersion: Int = 0,
+                         delayTime: Int = 0)
   extends RequestOrResponse() {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
   lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
-
+  val delayTimeSize = if (requestVersion > 0) 4 else 0
   val sizeInBytes =
-    FetchResponse.headerSize +
+    FetchResponse.headerSize + delayTimeSize +
     dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
       val topicData = TopicData(curr._1, curr._2.map {
         case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
@@ -209,10 +216,17 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
 
   override def complete = sent >= sendSize
 
-  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
+  // The delayTimeSize will be 0 if the request was made from a client sending a V0 style request
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize + fetchResponse.delayTimeSize)
   buffer.putInt(size)
   buffer.putInt(fetchResponse.correlationId)
+  // Include the delayTime only if the client can read it
+  if (fetchResponse.requestVersion > 0) {
+    buffer.putInt(fetchResponse.delayTime)
+  }
+
   buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
+
   buffer.rewind()
 
   val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 570b2da..edc0bb6 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -25,7 +25,7 @@ import kafka.network.RequestChannel.Response
 import kafka.network.{RequestChannel, BoundedByteBufferSend}
 
 object ProducerRequest {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5d1fac4..0f40a65 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -37,13 +37,17 @@ object ProducerResponse {
       })
     })
 
-    ProducerResponse(correlationId, Map(statusPairs:_*))
+    val delayTime = buffer.getInt
+    ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, delayTime)
   }
 }
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus])
+case class ProducerResponse(correlationId: Int,
+                            status: Map[TopicAndPartition, ProducerResponseStatus],
+                            requestVersion: Int = 0,
+                            delayTime: Int = 0)
   extends RequestOrResponse() {
 
   /**
@@ -54,6 +58,7 @@ case class ProducerResponse(correlationId: Int,
   def hasError = status.values.exists(_.error != ErrorMapping.NoError)
 
   val sizeInBytes = {
+    val delayTimeSize = if (requestVersion > 0) 4 else 0
     val groupedStatus = statusGroupedByTopic
     4 + /* correlation id */
     4 + /* topic count */
@@ -66,7 +71,9 @@ case class ProducerResponse(correlationId: Int,
             2 + /* error code */
             8 /* offset */
           }
-      })
+      }) +
+      delayTimeSize
+
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -85,6 +92,10 @@ case class ProducerResponse(correlationId: Int,
         buffer.putLong(nextOffset)
       }
     })
+    // Delay time is only supported on V1 style requests
+    if (requestVersion > 0) {
+      buffer.putInt(delayTime)
+    }
   }
 
   override def describe(details: Boolean):String = { toString }
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 31a2639..15c7037 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -117,7 +117,7 @@ class SimpleConsumer(val host: String,
         response = sendRequest(request)
       }
     }
-    val fetchResponse = FetchResponse.readFrom(response.buffer)
+    val fetchResponse = FetchResponse.readFrom(response.buffer, request.versionId)
     val fetchedSize = fetchResponse.sizeInBytes
     fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a439046..8096800 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -37,8 +37,17 @@ import com.yammer.metrics.core.Gauge
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
+abstract class AbstractFetcherThread(name: String,
+                                     clientId: String,
+                                     sourceBroker: BrokerEndPoint,
+                                     socketTimeout: Int,
+                                     socketBufferSize: Int,
+                                     fetchSize: Int,
+                                     fetcherBrokerId: Int = -1,
+                                     maxWait: Int = 0,
+                                     minBytes: Int = 1,
+                                     fetchBackOffMs: Int = 0,
+                                     fetchRequestVersion: Short = FetchRequest.CurrentVersion,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
 
   private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
@@ -52,7 +61,8 @@ abstract class AbstractFetcherThread(name: String,
           clientId(clientId).
           replicaId(fetcherBrokerId).
           maxWait(maxWait).
-          minBytes(minBytes)
+          minBytes(minBytes).
+          requestVersion(fetchRequestVersion)
 
   /* callbacks to be defined in subclass */
 
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index de6cf5b..8e5cb39 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -55,7 +55,7 @@ case class FetchMetadata(fetchMinBytes: Int,
 class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
-                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
+                   responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -131,7 +131,8 @@ class DelayedFetch(delayMs: Long,
     val fetchPartitionData = logReadResults.mapValues(result =>
       FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
 
-    responseCallback(fetchPartitionData)
+    // Zero delay time until quotas are enforced
+    responseCallback(fetchPartitionData, 0)
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 05078b2..7a9c30b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -53,7 +53,7 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+                     responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit)
   extends DelayedOperation(delayMs) {
 
   // first update the acks pending variable according to the error code
@@ -126,7 +126,8 @@ class DelayedProduce(delayMs: Long,
   */
   override def onComplete() {
     val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
-    responseCallback(responseStatus)
+    // Zero delay time until quotas are enforced
+    responseCallback(responseStatus, 0)
   }
 }
 
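Note: every server-side response callback now takes a second Int parameter carrying the throttle time, and every call site passes 0 until quota enforcement actually computes a delay. The shape of the contract, sketched with a hypothetical callback (the println body is illustrative only):

    import kafka.api.ProducerResponseStatus
    import kafka.common.TopicAndPartition

    val sendResponse: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit =
      (status, delayTimeMs) => {
        // delayTimeMs flows into the ProducerResponse so a V1 client can observe it
        println(s"responding for ${status.size} partition(s), throttled $delayTimeMs ms")
      }

    sendResponse(Map.empty, 0)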
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 417960d..abd4915 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -234,7 +234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
 
     // the callback for sending a produce response
-    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) {
       var errorInResponse = false
       responseStatus.foreach { case (topicAndPartition, status) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
@@ -259,7 +259,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           requestChannel.noOperation(request.processor, request)
         }
       } else {
-        val response = ProducerResponse(produceRequest.correlationId, responseStatus)
+        val response = ProducerResponse(produceRequest.correlationId, responseStatus, produceRequest.versionId, delayTime)
         requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
       }
     }
@@ -289,7 +289,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
+    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData], delayTime: Int) {
       responsePartitionData.foreach { case (topicAndPartition, data) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
         // an error message in the replica manager already and hence can be ignored here
@@ -304,7 +304,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }
 
-      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
+      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTime)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     }
 
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 18680ce..d47c78a 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -245,7 +245,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
 
     // set the callback function to insert offsets into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime: Int) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b31b432..f6f9378 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
@@ -38,6 +38,8 @@ class ReplicaFetcherThread(name:String,
                                         maxWait = brokerConfig.replicaFetchWaitMaxMs,
                                         minBytes = brokerConfig.replicaFetchMinBytes,
                                         fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+                                        fetchRequestVersion =
+                                          if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0,
                                         isInterruptible = false) {
 
   // process fetched data
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 59c9bc3..ba2e1aa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -278,10 +278,9 @@ class ReplicaManager(val config: KafkaConfig,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                    responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+                    responseCallback: (Map[TopicAndPartition, ProducerResponseStatus], Int) => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
-
       val sTime = SystemTime.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -309,7 +308,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         // we can respond immediately
         val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
-        responseCallback(produceResponseStatus)
+        // Zero delay time until quotas are enforced
+        responseCallback(produceResponseStatus, 0)
       }
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
@@ -320,7 +320,7 @@ class ReplicaManager(val config: KafkaConfig,
           ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset))
       }
 
-      responseCallback(responseStatus)
+      responseCallback(responseStatus, 0)
     }
   }
 
@@ -417,7 +417,7 @@ class ReplicaManager(val config: KafkaConfig,
                     replicaId: Int,
                     fetchMinBytes: Int,
                     fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
-                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
+                    responseCallback: (Map[TopicAndPartition, FetchResponsePartitionData], Int) => Unit) {
 
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
@@ -443,7 +443,8 @@ class ReplicaManager(val config: KafkaConfig,
     if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.mapValues(result =>
         FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
-      responseCallback(fetchPartitionData)
+      // Zero delay time until quotas are enforced
+      responseCallback(fetchPartitionData, 0)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + + val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes) + newClientResponse.writeTo(buffer) + buffer.rewind() + assertEquals(ProducerResponse.readFrom(buffer).delayTime, 100) + } + + @Test + def testFetchResponseVersion() { + val oldClientResponse = FetchResponse(1, Map( + TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 0) + + val newClientResponse = FetchResponse(1, Map( + TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + ), 1, 100) + + // new response should have 4 bytes more than the old response since delayTime is an INT32 + assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes) + } } diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index f3ab3f4..00764cc 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -27,7 +27,7 @@ class DelayedOperationTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", purgeInterval = 1) } override def tearDown() { @@ -68,6 +68,18 @@ class DelayedOperationTest extends JUnit3Suite { } @Test + def testTimeoutNoKeys() { + val expiration = 20L + val r1 = new MockDelayedOperation(expiration) + val start = System.currentTimeMillis + purgatory.tryCompleteElseWatch(r1, Seq("a1")) + r1.awaitExpiration() + val elapsed = System.currentTimeMillis - start + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) + } + + @Test def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 00d5933..09c5647 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -78,7 +78,7 @@ class ReplicaManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus], delayTime : Int) = { assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } -- 1.7.12.4