diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6fe7573..ebaa476 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,8 +30,11 @@ public enum ApiKeys {
     METADATA(3, "metadata"),
     LEADER_AND_ISR(4, "leader_and_isr"),
     STOP_REPLICA(5, "stop_replica"),
-    OFFSET_COMMIT(6, "offset_commit"),
-    OFFSET_FETCH(7, "offset_fetch");
+    OFFSET_COMMIT(8, "offset_commit"),
+    OFFSET_FETCH(9, "offset_fetch"),
+    CONSUMER_METADATA(10, "consumer_metadata"),
+    JOIN_GROUP(11, "join_group"),
+    HEART_BEAT(12, "heart_beat");

     private static ApiKeys[] codeToType;
     public static int MAX_API_KEY = -1;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 044b030..72ca653 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -104,6 +104,176 @@ public class Protocol {
     public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
     public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };

+    /* Consumer api */
+    public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(
+            new Field("partition", INT32, "Topic partition id."),
+            new Field("offset", INT64, "Message offset to be committed."),
+            new Field("timestamp", INT64, "Not used."),
+            new Field("metadata", STRING, "Any associated metadata the client wants to keep."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(
+            new Field("topic", STRING, "Topic to commit."),
+            new Field("partitions", new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), "Partitions to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(
+            new Field("group_id", STRING, "The consumer group id."),
+            new Field("topics", new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(
+            new Field("group_id", STRING, "The consumer group id."),
+            new Field("group_generation_id", INT32, "The generation of the consumer group."),
+            new Field("consumer_id", STRING, "The consumer id assigned by the group coordinator."),
+            new Field("topics", new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(
+            new Field("partition", INT32, "Topic partition id."),
+            new Field("error_code", INT16));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(
+            new Field("topic", STRING),
+            new Field("partition_responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+    public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
+    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0 };
+
+    public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(
+            new Field("partition", INT32, "Topic partition id."),
+            new Field("fetch_offset", INT64, "Message offset."),
+            new Field("max_bytes", INT32, "Maximum bytes to fetch."));
+
+    public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(
+            new Field("topic", STRING, "Topic to fetch."),
+            new Field("partitions", new ArrayOf(FETCH_REQUEST_PARTITION_V0), "Partitions to fetch."));
+
+    public static Schema FETCH_REQUEST_V0 = new Schema(
+            new Field("replica_id", INT32, "Broker id of the follower. For normal consumers, use -1."),
+            new Field("max_wait_time", INT32, "Maximum time in ms to wait for the response."),
+            new Field("min_bytes", INT32, "Minimum bytes to accumulate in the response."),
+            new Field("topics", new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch."));
+
+    public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(
+            new Field("partition", INT32, "Topic partition id."),
+            new Field("error_code", INT16),
+            new Field("high_watermark", INT64, "Last committed offset."),
+            new Field("record_set", BYTES));
+
+    public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(
+            new Field("topic", STRING),
+            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+    public static Schema FETCH_RESPONSE_V0 = new Schema(
+            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
+    public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+
+    public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(
+            new Field("group_id", STRING, "The consumer group id."));
+
+    public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("coordinator", BROKER, "Host and port information for the coordinator for a consumer group."));
+
+    public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
+    public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+
+    public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(
+            new Field("group_id", STRING, "The consumer group id."),
+            new Field("session_timeout", INT32, "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+            new Field("topics", new ArrayOf(STRING), "An array of topics to subscribe to."),
+            new Field("consumer_id", STRING, "The assigned consumer id or an empty string for a new consumer."),
+            new Field("partition_assignment_strategy", STRING, "The strategy for the coordinator to assign partitions."));
+
+    public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(
+            new Field("topic", STRING),
+            new Field("partitions", new ArrayOf(INT32)));
+
+    public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("group_generation_id", INT32, "The generation of the consumer group."),
+            new Field("consumer_id", STRING, "The consumer id assigned by the group coordinator."),
+            new Field("assigned_partitions", new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
+    public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+
+    public static Schema HEART_BEAT_REQUEST_V0 = new Schema(
+            new Field("group_id", STRING, "The consumer group id."),
+            new Field("group_generation_id", INT32, "The generation of the consumer group."),
+            new Field("consumer_id", STRING, "The consumer id assigned by the group coordinator."));
+
+    public static Schema HEART_BEAT_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16));
+
+    public static Schema[] HEART_BEAT_REQUEST = new Schema[] { HEART_BEAT_REQUEST_V0 };
+    public static Schema[] HEART_BEAT_RESPONSE = new Schema[] { HEART_BEAT_RESPONSE_V0 };
+
     /* an array of all requests and responses with all schema versions */
     public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -113,22 +283,28 @@ public class Protocol {

     static {
         REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
-        REQUESTS[ApiKeys.FETCH.id] = new Schema[] {};
+        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
         REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
         REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
         REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+        REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+        REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
+        REQUESTS[ApiKeys.HEART_BEAT.id] = HEART_BEAT_REQUEST;

         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
-        RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
+        RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
         RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {};
         RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
         RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
-        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
+        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+        RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.HEART_BEAT.id] = HEART_BEAT_RESPONSE;

         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 8cecba5..91d281e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -107,6 +107,22 @@ public class Struct {
         return (Integer) get(name);
     }

+    public Long getLong(Field field) {
+        return (Long) get(field);
+    }
+
+    public Long getLong(String name) {
+        return (Long) get(name);
+    }
+
+    public ByteBuffer getBytes(Field field) {
+        return (ByteBuffer) get(field);
+    }
+
+    public ByteBuffer getBytes(String name) {
+        return (ByteBuffer) get(name);
+    }
+
     public Object[] getArray(Field field) {
         return (Object[]) get(field);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
new file mode 100644
index 0000000..726fc05
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerMetadataRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static Field GROUP_ID_KEY_FIELD = curSchema.get("group_id");
+
+    public ConsumerMetadataRequest(String groupId) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_FIELD, groupId);
+    }
+
+    public ConsumerMetadataRequest(Struct struct) {
+        super(struct);
+    }
+
+    public String groupId() {
+        return struct.getString(GROUP_ID_KEY_FIELD);
+    }
+
+    public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
+        return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
new file mode 100644
index 0000000..b7a6c7c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerMetadataResponse extends GenericStruct {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static Field ERROR_CODE_KEY_FIELD = curSchema.get("error_code");
+    private static Field COORDINATOR_KEY_FIELD = curSchema.get("coordinator");
+    private static Schema curBrokerSchema = Protocol.BROKER;
+    private static Field NODE_ID_KEY_FIELD = curBrokerSchema.get("node_id");
+    private static Field HOST_KEY_FIELD = curBrokerSchema.get("host");
+    private static Field PORT_KEY_FIELD = curBrokerSchema.get("port");
+
+    private final Node node;
+
+    public ConsumerMetadataResponse(short errorCode, Node node) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_FIELD, errorCode);
+        Struct broker = struct.instance(COORDINATOR_KEY_FIELD);
+        broker.set(NODE_ID_KEY_FIELD, node.id());
+        broker.set(HOST_KEY_FIELD, node.host());
+        broker.set(PORT_KEY_FIELD, node.port());
+        struct.set(COORDINATOR_KEY_FIELD, broker);
+        this.node = node;
+    }
+
+    public ConsumerMetadataResponse(Struct struct) {
+        super(struct);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_FIELD);
+        int nodeId = broker.getInt(NODE_ID_KEY_FIELD);
+        String host = broker.getString(HOST_KEY_FIELD);
+        int port = broker.getInt(PORT_KEY_FIELD);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return struct.getShort(ERROR_CODE_KEY_FIELD);
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
+        return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
new file mode 100644
index 0000000..60880f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+    private static Field REPLICA_ID_KEY_FIELD = curSchema.get("replica_id");
+    private static Field MAX_WAIT_KEY_FIELD = curSchema.get("max_wait_time");
+    private static Field MIN_BYTES_KEY_FIELD = curSchema.get("min_bytes");
+    private static Field TOPICS_KEY_FIELD = curSchema.get("topics");
+
+    public static Schema topicSchema = Protocol.FETCH_REQUEST_TOPIC_V0;
+    private static Field TOPIC_KEY_FIELD = topicSchema.get("topic");
+    private static Field PARTITIONS_KEY_FIELD = topicSchema.get("partitions");
+
+    public static Schema partitionSchema = Protocol.FETCH_REQUEST_PARTITION_V0;
+    private static Field PARTITION_KEY_FIELD = partitionSchema.get("partition");
+    private static Field FETCH_OFFSET_KEY_FIELD = partitionSchema.get("fetch_offset");
+    private static Field MAX_BYTES_KEY_FIELD = partitionSchema.get("max_bytes");
+
+    public final Map<String, List<PartitionData>> topicsData;
+
+    public static class PartitionData {
+        public final int partition;
+        public final long offset;
+        public final int maxBytes;
+
+        public PartitionData(int partition, long offset, int maxBytes) {
+            this.partition = partition;
+            this.offset = offset;
+            this.maxBytes = maxBytes;
+        }
+    }
+
+    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<String, List<PartitionData>> topicsData) {
+        super(new Struct(curSchema));
+        struct.set(REPLICA_ID_KEY_FIELD, replicaId);
+        struct.set(MAX_WAIT_KEY_FIELD, maxWait);
+        struct.set(MIN_BYTES_KEY_FIELD, minBytes);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<PartitionData>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_FIELD);
+            topicData.set(TOPIC_KEY_FIELD, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (PartitionData fetchPartitionData : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_FIELD);
+                partitionData.set(PARTITION_KEY_FIELD, fetchPartitionData.partition);
+                partitionData.set(FETCH_OFFSET_KEY_FIELD, fetchPartitionData.offset);
+                partitionData.set(MAX_BYTES_KEY_FIELD, fetchPartitionData.maxBytes);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_FIELD, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_FIELD, topicArray.toArray());
+        this.topicsData = topicsData;
+    }
+
+    public FetchRequest(Struct struct) {
+        super(struct);
+        topicsData = new HashMap<String, List<PartitionData>>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_FIELD)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_FIELD);
+            List<PartitionData> partitionsPerTopic = new ArrayList<PartitionData>();
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_FIELD)) {
+                Struct partitionDataStruct = (Struct) partitionDataObj;
+                int partition = partitionDataStruct.getInt(PARTITION_KEY_FIELD);
+                long offset = partitionDataStruct.getLong(FETCH_OFFSET_KEY_FIELD);
+                int maxBytes = partitionDataStruct.getInt(MAX_BYTES_KEY_FIELD);
+                partitionsPerTopic.add(new PartitionData(partition, offset, maxBytes));
+            }
+            topicsData.put(topic, partitionsPerTopic);
+        }
+    }
+
+    public int replicaId() {
+        return struct.getInt(REPLICA_ID_KEY_FIELD);
+    }
+
+    public int maxWait() {
+        return struct.getInt(MAX_WAIT_KEY_FIELD);
+    }
+
+    public int minBytes() {
+        return struct.getInt(MIN_BYTES_KEY_FIELD);
+    }
+
+    public Map<String, List<PartitionData>> topicData() {
+        return topicsData;
+    }
+
+    public static FetchRequest parse(ByteBuffer buffer) {
+        return new FetchRequest(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
new file mode 100644
index 0000000..e041e46
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchResponse extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+    private static Field RESPONSES_KEY_FIELD = curSchema.get("responses");
+
+    public static Schema topicSchema = Protocol.FETCH_RESPONSE_TOPIC_V0;
+    private static Field TOPIC_KEY_FIELD = topicSchema.get("topic");
+    private static Field PARTITIONS_KEY_FIELD = topicSchema.get("partition_responses");
+
+    public static Schema partitionSchema = Protocol.FETCH_RESPONSE_PARTITION_V0;
+    private static Field PARTITION_KEY_FIELD = partitionSchema.get("partition");
+    private static Field ERROR_CODE_KEY_FIELD = partitionSchema.get("error_code");
+    private static Field HIGH_WATERMARK_KEY_FIELD = partitionSchema.get("high_watermark");
+    private static Field RECORD_SET_KEY_FIELD = partitionSchema.get("record_set");
+
+    private final Map<String, List<PartitionData>> topicsData;
+
+    public static class PartitionData {
+        public final int partition;
+        public final short errorCode;
+        public final long highWatermark;
+        public final ByteBuffer recordSet;
+
+        public PartitionData(int partition, short errorCode, long highWatermark, ByteBuffer recordSet) {
+            this.partition = partition;
+            this.errorCode = errorCode;
+            this.highWatermark = highWatermark;
+            this.recordSet = recordSet;
+        }
+    }
+
+    public FetchResponse(Map<String, List<PartitionData>> topicsData) {
+        super(new Struct(curSchema));
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<PartitionData>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_FIELD);
+            topicData.set(TOPIC_KEY_FIELD, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (PartitionData fetchPartitionData : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_FIELD);
+                partitionData.set(PARTITION_KEY_FIELD, fetchPartitionData.partition);
+                partitionData.set(ERROR_CODE_KEY_FIELD, fetchPartitionData.errorCode);
+                partitionData.set(HIGH_WATERMARK_KEY_FIELD, fetchPartitionData.highWatermark);
+                partitionData.set(RECORD_SET_KEY_FIELD, fetchPartitionData.recordSet);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_FIELD, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_FIELD, topicArray.toArray());
+        this.topicsData = topicsData;
+    }
+
+    public FetchResponse(Struct struct) {
+        super(struct);
+        topicsData = new HashMap<String, List<PartitionData>>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_FIELD)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_FIELD);
+            List<PartitionData> partitionsPerTopic = new ArrayList<PartitionData>();
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_FIELD)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_FIELD);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_FIELD);
+                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_FIELD);
+                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_FIELD);
+                partitionsPerTopic.add(new PartitionData(partition, errorCode, highWatermark, recordSet));
+            }
+            topicsData.put(topic, partitionsPerTopic);
+        }
+    }
+
+    public Map<String, List<PartitionData>> topicData() {
+        return topicsData;
+    }
+
+    public static FetchResponse parse(ByteBuffer buffer) {
+        return new FetchResponse(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java b/clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java
new file mode 100644
index 0000000..3b594bb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public abstract class GenericStruct {
+    protected final Struct struct;
+
+    public GenericStruct(Struct struct) {
+        this.struct = struct;
+    }
+
+    public Struct toStruct() {
+        return struct;
+    }
+
+    /**
+     * Get the serialized size of this struct
+     */
+    public int sizeOf() {
+        return struct.sizeOf();
+    }
+
+    /**
+     * Write this struct to a buffer
+     */
+    public void writeTo(ByteBuffer buffer) {
+        struct.writeTo(buffer);
+    }
+
+    @Override
+    public String toString() {
+        return struct.toString();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatRequest.java
new file mode 100644
index 0000000..f84b7ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatRequest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class HeartBeatRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEART_BEAT.id);
+    private static Field GROUP_ID_KEY_FIELD = curSchema.get("group_id");
+    private static Field GROUP_GENERATION_ID_KEY_FIELD = curSchema.get("group_generation_id");
+    private static Field CONSUMER_ID_KEY_FIELD = curSchema.get("consumer_id");
+
+    public HeartBeatRequest(String groupId, int groupGenerationId, String consumerId) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_FIELD, groupId);
+        struct.set(GROUP_GENERATION_ID_KEY_FIELD, groupGenerationId);
+        struct.set(CONSUMER_ID_KEY_FIELD, consumerId);
+    }
+
+    public HeartBeatRequest(Struct struct) {
+        super(struct);
+    }
+
+    public String groupId() {
+        return struct.getString(GROUP_ID_KEY_FIELD);
+    }
+
+    public int groupGenerationId() {
+        return struct.getInt(GROUP_GENERATION_ID_KEY_FIELD);
+    }
+
+    public String consumerId() {
+        return struct.getString(CONSUMER_ID_KEY_FIELD);
+    }
+
+    public static HeartBeatRequest parse(ByteBuffer buffer) {
+        return new HeartBeatRequest(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatResponse.java
new file mode 100644
index 0000000..e080300
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartBeatResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class HeartBeatResponse extends GenericStruct {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEART_BEAT.id);
+    private static Field ERROR_CODE_KEY_FIELD = curSchema.get("error_code");
+
+    public HeartBeatResponse(short errorCode) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_FIELD, errorCode);
+    }
+
+    public HeartBeatResponse(Struct struct) {
+        super(struct);
+    }
+
+    public short errorCode() {
+        return struct.getShort(ERROR_CODE_KEY_FIELD);
+    }
+
+    public static HeartBeatResponse parse(ByteBuffer buffer) {
+        return new HeartBeatResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
new file mode 100644
index 0000000..f48662e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class JoinGroupRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+    private static Field GROUP_ID_KEY_FIELD = curSchema.get("group_id");
+    private static Field SESSION_TIMEOUT_KEY_FIELD = curSchema.get("session_timeout");
+    private static Field TOPICS_KEY_FIELD = curSchema.get("topics");
+    private static Field CONSUMER_ID_KEY_FIELD = curSchema.get("consumer_id");
+    private static Field STRATEGY_KEY_FIELD = curSchema.get("partition_assignment_strategy");
+
+    public JoinGroupRequest(String groupId, int sessionTimeout, String[] topics, String consumerId, String strategy) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_FIELD, groupId);
+        struct.set(SESSION_TIMEOUT_KEY_FIELD, sessionTimeout);
+        struct.set(TOPICS_KEY_FIELD, topics);
+        struct.set(CONSUMER_ID_KEY_FIELD, consumerId);
+        struct.set(STRATEGY_KEY_FIELD, strategy);
+    }
+
+    public JoinGroupRequest(Struct struct) {
+        super(struct);
+    }
+
+    public String groupId() {
+        return struct.getString(GROUP_ID_KEY_FIELD);
+    }
+
+    public int sessionTimeout() {
+        return struct.getInt(SESSION_TIMEOUT_KEY_FIELD);
+    }
+
+    public String[] topics() {
+        Object[] topicObjs = struct.getArray(TOPICS_KEY_FIELD);
+        String[] topics = new String[topicObjs.length];
+        for (int i = 0; i < topicObjs.length; i++)
+            topics[i] = (String) topicObjs[i];
+        return topics;
+    }
+
+    public String consumerId() {
+        return struct.getString(CONSUMER_ID_KEY_FIELD);
+    }
+
+    public String strategy() {
+        return struct.getString(STRATEGY_KEY_FIELD);
+    }
+
+    public static JoinGroupRequest parse(ByteBuffer buffer) {
+        return new JoinGroupRequest(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
new file mode 100644
index 0000000..d538539
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class JoinGroupResponse extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+    private static Field ERROR_CODE_KEY_FIELD = curSchema.get("error_code");
+    private static Field GENERATION_ID_KEY_FIELD = curSchema.get("group_generation_id");
+    private static Field CONSUMER_ID_KEY_FIELD = curSchema.get("consumer_id");
+    private static Field ASSIGNED_PARTITIONS_KEY_FIELD = curSchema.get("assigned_partitions");
+
+    public static Schema curResponseTopicSchema = Protocol.JOIN_GROUP_RESPONSE_TOPIC_V0;
+    private static Field TOPIC_KEY_FIELD = curResponseTopicSchema.get("topic");
+    private static Field PARTITIONS_KEY_FIELD = curResponseTopicSchema.get("partitions");
+
+    public static final int UNKNOWN_GENERATION_ID = -1;
+    public static final String UNKNOWN_CONSUMER_ID = "";
+
+    public JoinGroupResponse(short errorCode, int generationId, String consumerId, Map<String, List<Integer>> assignedPartitions) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_FIELD, errorCode);
+        struct.set(GENERATION_ID_KEY_FIELD, generationId);
+        struct.set(CONSUMER_ID_KEY_FIELD, consumerId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<Integer>> entries : assignedPartitions.entrySet()) {
+            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_FIELD);
+            topicData.set(TOPIC_KEY_FIELD, entries.getKey());
+            topicData.set(PARTITIONS_KEY_FIELD, entries.getValue().toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(ASSIGNED_PARTITIONS_KEY_FIELD, topicArray.toArray());
+    }
+
+    public JoinGroupResponse(short errorCode) {
+        this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<String, List<Integer>>emptyMap());
+    }
+
+    public JoinGroupResponse(Struct struct) {
+        super(struct);
+    }
+
+    public short errorCode() {
+        return struct.getShort(ERROR_CODE_KEY_FIELD);
+    }
+
+    public int generationId() {
+        return struct.getInt(GENERATION_ID_KEY_FIELD);
+    }
+
+    public String consumerId() {
+        return struct.getString(CONSUMER_ID_KEY_FIELD);
+    }
+
+    public Map<String, List<Integer>> assignedPartitions() {
+        Map<String, List<Integer>> assignmentMap = new HashMap<String, List<Integer>>();
+        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_FIELD)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_FIELD);
+            List<Integer> partitions = new ArrayList<Integer>();
+            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_FIELD))
+                partitions.add((Integer) partitionObj);
+            assignmentMap.put(topic, partitions);
+        }
+        return assignmentMap;
+    }
+
+    public static JoinGroupResponse parse(ByteBuffer buffer) {
+        return new JoinGroupResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f35bd87..2f4ad76 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,26 +12,33 @@
  */
 package org.apache.kafka.common.requests;

+import java.nio.ByteBuffer;
 import java.util.List;

 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;

-public class MetadataRequest {
-
-    private final List<String> topics;
+public class MetadataRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+    private static Field TOPICS_KEY_FIELD = curSchema.get("topics");

     public MetadataRequest(List<String> topics) {
-        this.topics = topics;
+        super(new Struct(curSchema));
+        struct.set(TOPICS_KEY_FIELD, topics.toArray());
+    }
+
+    public MetadataRequest(Struct struct) {
+        super(struct);
     }

-    public Struct toStruct() {
-        String[] ts = new String[topics.size()];
-        topics.toArray(ts);
-        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
-        body.set("topics", topics.toArray());
-        return body;
+    public String[] topics() {
+        Object[] topicObjs = struct.getArray(TOPICS_KEY_FIELD);
+        String[] ts = new String[topicObjs.length];
+        for (int i = 0; i < topicObjs.length; i++)
+            ts[i] = (String) topicObjs[i];
+        return ts;
     }

+    public static MetadataRequest parse(ByteBuffer buffer) {
+        return new MetadataRequest(((Struct) curSchema.read(buffer)));
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 2652c32..21feaa9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.requests;

+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -20,28 +21,28 @@ import java.util.Map;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;

-public class MetadataResponse {
+public class MetadataResponse extends GenericStruct {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);

     private final Cluster cluster;
     private final Map<String, Errors> errors;

-    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
-        this.cluster = cluster;
-        this.errors = errors;
-    }
-
     public MetadataResponse(Struct struct) {
+        super(struct);
         Map<String, Errors> errors = new HashMap<String, Errors>();
         Map<Integer, Node> brokers = new HashMap<Integer, Node>();
         Object[] brokerStructs = (Object[]) struct.get("brokers");
         for (int i = 0; i < brokerStructs.length; i++) {
             Struct broker = (Struct) brokerStructs[i];
-            int nodeId = (Integer) broker.get("node_id");
-            String host = (String) broker.get("host");
-            int port = (Integer) broker.get("port");
+            int nodeId = broker.getInt("node_id");
+            String host = broker.getString("host");
+            int port = broker.getInt("port");
             brokers.put(nodeId, new Node(nodeId, host, port));
         }
         List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
@@ -86,4 +87,7 @@ public class MetadataResponse {
         return this.cluster;
     }

+    public static MetadataResponse parse(ByteBuffer buffer) {
+        return new MetadataResponse(((Struct) curSchema.read(buffer)));
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
new file mode 100644
index 0000000..c092a2f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetCommitRequest extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static Field GROUP_ID_KEY_FIELD = curSchema.get("group_id");
+    private static Field GENERATION_ID_KEY_FIELD = curSchema.get("group_generation_id");
+    private static Field CONSUMER_ID_KEY_FIELD = curSchema.get("consumer_id");
+    private static Field TOPICS_KEY_FIELD = curSchema.get("topics");
+
+    public static Schema topicSchema = Protocol.OFFSET_COMMIT_REQUEST_TOPIC_V0;
+    private static Field TOPIC_KEY_FIELD = topicSchema.get("topic");
+    private static Field PARTITIONS_KEY_FIELD = topicSchema.get("partitions");
+
+    public static Schema partitionSchema = Protocol.OFFSET_COMMIT_REQUEST_PARTITION_V0;
+    private static Field PARTITION_KEY_FIELD = partitionSchema.get("partition");
+    private static Field COMMIT_OFFSET_KEY_FIELD = partitionSchema.get("offset");
+    private static Field TIMESTAMP_KEY_FIELD = partitionSchema.get("timestamp");
+    private static Field METADATA_KEY_FIELD = partitionSchema.get("metadata");
+
+    public static final int DEFAULT_GENERATION_ID = -1;
+
+    public final Map<String, List<PartitionData>> topicsData;
+
+    public static class PartitionData {
+        public final int partition;
+        public final long offset;
+        public final long timestamp;
+        public final String metadata;
+
+        public PartitionData(int partition, long offset, long timestamp, String metadata) {
+            this.partition = partition;
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.metadata = metadata;
+        }
+    }
+
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<String, List<PartitionData>> topicsData) {
+        super(new Struct(curSchema));
+        struct.set(GROUP_ID_KEY_FIELD, groupId);
+        struct.set(GENERATION_ID_KEY_FIELD, generationId);
+        struct.set(CONSUMER_ID_KEY_FIELD, consumerId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<PartitionData>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_FIELD);
+            topicData.set(TOPIC_KEY_FIELD, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (PartitionData commitPartitionData : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_FIELD);
+                partitionData.set(PARTITION_KEY_FIELD, commitPartitionData.partition);
+                partitionData.set(COMMIT_OFFSET_KEY_FIELD, commitPartitionData.offset);
+                partitionData.set(TIMESTAMP_KEY_FIELD, commitPartitionData.timestamp);
+                partitionData.set(METADATA_KEY_FIELD, commitPartitionData.metadata);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_FIELD, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_FIELD, topicArray.toArray());
+        this.topicsData = topicsData;
+    }
+
+    public OffsetCommitRequest(Struct struct) {
+        super(struct);
+        topicsData = new HashMap<String, List<PartitionData>>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_FIELD)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_FIELD);
+            List<PartitionData> partitionsPerTopic = new ArrayList<PartitionData>();
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_FIELD)) {
+                Struct partitionDataStruct = (Struct) partitionDataObj;
+                int partition = partitionDataStruct.getInt(PARTITION_KEY_FIELD);
+                long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_FIELD);
+                long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_FIELD);
+                String metadata = partitionDataStruct.getString(METADATA_KEY_FIELD);
+                PartitionData partitionData = new PartitionData(partition, offset, timestamp, metadata);
+                partitionsPerTopic.add(partitionData);
+            }
+            topicsData.put(topic, partitionsPerTopic);
+        }
+    }
+
+    public String groupId() {
+        return struct.getString(GROUP_ID_KEY_FIELD);
+    }
+
+    public int generationId() {
+        return struct.getInt(GENERATION_ID_KEY_FIELD);
+    }
+
+    public String consumerId() {
+        return struct.getString(CONSUMER_ID_KEY_FIELD);
+    }
+
+    public Map<String, List<PartitionData>> topicData() {
+        return topicsData;
+    }
+
+    public static OffsetCommitRequest parse(ByteBuffer buffer) {
+        return new OffsetCommitRequest(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
new file mode 100644
index 0000000..95cadb2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetCommitResponse extends GenericStruct {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static Field RESPONSES_KEY_FIELD = curSchema.get("responses");
+
+    public static Schema topicSchema = Protocol.OFFSET_COMMIT_RESPONSE_TOPIC_V0;
+    private static Field TOPIC_KEY_FIELD = topicSchema.get("topic");
+    private static Field PARTITIONS_KEY_FIELD = topicSchema.get("partition_responses");
+
+    public static Schema partitionSchema = Protocol.OFFSET_COMMIT_RESPONSE_PARTITION_V0;
+    private static Field PARTITION_KEY_FIELD = partitionSchema.get("partition");
+    private static Field ERROR_CODE_KEY_FIELD = partitionSchema.get("error_code");
+
+    private final Map<String, List<PartitionData>> topicsData;
+
+    public static class PartitionData {
+        public final int partition;
+        public final short errorCode;
+
+        public PartitionData(int partition, short errorCode) {
+            this.partition = partition;
+            this.errorCode = errorCode;
+        }
+    }
+
+    public OffsetCommitResponse(Map<String, List<PartitionData>> topicsData) {
+        super(new Struct(curSchema));
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<PartitionData>> entries : topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_FIELD);
+            topicData.set(TOPIC_KEY_FIELD, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (PartitionData commitPartitionData : entries.getValue()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_FIELD);
+                partitionData.set(PARTITION_KEY_FIELD, commitPartitionData.partition);
+                partitionData.set(ERROR_CODE_KEY_FIELD, commitPartitionData.errorCode);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_FIELD, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_FIELD, topicArray.toArray());
+        this.topicsData = topicsData;
+    }
+
+    public OffsetCommitResponse(Struct struct) {
+        super(struct);
+        topicsData = new HashMap<String, List<PartitionData>>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_FIELD)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_FIELD);
+            List<PartitionData> partitionsPerTopic = new ArrayList<PartitionData>();
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_FIELD)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_FIELD);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_FIELD);
+                PartitionData partitionData = new PartitionData(partition, errorCode);
+                partitionsPerTopic.add(partitionData);
+            }
+            topicsData.put(topic, partitionsPerTopic);
+        }
+    }
+
+    public Map<String, List<PartitionData>> topicData() {
+        return topicsData;
+    }
+
+    public static OffsetCommitResponse parse(ByteBuffer buffer) {
+        return new OffsetCommitResponse(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6fa4a58..c69ed0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -13,12 +13,18 @@ package org.apache.kafka.common.requests;

 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;

+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;

-public class ProduceResponse {
+public class ProduceResponse extends GenericStruct {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+
     public class PartitionResponse {
         public int partitionId;
         public short errorCode;
@@ -29,31 +35,22 @@ public class ProduceResponse {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
         }
-        @Override
-        public String toString() {
-            StringBuilder b = new StringBuilder();
-            b.append('{');
-            b.append("pid: " + partitionId);
-            b.append(",error: " + errorCode);
-            b.append(",offset: " + baseOffset);
-            b.append('}');
-            return b.toString();
-        }
     }

     private final Map<String, Map<TopicPartition, PartitionResponse>> responses;

     public ProduceResponse(Struct struct) {
+        super(struct);
         responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>();
-        for (Object topicResponse : (Object[]) struct.get("responses")) {
+        for (Object topicResponse : struct.getArray("responses")) {
             Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
+            String topic = topicRespStruct.getString("topic");
             Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>();
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+            for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
                 Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-                long offset = (Long) partRespStruct.get("base_offset");
+                int partition = partRespStruct.getInt("partition");
+                short errorCode = partRespStruct.getShort("error_code");
+                long offset = partRespStruct.getLong("base_offset");
                 TopicPartition tp = new TopicPartition(topic, partition);
                 topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset));
             }
@@ -65,21 +62,8 @@ public class ProduceResponse {
         return this.responses;
     }

-    @Override
-    public String toString() {
-        StringBuilder b = new StringBuilder();
-        b.append('{');
-        boolean isFirst = true;
-        for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
-            for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
-                if (isFirst)
-                    isFirst = false;
-                else
-                    b.append(',');
-                b.append(entry.getKey() + " : " + entry.getValue());
-            }
-        }
-        b.append('}');
-        return b.toString();
+    public static ProduceResponse parse(ByteBuffer buffer) {
+        return new ProduceResponse(((Struct) curSchema.read(buffer)));
     }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 66cc2fe..f5f44e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -24,18 +24,15 @@ import org.apache.kafka.common.protocol.types.Struct;
 /**
  * The header for a request in the Kafka protocol
  */
-public class RequestHeader {
+public class RequestHeader extends GenericStruct {

     private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
     private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
     private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
     private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");

-    private final Struct header;
-
     public RequestHeader(Struct header) {
-        super();
-        this.header = header;
+        super(header);
     }

     public RequestHeader(short apiKey, String client, int correlation) {
@@ -44,42 +41,29 @@ public class RequestHeader {

     public RequestHeader(short apiKey, short version, String client, int correlation) {
         this(new Struct(Protocol.REQUEST_HEADER));
-        this.header.set(API_KEY_FIELD, apiKey);
-        this.header.set(API_VERSION_FIELD, version);
-        this.header.set(CLIENT_ID_FIELD, client);
-        this.header.set(CORRELATION_ID_FIELD, correlation);
+        struct.set(API_KEY_FIELD, apiKey);
+        struct.set(API_VERSION_FIELD, version);
+        struct.set(CLIENT_ID_FIELD, client);
+        struct.set(CORRELATION_ID_FIELD, correlation);
     }

     public short apiKey() {
-        return (Short) this.header.get(API_KEY_FIELD);
+        return struct.getShort(API_KEY_FIELD);
     }

     public short apiVersion() {
-        return (Short) this.header.get(API_VERSION_FIELD);
+        return struct.getShort(API_VERSION_FIELD);
     }

     public String clientId() {
-        return (String) this.header.get(CLIENT_ID_FIELD);
+        return struct.getString(CLIENT_ID_FIELD);
     }

     public int correlationId() {
-        return (Integer) this.header.get(CORRELATION_ID_FIELD);
+        return struct.getInt(CORRELATION_ID_FIELD);
     }

     public static RequestHeader parse(ByteBuffer buffer) {
         return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
     }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
-    }
-
-    @Override
-    public String toString() {
-        return header.toString();
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 257b828..7de5ada 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -28,31 +28,21 @@ import org.apache.kafka.common.protocol.types.Struct;
 /**
  * A response header in the kafka protocol.
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 257b828..7de5ada 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -28,31 +28,21 @@ import org.apache.kafka.common.protocol.types.Struct;
 /**
  * A response header in the kafka protocol.
  */
-public class ResponseHeader {
+public class ResponseHeader extends GenericStruct {
 
     private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
 
-    private final Struct header;
-
     public ResponseHeader(Struct header) {
-        this.header = header;
+        super(header);
     }
 
     public ResponseHeader(int correlationId) {
         this(new Struct(Protocol.RESPONSE_HEADER));
-        this.header.set(CORRELATION_KEY_FIELD, correlationId);
+        struct.set(CORRELATION_KEY_FIELD, correlationId);
     }
 
     public int correlationId() {
-        return (Integer) header.get(CORRELATION_KEY_FIELD);
-    }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
+        return struct.getInt(CORRELATION_KEY_FIELD);
     }
 
     public static ResponseHeader parse(ByteBuffer buffer) {
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index dfad6e6..6d00ed0 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -41,9 +41,9 @@ object ConsumerMetadataRequest {
 
 case class ConsumerMetadataRequest(group: String,
                                    versionId: Short = ConsumerMetadataRequest.CurrentVersion,
-                                   override val correlationId: Int = 0,
+                                   correlationId: Int = 0,
                                    clientId: String = ConsumerMetadataRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) {
 
   def sizeInBytes =
     2 + /* versionId */
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index f8cf6c3..b9a3ed0 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -39,8 +39,8 @@ object ConsumerMetadataResponse {
 
 }
 
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
-  extends RequestOrResponse(correlationId = correlationId) {
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, val correlationId: Int = 0)
+  extends RequestOrResponse() {
 
   def sizeInBytes =
     4 + /* correlationId */
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 7dacb20..5be393a 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -38,9 +38,9 @@ object ControlledShutdownRequest extends Logging {
 }
 
 case class ControlledShutdownRequest(val versionId: Short,
-                                     override val correlationId: Int,
+                                     val correlationId: Int,
                                      val brokerId: Int)
-  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){
+  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
 
   def this(correlationId: Int, brokerId: Int) =
     this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)
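Call sites are untouched by dropping the override; for instance, the two-argument auxiliary constructor above is used the same way as before (the values here are hypothetical):

    // versionId falls back to ControlledShutdownRequest.CurrentVersion;
    // the request key remains Some(RequestKeys.ControlledShutdownKey).
    val shutdown = new ControlledShutdownRequest(7 /* correlationId */, 1 /* brokerId */)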
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 46ec3db..5e0a1cf 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -39,10 +39,10 @@ object ControlledShutdownResponse {
 }
 
-case class ControlledShutdownResponse(override val correlationId: Int,
+case class ControlledShutdownResponse(val correlationId: Int,
                                       val errorCode: Short = ErrorMapping.NoError,
                                       val partitionsRemaining: Set[TopicAndPartition])
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ +
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a8b73ac..55a5982 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -60,13 +60,13 @@ object FetchRequest {
 }
 
 case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
-                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
+                                        correlationId: Int = FetchRequest.DefaultCorrelationId,
                                         clientId: String = ConsumerConfig.DefaultClientId,
                                         replicaId: Int = Request.OrdinaryConsumerId,
                                         maxWait: Int = FetchRequest.DefaultMaxWait,
                                         minBytes: Int = FetchRequest.DefaultMinBytes,
                                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
-  extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
   /**
    * Partitions the request info into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
new file mode 100644
index 0000000..3026b7d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import org.apache.kafka.common.requests.GenericStruct
+
+private[kafka] abstract class GenericRequestOrResponseAndHeader(private val header: GenericStruct,
+                                                                private val body: GenericStruct,
+                                                                private val name: String,
+                                                                private val requestId: Option[Short] = None)
+  extends RequestOrResponse(requestId) {
+
+  def writeTo(buffer: ByteBuffer) {
+    header.writeTo(buffer)
+    body.writeTo(buffer)
+  }
+
+  def sizeInBytes(): Int = {
+    header.sizeOf() + body.sizeOf();
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val strBuffer = new StringBuilder
+    strBuffer.append("Name: " + name + ";")
+    strBuffer.append(header.toString)
+    strBuffer.append(body.toString)
+    strBuffer.toString()
+  }
+}
\ No newline at end of file
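A send-side sketch of the intended wire layout, the header immediately followed by the body; the helper itself is hypothetical (and would live inside kafka.api, since the class is private[kafka]), but writeTo and sizeInBytes are the members defined above:

    import java.nio.ByteBuffer

    // Allocate once for header plus body, write both, rewind for sending.
    def serialize(message: GenericRequestOrResponseAndHeader): ByteBuffer = {
      val buffer = ByteBuffer.allocate(message.sizeInBytes)
      message.writeTo(buffer)
      buffer.rewind()
      buffer
    }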
diff --git a/core/src/main/scala/kafka/api/HeartBeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartBeatRequestAndHeader.scala
new file mode 100644
index 0000000..8c9f906
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartBeatRequestAndHeader.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+import org.apache.kafka.common.requests.{HeartBeatResponse, ResponseHeader, HeartBeatRequest, RequestHeader}
+
+object HeartBeatRequestAndHeader extends Logging {
+  def readFrom(buffer: ByteBuffer): HeartBeatRequestAndHeader = {
+    new HeartBeatRequestAndHeader(buffer)
+  }
+}
+
+case class HeartBeatRequestAndHeader(header: RequestHeader, body: HeartBeatRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartBeatKey), Some(RequestKeys.HeartBeatKey)) {
+
+  def this(buffer: ByteBuffer) {
+    this(RequestHeader.parse(buffer),
+      HeartBeatRequest.parse(buffer))
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponseHeader = new ResponseHeader(header.correlationId)
+    val errorResponseBody = new HeartBeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorHeartBeatResponseAndHeader = new HeartBeatResponseAndHeader(errorResponseHeader, errorResponseBody)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
+  }
+}
diff --git a/core/src/main/scala/kafka/api/HeartBeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartBeatResponseAndHeader.scala
new file mode 100644
index 0000000..2085763
--- /dev/null
+++ b/core/src/main/scala/kafka/api/HeartBeatResponseAndHeader.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{ResponseHeader, HeartBeatResponse}
+
+case class HeartBeatResponseAndHeader(header: ResponseHeader, body: HeartBeatResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartBeatKey), None) {
+
+}
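On the receive side, readFrom consumes the header and then the body from the same buffer. A hypothetical dispatch by request key (JoinGroupRequestAndHeader is defined in the next file), assuming the key has already been peeked from the header:

    import java.nio.ByteBuffer

    def decode(key: Short, buffer: ByteBuffer): RequestOrResponse = key match {
      case RequestKeys.HeartBeatKey => HeartBeatRequestAndHeader.readFrom(buffer)
      case RequestKeys.JoinGroupKey => JoinGroupRequestAndHeader.readFrom(buffer)
      case _ => throw new IllegalArgumentException("unexpected request key " + key)
    }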
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
new file mode 100644
index 0000000..74912ea
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.utils.Logging
+import org.apache.kafka.common.requests._
+import kafka.network.RequestChannel.Response
+import scala.Some
+
+object JoinGroupRequestAndHeader extends Logging {
+  def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
+    new JoinGroupRequestAndHeader(buffer)
+  }
+}
+
+case class JoinGroupRequestAndHeader(header: RequestHeader, body: JoinGroupRequest)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) {
+
+  def this(buffer: ByteBuffer) {
+    this(RequestHeader.parse(buffer),
+      JoinGroupRequest.parse(buffer))
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponseHeader = new ResponseHeader(header.correlationId)
+    val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val errorJoinGroupResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorJoinGroupResponseAndHeader)))
+  }
+}
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
new file mode 100644
index 0000000..ee3c28e
--- /dev/null
+++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader}
+
+case class JoinGroupResponseAndHeader(header: ResponseHeader, body: JoinGroupResponse)
+  extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
+
+}
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3e40817..4ff7e8f 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -129,13 +129,13 @@ object LeaderAndIsrRequest {
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
-                                override val correlationId: Int,
+                                correlationId: Int,
                                 clientId: String,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                                 leaders: Set[Broker])
-  extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index f636444..22ce48a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
 }
 
-case class LeaderAndIsrResponse(override val correlationId: Int,
+case class LeaderAndIsrResponse(correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ +
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 630768a..d73beb9 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -61,9 +61,9 @@ object OffsetCommitRequest extends Logging {
 case class OffsetCommitRequest(groupId: String,
                                requestInfo: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
-                               override val correlationId: Int = 0,
+                               correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 4946e97..624a1c1 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -42,8 +42,8 @@ object OffsetCommitResponse extends Logging {
 }
 
 case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
-                                override val correlationId: Int = 0)
-  extends RequestOrResponse(correlationId=correlationId) {
+                                correlationId: Int = 0)
+  extends RequestOrResponse() {
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
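A hypothetical commit built against the OffsetCommitRequest signature above; the single-argument OffsetAndMetadata construction assumes that class's metadata and timestamp defaults:

    import kafka.api.OffsetCommitRequest
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}

    // versionId and clientId fall back to the defaults shown in the diff.
    val commit = OffsetCommitRequest(
      groupId = "my-group",
      requestInfo = Map(TopicAndPartition("events", 0) -> OffsetAndMetadata(42L)),
      correlationId = 7)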
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a32f858..c7604b9 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -52,9 +52,9 @@ object OffsetFetchRequest extends Logging {
 case class OffsetFetchRequest(groupId: String,
                               requestInfo: Seq[TopicAndPartition],
                               versionId: Short = OffsetFetchRequest.CurrentVersion,
-                              override val correlationId: Int = 0,
+                              correlationId: Int = 0,
                               clientId: String = OffsetFetchRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index c1222f4..e3523f8 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -45,8 +45,8 @@ object OffsetFetchResponse extends Logging {
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                               override val correlationId: Int = 0)
-  extends RequestOrResponse(correlationId = correlationId) {
+                               correlationId: Int = 0)
+  extends RequestOrResponse() {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 7cbc26c..3d483bc 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
-                         override val correlationId: Int = 0,
+                         correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
-  extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
   def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) =
     this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 0e1d6e3..63c0899 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -51,9 +51,9 @@ case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
 }
 
-case class OffsetResponse(override val correlationId: Int,
+case class OffsetResponse(correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 0c295a2..b2366e7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -53,12 +53,12 @@ object ProducerRequest {
 }
 
 case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
-                           override val correlationId: Int,
+                           correlationId: Int,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
                           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-  extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5a1d801..a286272 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,9 +43,9 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(override val correlationId: Int,
+case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus])
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   /**
    * Partitions the status map into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index fbfc9d3..a28572e 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -32,6 +32,8 @@ object RequestKeys {
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
   val ConsumerMetadataKey: Short = 10
+  val JoinGroupKey: Short = 11
+  val HeartBeatKey: Short = 12
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 57f87a4..0334343 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -30,7 +30,7 @@ object Request {
 }
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging {
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging {
 
   def sizeInBytes: Int
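With correlationId gone from the base constructor, each concrete class declares it as an ordinary field. A self-contained model of the pattern (simplified; FooRequest and RequestOrResponseModel are hypothetical stand-ins, not the real classes):

    abstract class RequestOrResponseModel(val requestId: Option[Short] = None) {
      def sizeInBytes: Int
    }

    // correlationId is now a plain constructor field, not an override of a base-class val.
    case class FooRequest(correlationId: Int = 0)
      extends RequestOrResponseModel(Some(11.toShort)) {
      def sizeInBytes = 4 /* correlationId */
    }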
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 68fc138..5e14987 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -54,13 +54,13 @@ object StopReplicaRequest extends Logging {
 }
 
 case class StopReplicaRequest(versionId: Short,
-                              override val correlationId: Int,
+                              correlationId: Int,
                               clientId: String,
                               controllerId: Int,
                               controllerEpoch: Int,
                               deletePartitions: Boolean,
                               partitions: Set[TopicAndPartition])
-  extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
   def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index c90ddee..3431f3f 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,10 +42,10 @@ object StopReplicaResponse {
 }
 
-case class StopReplicaResponse(override val correlationId: Int,
+case class StopReplicaResponse(val correlationId: Int,
                                val responseMap: Map[TopicAndPartition, Short],
                                val errorCode: Short = ErrorMapping.NoError)
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ +
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index a319f2f..dcd0c68 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging {
 }
 
 case class TopicMetadataRequest(val versionId: Short,
-                                override val correlationId: Int,
+                                val correlationId: Int,
                                 val clientId: String,
                                 val topics: Seq[String])
-  extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
+  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String], correlationId: Int) =
     this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index f6b7429..f65b57b 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -34,8 +34,8 @@ object TopicMetadataResponse {
 }
 
 case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
-                                 override val correlationId: Int)
-  extends RequestOrResponse(correlationId = correlationId) {
+                                 correlationId: Int)
+  extends RequestOrResponse() {
 
   val sizeInBytes: Int = {
     val brokers = extractBrokers(topicsMetadata).values
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 543e262..530982e 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -55,13 +55,13 @@ object UpdateMetadataRequest {
 }
 
 case class UpdateMetadataRequest (versionId: Short,
-                                  override val correlationId: Int,
+                                  correlationId: Int,
                                   clientId: String,
                                   controllerId: Int,
                                   controllerEpoch: Int,
                                   partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
                                   aliveBrokers: Set[Broker])
-  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey)) {
 
   def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
            partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
index c583c1f..53f6067 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -32,9 +32,9 @@ object UpdateMetadataResponse {
   }
 }
 
-case class UpdateMetadataResponse(override val correlationId: Int,
+case class UpdateMetadataResponse(correlationId: Int,
                                   errorCode: Short = ErrorMapping.NoError)
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
 
   def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
 
   def writeTo(buffer: ByteBuffer) {
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8763968..624872c 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -133,9 +133,9 @@ class RequestSendThread(val controllerId: Int,
           isSendSuccessful = true
         } catch {
           case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-            error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " +
+            error(("Controller %d epoch %d failed to send request %s to broker %s. " +
               "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-              RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e)
+              request, toBroker.toString()), e)
             channel.disconnect()
             connectToBroker(toBroker, channel)
             isSendSuccessful = false
@@ -153,8 +153,8 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.UpdateMetadataKey =>
             response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
-        stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
-          .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
+        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+          .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
 
         if(callback != null) {
           callback(response)
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 7e6da16..b0b7be1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -24,10 +24,10 @@ import kafka.common.ErrorMapping
 import kafka.network.RequestChannel.Response
 
 class TopicMetadataRequest(val versionId: Short,
-                           override val correlationId: Int,
+                           val correlationId: Int,
                            val clientId: String,
                            val topics: java.util.List[String])
-  extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
+  extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
 
   val underlying: kafka.api.TopicMetadataRequest = {
     import scala.collection.JavaConversions._