From b91ef7ddf709ac5cf579e63180d1abafb740e07d Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Sun, 12 Apr 2015 20:38:23 -0700 Subject: [PATCH 1/2] Modify metrics to use new ApiKeys --- .../src/main/java/org/apache/kafka/common/protocol/ApiKeys.java | 2 ++ .../src/main/java/org/apache/kafka/common/protocol/Protocol.java | 4 ++++ core/src/main/scala/kafka/network/RequestChannel.scala | 7 ++++--- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 07aba71..7870c4c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -26,6 +26,8 @@ public enum ApiKeys { METADATA(3, "metadata"), LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), + UPDATE_METADATA_KEY(6, "update_metadata"), + CONTROLLED_SHUTDOWN_KEY(7, "controlled_shutdown"), OFFSET_COMMIT(8, "offset_commit"), OFFSET_FETCH(9, "offset_fetch"), CONSUMER_METADATA(10, "consumer_metadata"), diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 9c4518e..d53fe45 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -404,6 +404,8 @@ public class Protocol { REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; @@ -416,6 +418,8 @@ public class Protocol { RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d9c57b..9d7b2ef 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,7 +26,8 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.network.Send +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger @@ -82,7 +83,7 @@ object RequestChannel extends Logging { val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L) val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L) val totalTime = endTimeMs - startTimeMs - var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId))) + var metricsList = List(RequestMetrics.metricsMap(ApiKeys.forId(requestId).name)) if (requestId == RequestKeys.FetchKey) { val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower metricsList ::= ( if (isFromFollower) @@ -207,7 +208,7 @@ object RequestMetrics { val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics] val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer" val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower" - (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) + (ApiKeys.values().toList.map(e => e.name) ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) } -- 1.9.5 (Apple Git-50.3) From d54bf1ef85bc2a98241222cbdec6aed71fc6418c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Sun, 12 Apr 2015 21:27:49 -0700 Subject: [PATCH 2/2] fixed ApiKey names and removed needless import --- .../org/apache/kafka/common/protocol/ApiKeys.java | 26 +++++++++++----------- .../main/scala/kafka/network/RequestChannel.scala | 1 - 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 7870c4c..b39e9bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -20,19 +20,19 @@ package org.apache.kafka.common.protocol; * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "produce"), - FETCH(1, "fetch"), - LIST_OFFSETS(2, "list_offsets"), - METADATA(3, "metadata"), - LEADER_AND_ISR(4, "leader_and_isr"), - STOP_REPLICA(5, "stop_replica"), - UPDATE_METADATA_KEY(6, "update_metadata"), - CONTROLLED_SHUTDOWN_KEY(7, "controlled_shutdown"), - OFFSET_COMMIT(8, "offset_commit"), - OFFSET_FETCH(9, "offset_fetch"), - CONSUMER_METADATA(10, "consumer_metadata"), - JOIN_GROUP(11, "join_group"), - HEARTBEAT(12, "heartbeat"); + PRODUCE(0, "Produce"), + FETCH(1, "Fetch"), + LIST_OFFSETS(2, "Offsets"), + METADATA(3, "Metadata"), + LEADER_AND_ISR(4, "LeaderAndIsr"), + STOP_REPLICA(5, "StopReplica"), + UPDATE_METADATA_KEY(6, "UpdateMetadata"), + CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), + OFFSET_COMMIT(8, "OffsetCommit"), + OFFSET_FETCH(9, "OffsetFetch"), + CONSUMER_METADATA(10, "ConsumerMetadata"), + JOIN_GROUP(11, "JoinGroup"), + HEARTBEAT(12, "Heartbeat"); private static ApiKeys[] codeToType; public static final int MAX_API_KEY; diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9d7b2ef..1d0024c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,7 +26,6 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger -- 1.9.5 (Apple Git-50.3)