diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index a016269..8ebe7ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -276,13 +277,16 @@ public class Sender implements Runnable { * Create a produce request from the given record batches */ private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List batches) { - ProduceRequest request = new ProduceRequest(acks, timeout); + Map produceRecordsByPartition = new HashMap(batches.size()); Map recordsByPartition = new HashMap(batches.size()); for (RecordBatch batch : batches) { - batch.records.buffer().flip(); - request.add(batch.topicPartition, batch.records); - recordsByPartition.put(batch.topicPartition, batch); + TopicPartition tp = batch.topicPartition; + ByteBuffer recordsBuffer = batch.records.buffer(); + recordsBuffer.flip(); + produceRecordsByPartition.put(tp, recordsBuffer); + recordsByPartition.put(tp, batch); } + ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); return new ClientRequest(now, acks != 0, send, recordsByPartition); } diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index c62707a..d3299b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -15,12 +15,7 @@ package org.apache.kafka.common; import org.apache.kafka.common.utils.Utils; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster. @@ -143,6 +138,14 @@ public final class Cluster { return this.partitionsByNode.get(nodeId); } + /** + * Get all topics. 
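+     * The returned set is a view backed by the cluster's internal metadata map, so callers should treat it as read-only.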
+ * @return a set of all topics + */ + public Set topics() { + return this.partitionsByTopic.keySet(); + } + @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6fe7573..109fc96 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -30,8 +30,11 @@ public enum ApiKeys { METADATA(3, "metadata"), LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), - OFFSET_COMMIT(6, "offset_commit"), - OFFSET_FETCH(7, "offset_fetch"); + OFFSET_COMMIT(8, "offset_commit"), + OFFSET_FETCH(9, "offset_fetch"), + CONSUMER_METADATA(10, "consumer_metadata"), + JOIN_GROUP(11, "join_group"), + HEARTBEAT(12, "heartbeat"); private static ApiKeys[] codeToType; public static int MAX_API_KEY = -1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 044b030..7517b87 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -104,6 +104,264 @@ public class Protocol { public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 }; public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 }; + /* Offset commit api */ + public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("offset", + INT64, + "Message offset to be committed."), + new Field("timestamp", + INT64, + "Timestamp of the commit"), + new Field("metadata", + STRING, + "Any associated metadata the client wants to keep.")); + + public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), + "Partitions to commit offsets.")); + + public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), + "Topics to commit offsets.")); + + public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("group_generation_id", + INT32, + "The generation of the consumer group."), + new Field("consumer_id", + STRING, + "The consumer id assigned by the group coordinator."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), + "Topics to commit offsets.")); + + public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("error_code", + INT16)); + + public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0))); + + public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); + + public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 }; + /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. 
*/ + public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0}; + + /* Offset fetch api */ + public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id.")); + + public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + STRING, + "Topic to fetch offset."), + new Field("partitions", + new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0), + "Partitions to fetch offsets.")); + + public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("topics", + new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), + "Topics to fetch offsets.")); + + public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("offset", + INT64, + "Last committed message offset."), + new Field("metadata", + STRING, + "Any associated metadata the client wants to keep."), + new Field("error_code", + INT16)); + + public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0))); + + public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0))); + + public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 }; + public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 }; + + /* List offset api */ + public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("timestamp", + INT64, + "Timestamp."), + new Field("max_num_offsets", + INT32, + "Maximum offsets to return.")); + + public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + STRING, + "Topic to list offset."), + new Field("partitions", + new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0), + "Partitions to list offset.")); + + public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id", + INT32, + "Broker id of the follower. 
For normal consumers, use -1."), + new Field("topics", + new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0), + "Topics to list offsets.")); + + public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("error_code", + INT16), + new Field("offsets", + new ArrayOf(INT64), + "A list of offsets.")); + + public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0))); + + public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0))); + + public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 }; + public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 }; + + /* Fetch api */ + public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("fetch_offset", + INT64, + "Message offset."), + new Field("max_bytes", + INT32, + "Maximum bytes to fetch.")); + + public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", + STRING, + "Topic to fetch."), + new Field("partitions", + new ArrayOf(FETCH_REQUEST_PARTITION_V0), + "Partitions to fetch.")); + + public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id", + INT32, + "Broker id of the follower. For normal consumers, use -1."), + new Field("max_wait_time", + INT32, + "Maximum time in ms to wait for the response."), + new Field("min_bytes", + INT32, + "Minimum bytes to accumulate in the response."), + new Field("topics", + new ArrayOf(FETCH_REQUEST_TOPIC_V0), + "Topics to fetch.")); + + public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("error_code", + INT16), + new Field("high_watermark", + INT64, + "Last committed offset."), + new Field("record_set", BYTES)); + + public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(FETCH_RESPONSE_PARTITION_V0))); + + public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", + new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + + public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 }; + public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 }; + + /* Consumer metadata api */ + public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The consumer group id.")); + + public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", + INT16), + new Field("coordinator", + BROKER, + "Host and port information for the coordinator for a consumer group.")); + + public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 }; + public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 }; + + /* Join group api */ + public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("session_timeout", + INT32, + "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."), + new Field("topics", + new ArrayOf(STRING), + "An array of topics to subscribe to."), + new Field("consumer_id", + STRING, + "The assigned consumer id or an empty string for a new consumer."), + new 
Field("partition_assignment_strategy", + STRING, + "The strategy for the coordinator to assign partitions.")); + + public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), + new Field("partitions", new ArrayOf(INT32))); + public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", + INT16), + new Field("group_generation_id", + INT32, + "The generation of the consumer group."), + new Field("consumer_id", + STRING, + "The consumer id assigned by the group coordinator."), + new Field("assigned_partitions", + new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0))); + + public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 }; + public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 }; + + /* Heartbeat api */ + public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("group_generation_id", + INT32, + "The generation of the consumer group."), + new Field("consumer_id", + STRING, + "The consumer id assigned by the group coordinator.")); + + public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", + INT16)); + + public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0}; + public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0}; + /* an array of all requests and responses with all schema versions */ public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -113,22 +371,28 @@ public class Protocol { static { REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST; - REQUESTS[ApiKeys.FETCH.id] = new Schema[] {}; - REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {}; + REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; + REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST; REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; - REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; - REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; + REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; + REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; + REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST; + REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; - RESPONSES[ApiKeys.FETCH.id] = new Schema[] {}; - RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {}; + RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; + RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE; RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; - RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; - RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; + RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; + RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; + RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE; + RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 8cecba5..444e69e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -83,6 +83,15 @@ public class Struct { return getFieldOrDefault(field); } + /** + * Check if the struct contains a field. + * @param name + * @return + */ + public boolean hasField(String name) { + return schema.get(name) != null; + } + public Struct getStruct(Field field) { return (Struct) get(field); } @@ -107,6 +116,22 @@ public class Struct { return (Integer) get(name); } + public Long getLong(Field field) { + return (Long) get(field); + } + + public Long getLong(String name) { + return (Long) get(name); + } + + public ByteBuffer getBytes(Field field) { + return (ByteBuffer) get(field); + } + + public ByteBuffer getBytes(String name) { + return (ByteBuffer) get(name); + } + public Object[] getArray(Field field) { return (Object[]) get(field); } @@ -253,4 +278,46 @@ public class Struct { return b.toString(); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (int i = 0; i < this.values.length; i++) { + Field f = this.schema.get(i); + if (f.type() instanceof ArrayOf) { + Object[] arrayObject = (Object []) this.get(f); + for (Object arrayItem: arrayObject) + result = prime * result + arrayItem.hashCode(); + } else { + result = prime * result + this.get(f).hashCode(); + } + } + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Struct other = (Struct) obj; + if (schema != other.schema) + return false; + for (int i = 0; i < this.values.length; i++) { + Field f = this.schema.get(i); + Boolean result; + if (f.type() instanceof ArrayOf) { + result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f)); + } else { + result = this.get(f).equals(other.get(f)); + } + if (!result) + return false; + } + return true; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java new file mode 100644 index 0000000..37aff6c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public abstract class AbstractRequestResponse { + protected final Struct struct; + + + public AbstractRequestResponse(Struct struct) { + this.struct = struct; + } + + public Struct toStruct() { + return struct; + } + + /** + * Get the serialized size of this object + */ + public int sizeOf() { + return struct.sizeOf(); + } + + /** + * Write this object to a buffer + */ + public void writeTo(ByteBuffer buffer) { + struct.writeTo(buffer); + } + + @Override + public String toString() { + return struct.toString(); + } + + @Override + public int hashCode() { + return struct.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + AbstractRequestResponse other = (AbstractRequestResponse) obj; + return struct.equals(other.struct); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java new file mode 100644 index 0000000..99b52c2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class ConsumerMetadataRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id); + private static String GROUP_ID_KEY_NAME = "group_id"; + + private final String groupId; + + public ConsumerMetadataRequest(String groupId) { + super(new Struct(curSchema)); + + struct.set(GROUP_ID_KEY_NAME, groupId); + this.groupId = groupId; + } + + public ConsumerMetadataRequest(Struct struct) { + super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); + } + + public String groupId() { + return groupId; + } + + public static ConsumerMetadataRequest parse(ByteBuffer buffer) { + return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java new file mode 100644 index 0000000..8b8f591 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class ConsumerMetadataResponse extends AbstractRequestResponse { + private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id); + private static String ERROR_CODE_KEY_NAME = "error_code"; + private static String COORDINATOR_KEY_NAME = "coordinator"; + + // coordinator level field names + private static String NODE_ID_KEY_NAME = "node_id"; + private static String HOST_KEY_NAME = "host"; + private static String PORT_KEY_NAME = "port"; + + private final short errorCode; + private final Node node; + + public ConsumerMetadataResponse(short errorCode, Node node) { + super(new Struct(curSchema)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); + coordinator.set(NODE_ID_KEY_NAME, node.id()); + coordinator.set(HOST_KEY_NAME, node.host()); + coordinator.set(PORT_KEY_NAME, node.port()); + struct.set(COORDINATOR_KEY_NAME, coordinator); + this.errorCode = errorCode; + this.node = node; + } + + public ConsumerMetadataResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); + int nodeId = broker.getInt(NODE_ID_KEY_NAME); + String host = broker.getString(HOST_KEY_NAME); + int port = broker.getInt(PORT_KEY_NAME); + node = new Node(nodeId, host, port); + } + + public short errorCode() { + return errorCode; + } + + public Node node() { + return node; + } + + public static ConsumerMetadataResponse parse(ByteBuffer buffer) { + return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer))); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java new file mode 100644 index 0000000..2fc471f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FetchRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id); + private static String REPLICA_ID_KEY_NAME = "replica_id"; + private static String MAX_WAIT_KEY_NAME = "max_wait_time"; + private static String MIN_BYTES_KEY_NAME = "min_bytes"; + private static String TOPICS_KEY_NAME = "topics"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partitions"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String FETCH_OFFSET_KEY_NAME = "fetch_offset"; + private static String MAX_BYTES_KEY_NAME = "max_bytes"; + + private final int replicaId; + private final int maxWait; + private final int minBytes; + private final Map fetchData; + + public static final class PartitionData { + public final long offset; + public final int maxBytes; + + public PartitionData(long offset, int maxBytes) { + this.offset = offset; + this.maxBytes = maxBytes; + } + } + + public FetchRequest(int replicaId, int maxWait, int minBytes, Map fetchData) { + super(new Struct(curSchema)); + Map> topicsData = CollectionUtils.groupDataByTopic(fetchData); + + struct.set(REPLICA_ID_KEY_NAME, replicaId); + struct.set(MAX_WAIT_KEY_NAME, maxWait); + struct.set(MIN_BYTES_KEY_NAME, minBytes); + List topicArray = new ArrayList(); + for (Map.Entry> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset); + partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + this.replicaId = replicaId; + this.maxWait = maxWait; + this.minBytes = minBytes; + this.fetchData = fetchData; + } + + public FetchRequest(Struct struct) { + super(struct); + replicaId = struct.getInt(REPLICA_ID_KEY_NAME); + maxWait = struct.getInt(MAX_WAIT_KEY_NAME); + minBytes = struct.getInt(MIN_BYTES_KEY_NAME); + fetchData = new HashMap(); + for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME); + int maxBytes = 
partitionResponse.getInt(MAX_BYTES_KEY_NAME); + PartitionData partitionData = new PartitionData(offset, maxBytes); + fetchData.put(new TopicPartition(topic, partition), partitionData); + } + } + } + + public int replicaId() { + return replicaId; + } + + public int maxWait() { + return maxWait; + } + + public int minBytes() { + return minBytes; + } + + public Map fetchData() { + return fetchData; + } + + public static FetchRequest parse(ByteBuffer buffer) { + return new FetchRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java new file mode 100644 index 0000000..f719010 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FetchResponse extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id); + private static String RESPONSES_KEY_NAME = "responses"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partition_responses"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String ERROR_CODE_KEY_NAME = "error_code"; + private static String HIGH_WATERMARK_KEY_NAME = "high_watermark"; + private static String RECORD_SET_KEY_NAME = "record_set"; + + private final Map responseData; + + public static final class PartitionData { + public final short errorCode; + public final long highWatermark; + public final ByteBuffer recordSet; + + public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) { + this.errorCode = errorCode; + this.highWatermark = highWatermark; + this.recordSet = recordSet; + } + } + + public FetchResponse(Map responseData) { + super(new Struct(curSchema)); + Map> topicsData = CollectionUtils.groupDataByTopic(responseData); + + List topicArray = new ArrayList(); + for (Map.Entry> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List partitionArray = new 
ArrayList(); + for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode); + partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark); + partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + this.responseData = responseData; + } + + public FetchResponse(Struct struct) { + super(struct); + responseData = new HashMap(); + for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); + long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME); + ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME); + PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet); + responseData.put(new TopicPartition(topic, partition), partitionData); + } + } + } + + public Map responseData() { + return responseData; + } + + public static FetchResponse parse(ByteBuffer buffer) { + return new FetchResponse(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java new file mode 100644 index 0000000..9512db2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class HeartbeatRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id); + private static String GROUP_ID_KEY_NAME = "group_id"; + private static String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id"; + private static String CONSUMER_ID_KEY_NAME = "consumer_id"; + + private final String groupId; + private final int groupGenerationId; + private final String consumerId; + + public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) { + super(new Struct(curSchema)); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId); + struct.set(CONSUMER_ID_KEY_NAME, consumerId); + this.groupId = groupId; + this.groupGenerationId = groupGenerationId; + this.consumerId = consumerId; + } + + public HeartbeatRequest(Struct struct) { + super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); + groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME); + consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + } + + public String groupId() { + return groupId; + } + + public int groupGenerationId() { + return groupGenerationId; + } + + public String consumerId() { + return consumerId; + } + + public static HeartbeatRequest parse(ByteBuffer buffer) { + return new HeartbeatRequest(((Struct) curSchema.read(buffer))); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java new file mode 100644 index 0000000..8997ffc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class HeartbeatResponse extends AbstractRequestResponse { + private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id); + private static String ERROR_CODE_KEY_NAME = "error_code"; + + private final short errorCode; + public HeartbeatResponse(short errorCode) { + super(new Struct(curSchema)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + this.errorCode = errorCode; + } + + public HeartbeatResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + } + + public short errorCode() { + return errorCode; + } + + public static HeartbeatResponse parse(ByteBuffer buffer) { + return new HeartbeatResponse(((Struct) curSchema.read(buffer))); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java new file mode 100644 index 0000000..d6e91f3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class JoinGroupRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id); + private static String GROUP_ID_KEY_NAME = "group_id"; + private static String SESSION_TIMEOUT_KEY_NAME = "session_timeout"; + private static String TOPICS_KEY_NAME = "topics"; + private static String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static String STRATEGY_KEY_NAME = "partition_assignment_strategy"; + + private final String groupId; + private final int sessionTimeout; + private final List topics; + private final String consumerId; + private final String strategy; + + public JoinGroupRequest(String groupId, int sessionTimeout, List topics, String consumerId, String strategy) { + super(new Struct(curSchema)); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); + struct.set(TOPICS_KEY_NAME, topics.toArray()); + struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(STRATEGY_KEY_NAME, strategy); + this.groupId = groupId; + this.sessionTimeout = sessionTimeout; + this.topics = topics; + this.consumerId = consumerId; + this.strategy = strategy; + } + + public JoinGroupRequest(Struct struct) { + super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); + sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME); + Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME); + topics = new ArrayList(); + for (Object topic: topicsArray) + topics.add((String) topic); + consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + strategy = struct.getString(STRATEGY_KEY_NAME); + } + + public String groupId() { + return groupId; + } + + public int sessionTimeout() { + return sessionTimeout; + } + + public List topics() { + return topics; + } + + public String consumerId() { + return consumerId; + } + + public String strategy() { + return strategy; + } + + public static JoinGroupRequest parse(ByteBuffer buffer) { + return new JoinGroupRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java new file mode 100644 index 0000000..efe8979 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.*; + +public class JoinGroupResponse extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id); + private static String ERROR_CODE_KEY_NAME = "error_code"; + private static String GENERATION_ID_KEY_NAME = "group_generation_id"; + private static String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions"; + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partitions"; + + public static int UNKNOWN_GENERATION_ID = -1; + public static String UNKNOWN_CONSUMER_ID = ""; + + private final short errorCode; + private final int generationId; + private final String consumerId; + private final List assignedPartitions; + + public JoinGroupResponse(short errorCode, int generationId, String consumerId, List assignedPartitions) { + super(new Struct(curSchema)); + + Map> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions); + + struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(CONSUMER_ID_KEY_NAME, consumerId); + List topicArray = new ArrayList(); + for (Map.Entry> entries: partitionsByTopic.entrySet()) { + Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray()); + topicArray.add(topicData); + } + struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray()); + + this.errorCode = errorCode; + this.generationId = generationId; + this.consumerId = consumerId; + this.assignedPartitions = assignedPartitions; + } + + public JoinGroupResponse(short errorCode) { + this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.emptyList()); + } + + public JoinGroupResponse(Struct struct) { + super(struct); + assignedPartitions = new ArrayList(); + for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) { + Struct topicData = (Struct) topicDataObj; + String topic = topicData.getString(TOPIC_KEY_NAME); + for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME)) + assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj)); + } + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + generationId = struct.getInt(GENERATION_ID_KEY_NAME); + consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + } + + public short errorCode() { + return errorCode; + } + + public int generationId() { + return generationId; + } + + public String consumerId() { + return consumerId; + } + + public List assignedPartitions() { + return assignedPartitions; + } + + public static JoinGroupResponse parse(ByteBuffer buffer) { + return new JoinGroupResponse(((Struct) curSchema.read(buffer))); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java new file mode 100644 index 0000000..99364c1 --- /dev/null +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ListOffsetRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id); + private static String REPLICA_ID_KEY_NAME = "replica_id"; + private static String TOPICS_KEY_NAME = "topics"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partitions"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String TIMESTAMP_KEY_NAME = "timestamp"; + private static String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets"; + + private final int replicaId; + private final Map offsetData; + + public static final class PartitionData { + public final long timestamp; + public final int maxNumOffsets; + + public PartitionData(long timestamp, int maxNumOffsets) { + this.timestamp = timestamp; + this.maxNumOffsets = maxNumOffsets; + } + } + + public ListOffsetRequest(int replicaId, Map offsetData) { + super(new Struct(curSchema)); + Map> topicsData = CollectionUtils.groupDataByTopic(offsetData); + + struct.set(REPLICA_ID_KEY_NAME, replicaId); + List topicArray = new ArrayList(); + for (Map.Entry> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData offsetPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); + partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + this.replicaId = replicaId; + this.offsetData = offsetData; + } + + public ListOffsetRequest(Struct struct) { + super(struct); + 
replicaId = struct.getInt(REPLICA_ID_KEY_NAME); + offsetData = new HashMap(); + for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME); + PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets); + offsetData.put(new TopicPartition(topic, partition), partitionData); + } + } + } + + public int replicaId() { + return replicaId; + } + + public Map offsetData() { + return offsetData; + } + + public static ListOffsetRequest parse(ByteBuffer buffer) { + return new ListOffsetRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java new file mode 100644 index 0000000..ac23971 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ListOffsetResponse extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id); + private static String RESPONSES_KEY_NAME = "responses"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partition_responses"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String ERROR_CODE_KEY_NAME = "error_code"; + private static String OFFSETS_KEY_NAME = "offsets"; + + private final Map responseData; + + public static final class PartitionData { + public final short errorCode; + public final List offsets; + + public PartitionData(short errorCode, List offsets) { + this.errorCode = errorCode; + this.offsets = offsets; + } + } + + public ListOffsetResponse(Map responseData) { + super(new Struct(curSchema)); + Map> topicsData = CollectionUtils.groupDataByTopic(responseData); + + List topicArray = new ArrayList(); + for (Map.Entry> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData offsetPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode); + partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + this.responseData = responseData; + } + + public ListOffsetResponse(Struct struct) { + super(struct); + responseData = new HashMap(); + for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); + Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME); + List offsetsList = new ArrayList(); + for (Object offset: offsets) + offsetsList.add((Long) offset); + PartitionData partitionData = new PartitionData(errorCode, offsetsList); + responseData.put(new TopicPartition(topic, partition), partitionData); + } + } + } + + public Map responseData() { + return responseData; + } + + public static ListOffsetResponse parse(ByteBuffer buffer) { + return new ListOffsetResponse(((Struct) curSchema.read(buffer))); + } +} diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index f35bd87..b22ca1d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -12,26 +12,41 @@ */ package org.apache.kafka.common.requests; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -public class MetadataRequest { +public class MetadataRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); + private static String TOPICS_KEY_NAME = "topics"; private final List topics; public MetadataRequest(List topics) { + super(new Struct(curSchema)); + struct.set(TOPICS_KEY_NAME, topics.toArray()); this.topics = topics; } - public Struct toStruct() { - String[] ts = new String[topics.size()]; - topics.toArray(ts); - Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id)); - body.set("topics", topics.toArray()); - return body; + public MetadataRequest(Struct struct) { + super(struct); + Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); + topics = new ArrayList(); + for (Object topicObj: topicArray) { + topics.add((String) topicObj); + } } + public List topics() { + return topics; + } + + public static MetadataRequest parse(ByteBuffer buffer) { + return new MetadataRequest(((Struct) curSchema.read(buffer))); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 2652c32..7d90fce 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.requests; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -20,50 +21,112 @@ import java.util.Map; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -public class MetadataResponse { +public class MetadataResponse extends AbstractRequestResponse { + private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id); + private static String BROKERS_KEY_NAME = "brokers"; + private static String TOPIC_METADATA_KEY_NAME = "topic_metadata"; + + // broker level field names + private static String NODE_ID_KEY_NAME = "node_id"; + private static String HOST_KEY_NAME = "host"; + private static String PORT_KEY_NAME = "port"; + + // topic level field names + private static String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code"; + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITION_METADATA_KEY_NAME = "partition_metadata"; + + // partition level field names + private static String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code"; + private
static String PARTITION_KEY_NAME = "partition_id"; + private static String LEADER_KEY_NAME = "leader"; + private static String REPLICAS_KEY_NAME = "replicas"; + private static String ISR_KEY_NAME = "isr"; private final Cluster cluster; private final Map errors; - public MetadataResponse(Cluster cluster, Map errors) { + public MetadataResponse(Cluster cluster) { + super(new Struct(curSchema)); + + List brokerArray = new ArrayList(); + for (Node node: cluster.nodes()) { + Struct broker = struct.instance(BROKERS_KEY_NAME); + broker.set(NODE_ID_KEY_NAME, node.id()); + broker.set(HOST_KEY_NAME, node.host()); + broker.set(PORT_KEY_NAME, node.port()); + brokerArray.add(broker); + } + struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); + + List topicArray = new ArrayList(); + for (String topic: cluster.topics()) { + Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME); + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0); // no error + topicData.set(TOPIC_KEY_NAME, topic); + List partitionArray = new ArrayList(); + for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) { + Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); + partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0); // no error + partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition()); + partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id()); + ArrayList replicas = new ArrayList(); + for (Node node: fetchPartitionData.replicas()) + replicas.add(node.id()); + partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); + ArrayList isr = new ArrayList(); + for (Node node: fetchPartitionData.inSyncReplicas()) + isr.add(node.id()); + partitionData.set(ISR_KEY_NAME, isr.toArray()); + partitionArray.add(partitionData); + } + topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPIC_METADATA_KEY_NAME, topicArray.toArray()); + this.cluster = cluster; - this.errors = errors; + this.errors = new HashMap(); } public MetadataResponse(Struct struct) { + super(struct); Map errors = new HashMap(); Map brokers = new HashMap(); - Object[] brokerStructs = (Object[]) struct.get("brokers"); + Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME); for (int i = 0; i < brokerStructs.length; i++) { Struct broker = (Struct) brokerStructs[i]; - int nodeId = (Integer) broker.get("node_id"); - String host = (String) broker.get("host"); - int port = (Integer) broker.get("port"); + int nodeId = broker.getInt(NODE_ID_KEY_NAME); + String host = broker.getString(HOST_KEY_NAME); + int port = broker.getInt(PORT_KEY_NAME); brokers.put(nodeId, new Node(nodeId, host, port)); } List partitions = new ArrayList(); - Object[] topicInfos = (Object[]) struct.get("topic_metadata"); + Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME); for (int i = 0; i < topicInfos.length; i++) { Struct topicInfo = (Struct) topicInfos[i]; - short topicError = topicInfo.getShort("topic_error_code"); - String topic = topicInfo.getString("topic"); + short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME); + String topic = topicInfo.getString(TOPIC_KEY_NAME); if (topicError == Errors.NONE.code()) { - Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata"); + Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); for (int j = 0; j < partitionInfos.length; j++) { Struct partitionInfo = (Struct) partitionInfos[j]; - short partError =
partitionInfo.getShort("partition_error_code"); + short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME); if (partError == Errors.NONE.code()) { - int partition = partitionInfo.getInt("partition_id"); - int leader = partitionInfo.getInt("leader"); + int partition = partitionInfo.getInt(PARTITION_KEY_NAME); + int leader = partitionInfo.getInt(LEADER_KEY_NAME); Node leaderNode = leader == -1 ? null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get("replicas"); + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); Node[] replicaNodes = new Node[replicas.length]; for (int k = 0; k < replicas.length; k++) replicaNodes[k] = brokers.get(replicas[k]); - Object[] isr = (Object[]) partitionInfo.get("isr"); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); Node[] isrNodes = new Node[isr.length]; for (int k = 0; k < isr.length; k++) isrNodes[k] = brokers.get(isr[k]); @@ -86,4 +149,7 @@ public class MetadataResponse { return this.cluster; } + public static MetadataResponse parse(ByteBuffer buffer) { + return new MetadataResponse(((Struct) curSchema.read(buffer))); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java new file mode 100644 index 0000000..3ee5cba --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This wrapper supports both v0 and v1 of OffsetCommitRequest. 
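+ * For illustration only (an editorial example, not part of the original patch), a v1 request could be
+ * built as new OffsetCommitRequest("group1", generationId, consumerId, offsetData), whereas the
+ * deprecated v0 constructor takes just the group id and the offset data.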
+ */ +public class OffsetCommitRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id); + private static String GROUP_ID_KEY_NAME = "group_id"; + private static String GENERATION_ID_KEY_NAME = "group_generation_id"; + private static String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static String TOPICS_KEY_NAME = "topics"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partitions"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String COMMIT_OFFSET_KEY_NAME = "offset"; + private static String TIMESTAMP_KEY_NAME = "timestamp"; + private static String METADATA_KEY_NAME = "metadata"; + + public static final int DEFAULT_GENERATION_ID = -1; + public static final String DEFAULT_CONSUMER_ID = ""; + + private final String groupId; + private final int generationId; + private final String consumerId; + private final Map offsetData; + + public static final class PartitionData { + public final long offset; + public final long timestamp; + public final String metadata; + + public PartitionData(long offset, long timestamp, String metadata) { + this.offset = offset; + this.timestamp = timestamp; + this.metadata = metadata; + } + } + + /** + * Constructor for version 0. + * @param groupId the consumer group id + * @param offsetData the offsets, timestamps, and metadata to commit, keyed by partition + */ + @Deprecated + public OffsetCommitRequest(String groupId, Map offsetData) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0))); + initCommonFields(groupId, offsetData); + this.groupId = groupId; + this.generationId = DEFAULT_GENERATION_ID; + this.consumerId = DEFAULT_CONSUMER_ID; + this.offsetData = offsetData; + } + + /** + * Constructor for version 1.
+ * @param groupId the consumer group id + * @param generationId the generation of the consumer group + * @param consumerId the consumer id assigned by the group coordinator + * @param offsetData the offsets, timestamps, and metadata to commit, keyed by partition + */ + public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map offsetData) { + super(new Struct(curSchema)); + + initCommonFields(groupId, offsetData); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(CONSUMER_ID_KEY_NAME, consumerId); + this.groupId = groupId; + this.generationId = generationId; + this.consumerId = consumerId; + this.offsetData = offsetData; + } + + private void initCommonFields(String groupId, Map offsetData) { + Map> topicsData = CollectionUtils.groupDataByTopic(offsetData); + + struct.set(GROUP_ID_KEY_NAME, groupId); + List topicArray = new ArrayList(); + for (Map.Entry> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); + partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); + partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + } + + public OffsetCommitRequest(Struct struct) { + super(struct); + offsetData = new HashMap(); + for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME); + long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + String metadata = partitionResponse.getString(METADATA_KEY_NAME); + PartitionData partitionData = new PartitionData(offset, timestamp, metadata); + offsetData.put(new TopicPartition(topic, partition), partitionData); + } + } + groupId = struct.getString(GROUP_ID_KEY_NAME); + // This field only exists in v1. + if (struct.hasField(GENERATION_ID_KEY_NAME)) + generationId = struct.getInt(GENERATION_ID_KEY_NAME); + else + generationId = DEFAULT_GENERATION_ID; + + // This field only exists in v1.
+ if (struct.hasField(CONSUMER_ID_KEY_NAME)) + consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + else + consumerId = DEFAULT_CONSUMER_ID; + } + + public String groupId() { + return groupId; + } + + public int generationId() { + return generationId; + } + + public String consumerId() { + return consumerId; + } + + public Map offsetData() { + return offsetData; + } + + public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) { + Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId); + return new OffsetCommitRequest(((Struct) schema.read(buffer))); + } + + public static OffsetCommitRequest parse(ByteBuffer buffer) { + return new OffsetCommitRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java new file mode 100644 index 0000000..711232a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OffsetCommitResponse extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id); + private static String RESPONSES_KEY_NAME = "responses"; + + // topic level fields + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partition_responses"; + + // partition level fields + private static String PARTITION_KEY_NAME = "partition"; + private static String ERROR_CODE_KEY_NAME = "error_code"; + + private final Map responseData; + + public OffsetCommitResponse(Map responseData) { + super(new Struct(curSchema)); + + Map> topicsData = CollectionUtils.groupDataByTopic(responseData); + + List topicArray = new ArrayList(); + for (Map.Entry> entries: topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : entries.getValue().entrySet()) { + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue()); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + this.responseData = responseData; + } + + public OffsetCommitResponse(Struct struct) { + super(struct); + responseData = new HashMap(); + for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); + responseData.put(new TopicPartition(topic, partition), errorCode); + } + } + } + + public Map responseData() { + return responseData; + } + + public static OffsetCommitResponse parse(ByteBuffer buffer) { + return new OffsetCommitResponse(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java new file mode 100644 index 0000000..90d5135 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * This wrapper supports both v0 and v1 of OffsetFetchRequest. + */ +public class OffsetFetchRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id); + private static String GROUP_ID_KEY_NAME = "group_id"; + private static String TOPICS_KEY_NAME = "topics"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partitions"; + + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + + public static final int DEFAULT_GENERATION_ID = -1; + public static final String DEFAULT_CONSUMER_ID = ""; + + private final String groupId; + private final List partitions; + + public OffsetFetchRequest(String groupId, List partitions) { + super(new Struct(curSchema)); + + Map> topicsData = CollectionUtils.groupDataByTopic(partitions); + + struct.set(GROUP_ID_KEY_NAME, groupId); + List topicArray = new ArrayList(); + for (Map.Entry> entries: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + List partitionArray = new ArrayList(); + for (Integer partitionId : entries.getValue()) { + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionId); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + this.groupId = groupId; + this.partitions = partitions; + } + + public OffsetFetchRequest(Struct struct) { + super(struct); + partitions = new ArrayList(); + for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + partitions.add(new TopicPartition(topic, partition)); + } + } + groupId = struct.getString(GROUP_ID_KEY_NAME); + } + + public String groupId() { + return groupId; + } + + public List partitions() { + return partitions; + } + + public static OffsetFetchRequest parse(ByteBuffer buffer) { + return new OffsetFetchRequest(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java new file mode 100644
index 0000000..6b7c269 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OffsetFetchResponse extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id); + private static String RESPONSES_KEY_NAME = "responses"; + + // topic level fields + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITIONS_KEY_NAME = "partition_responses"; + + // partition level fields + private static String PARTITION_KEY_NAME = "partition"; + private static String COMMIT_OFFSET_KEY_NAME = "offset"; + private static String METADATA_KEY_NAME = "metadata"; + private static String ERROR_CODE_KEY_NAME = "error_code"; + + private final Map responseData; + + public static final class PartitionData { + public final long offset; + public final String metadata; + public final short errorCode; + + public PartitionData(long offset, String metadata, short errorCode) { + this.offset = offset; + this.metadata = metadata; + this.errorCode = errorCode; + } + } + + public OffsetFetchResponse(Map responseData) { + super(new Struct(curSchema)); + + Map> topicsData = CollectionUtils.groupDataByTopic(responseData); + + List topicArray = new ArrayList(); + for (Map.Entry> entries: topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : entries.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); + partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); + partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + this.responseData = responseData; + } + + public OffsetFetchResponse(Struct struct) { + 
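// This constructor rebuilds responseData from a Struct decoded off the wire (the receive path). +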
super(struct); + responseData = new HashMap(); + for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME); + String metadata = partitionResponse.getString(METADATA_KEY_NAME); + short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); + PartitionData partitionData = new PartitionData(offset, metadata, errorCode); + responseData.put(new TopicPartition(topic, partition), partitionData); + } + } + } + + public Map responseData() { + return responseData; + } + + public static OffsetFetchResponse parse(ByteBuffer buffer) { + return new OffsetFetchResponse(((Struct) curSchema.read(buffer))); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 6036f6a..3dbba8a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -1,71 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + package org.apache.kafka.common.requests; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.record.MemoryRecords; +public class ProduceRequest extends AbstractRequestResponse { + public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); + private static String ACKS_KEY_NAME = "acks"; + private static String TIMEOUT_KEY_NAME = "timeout"; + private static String TOPIC_DATA_KEY_NAME = "topic_data"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITION_DATA_KEY_NAME = "data"; -public class ProduceRequest { + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String RECORD_SET_KEY_NAME = "record_set"; private final short acks; private final int timeout; - private final Map> records; + private final Map partitionRecords; - public ProduceRequest(short acks, int timeout) { + public ProduceRequest(short acks, int timeout, Map partitionRecords) { + super(new Struct(curSchema)); + Map> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); + struct.set(ACKS_KEY_NAME, acks); + struct.set(TIMEOUT_KEY_NAME, timeout); + List topicDatas = new ArrayList(recordsByTopic.size()); + for (Map.Entry> entry : recordsByTopic.entrySet()) { + Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : entry.getValue().entrySet()) { + ByteBuffer buffer = partitionEntry.getValue().duplicate(); + Struct part = topicData.instance(PARTITION_DATA_KEY_NAME) + .set(PARTITION_KEY_NAME, partitionEntry.getKey()) + .set(RECORD_SET_KEY_NAME, buffer); + partitionArray.add(part); + } + topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray()); + topicDatas.add(topicData); + } + struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray()); this.acks = acks; this.timeout = timeout; - this.records = new HashMap>(); + this.partitionRecords = partitionRecords; } - public void add(TopicPartition tp, MemoryRecords recs) { - List found = this.records.get(tp.topic()); - if (found == null) { - found = new ArrayList(); - records.put(tp.topic(), found); + public ProduceRequest(Struct struct) { + super(struct); + partitionRecords = new HashMap(); + for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { + Struct topicData = (Struct) topicDataObj; + String topic = topicData.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME); + partitionRecords.put(new TopicPartition(topic, partition), records); + } } - found.add(new PartitionRecords(tp, recs)); + acks = 
struct.getShort(ACKS_KEY_NAME); + timeout = struct.getInt(TIMEOUT_KEY_NAME); } - public Struct toStruct() { - Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id)); - produce.set("acks", acks); - produce.set("timeout", timeout); - List topicDatas = new ArrayList(records.size()); - for (Map.Entry> entry : records.entrySet()) { - Struct topicData = produce.instance("topic_data"); - topicData.set("topic", entry.getKey()); - List parts = entry.getValue(); - Object[] partitionData = new Object[parts.size()]; - for (int i = 0; i < parts.size(); i++) { - ByteBuffer buffer = parts.get(i).records.buffer(); - buffer.flip(); - Struct part = topicData.instance("data") - .set("partition", parts.get(i).topicPartition.partition()) - .set("record_set", buffer); - partitionData[i] = part; - } - topicData.set("data", partitionData); - topicDatas.add(topicData); - } - produce.set("topic_data", topicDatas.toArray()); - return produce; + public short acks() { + return acks; } - private static final class PartitionRecords { - public final TopicPartition topicPartition; - public final MemoryRecords records; + public int timeout() { + return timeout; + } - public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) { - this.topicPartition = topicPartition; - this.records = records; - } + public Map partitionRecords() { + return partitionRecords; } + public static ProduceRequest parse(ByteBuffer buffer) { + return new ProduceRequest(((Struct) curSchema.read(buffer))); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 6cf4fb7..5220464 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -12,67 +12,83 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.Struct; +public class ProduceResponse extends AbstractRequestResponse { + private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id); + private static String RESPONSES_KEY_NAME = "responses"; + + // topic level field names + private static String TOPIC_KEY_NAME = "topic"; + private static String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; -public class ProduceResponse { + // partition level field names + private static String PARTITION_KEY_NAME = "partition"; + private static String ERROR_CODE_KEY_NAME = "error_code"; + private static String BASE_OFFSET_KEY_NAME = "base_offset"; private final Map responses; - public ProduceResponse() { - this.responses = new HashMap(); + public ProduceResponse(Map responses) { + super(new Struct(curSchema)); + Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); + List topicDatas = new ArrayList(responseByTopic.size()); + for (Map.Entry> entry : responseByTopic.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, 
entry.getKey()); + List partitionArray = new ArrayList(); + for (Map.Entry partitionEntry : entry.getValue().entrySet()) { + PartitionResponse part = partitionEntry.getValue(); + Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) + .set(PARTITION_KEY_NAME, partitionEntry.getKey()) + .set(ERROR_CODE_KEY_NAME, part.errorCode) + .set(BASE_OFFSET_KEY_NAME, part.baseOffset); + partitionArray.add(partStruct); + } + topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); + topicDatas.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + this.responses = responses; } public ProduceResponse(Struct struct) { + super(struct); responses = new HashMap(); - for (Object topicResponse : (Object[]) struct.get("responses")) { + for (Object topicResponse : struct.getArray("responses")) { Struct topicRespStruct = (Struct) topicResponse; - String topic = (String) topicRespStruct.get("topic"); - for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) { + String topic = topicRespStruct.getString("topic"); + for (Object partResponse : topicRespStruct.getArray("partition_responses")) { Struct partRespStruct = (Struct) partResponse; - int partition = (Integer) partRespStruct.get("partition"); - short errorCode = (Short) partRespStruct.get("error_code"); - long offset = (Long) partRespStruct.get("base_offset"); + int partition = partRespStruct.getInt("partition"); + short errorCode = partRespStruct.getShort("error_code"); + long offset = partRespStruct.getLong("base_offset"); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(partition, errorCode, offset)); + responses.put(tp, new PartitionResponse(errorCode, offset)); } } } - public void addResponse(TopicPartition tp, int partition, short error, long baseOffset) { - this.responses.put(tp, new PartitionResponse(partition, error, baseOffset)); - } - public Map responses() { return this.responses; } - @Override - public String toString() { - StringBuilder b = new StringBuilder(); - b.append('{'); - boolean isFirst = true; - for (Map.Entry entry : responses.entrySet()) { - if (isFirst) - isFirst = false; - else - b.append(','); - b.append(entry.getKey() + " : " + entry.getValue()); - } - b.append('}'); - return b.toString(); - } - - public static class PartitionResponse { - public int partitionId; + public static final class PartitionResponse { public short errorCode; public long baseOffset; - public PartitionResponse(int partitionId, short errorCode, long baseOffset) { - this.partitionId = partitionId; + public PartitionResponse(short errorCode, long baseOffset) { this.errorCode = errorCode; this.baseOffset = baseOffset; } @@ -81,9 +97,7 @@ public class ProduceResponse { public String toString() { StringBuilder b = new StringBuilder(); b.append('{'); - b.append("pid: "); - b.append(partitionId); - b.append(",error: "); + b.append("error: "); b.append(errorCode); b.append(",offset: "); b.append(baseOffset); @@ -91,4 +105,8 @@ public class ProduceResponse { return b.toString(); } } + + public static ProduceResponse parse(ByteBuffer buffer) { + return new ProduceResponse(((Struct) curSchema.read(buffer))); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 66cc2fe..f459a2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -24,18 +24,24 @@ import org.apache.kafka.common.protocol.types.Struct; /** * The header for a request in the Kafka protocol */ -public class RequestHeader { +public class RequestHeader extends AbstractRequestResponse { private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key"); private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version"); private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id"); private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id"); - private final Struct header; + private final short apiKey; + private final short apiVersion; + private final String clientId; + private final int correlationId; public RequestHeader(Struct header) { - super(); - this.header = header; + super(header); + apiKey = struct.getShort(API_KEY_FIELD); + apiVersion = struct.getShort(API_VERSION_FIELD); + clientId = struct.getString(CLIENT_ID_FIELD); + correlationId = struct.getInt(CORRELATION_ID_FIELD); } public RequestHeader(short apiKey, String client, int correlation) { @@ -43,43 +49,34 @@ public class RequestHeader { } public RequestHeader(short apiKey, short version, String client, int correlation) { - this(new Struct(Protocol.REQUEST_HEADER)); - this.header.set(API_KEY_FIELD, apiKey); - this.header.set(API_VERSION_FIELD, version); - this.header.set(CLIENT_ID_FIELD, client); - this.header.set(CORRELATION_ID_FIELD, correlation); + super(new Struct(Protocol.REQUEST_HEADER)); + struct.set(API_KEY_FIELD, apiKey); + struct.set(API_VERSION_FIELD, version); + struct.set(CLIENT_ID_FIELD, client); + struct.set(CORRELATION_ID_FIELD, correlation); + this.apiKey = apiKey; + this.apiVersion = version; + this.clientId = client; + this.correlationId = correlation; } public short apiKey() { - return (Short) this.header.get(API_KEY_FIELD); + return apiKey; } public short apiVersion() { - return (Short) this.header.get(API_VERSION_FIELD); + return apiVersion; } public String clientId() { - return (String) this.header.get(CLIENT_ID_FIELD); + return clientId; } public int correlationId() { - return (Integer) this.header.get(CORRELATION_ID_FIELD); + return correlationId; } public static RequestHeader parse(ByteBuffer buffer) { return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer)); } - - public void writeTo(ByteBuffer buffer) { - header.writeTo(buffer); - } - - public int sizeOf() { - return header.sizeOf(); - } - - @Override - public String toString() { - return header.toString(); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java index 257b828..dd63853 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java @@ -28,31 +28,25 @@ import org.apache.kafka.common.protocol.types.Struct; /** * A response header in the kafka protocol. 
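+ * It carries the correlation id that matches a response to its originating request.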
*/ -public class ResponseHeader { +public class ResponseHeader extends AbstractRequestResponse { private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id"); - private final Struct header; + private final int correlationId; public ResponseHeader(Struct header) { - this.header = header; + super(header); + correlationId = struct.getInt(CORRELATION_KEY_FIELD); } public ResponseHeader(int correlationId) { - this(new Struct(Protocol.RESPONSE_HEADER)); - this.header.set(CORRELATION_KEY_FIELD, correlationId); + super(new Struct(Protocol.RESPONSE_HEADER)); + struct.set(CORRELATION_KEY_FIELD, correlationId); + this.correlationId = correlationId; } public int correlationId() { - return (Integer) header.get(CORRELATION_KEY_FIELD); - } - - public void writeTo(ByteBuffer buffer) { - header.writeTo(buffer); - } - - public int sizeOf() { - return header.sizeOf(); + return correlationId; } public static ResponseHeader parse(ByteBuffer buffer) { diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java new file mode 100644 index 0000000..ba38637 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CollectionUtils { + /** + * group data by topic + * @param data Data to be partitioned + * @param Partition data type + * @return partitioned data + */ + public static Map> groupDataByTopic(Map data) { + Map> dataByTopic = new HashMap>(); + for (Map.Entry entry: data.entrySet()) { + String topic = entry.getKey().topic(); + int partition = entry.getKey().partition(); + Map topicData = dataByTopic.get(topic); + if (topicData == null) { + topicData = new HashMap(); + dataByTopic.put(topic, topicData); + } + topicData.put(partition, entry.getValue()); + } + return dataByTopic; + } + + /** + * group partitions by topic + * @param partitions + * @return partitions per topic + */ + public static Map> groupDataByTopic(List partitions) { + Map> partitionsByTopic = new HashMap>(); + for (TopicPartition tp: partitions) { + String topic = tp.topic(); + List topicData = partitionsByTopic.get(topic); + if (topicData == null) { + topicData = new ArrayList(); + partitionsByTopic.put(topic, topicData); + } + topicData.add(tp.partition()); + } + return partitionsByTopic; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 2f98192..1a55242 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -7,11 +7,13 @@ import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; @@ -68,7 +70,7 @@ public class NetworkClientTest { @Test public void testSimpleRequestResponse() { - ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000); + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java new file mode 100644 index 0000000..df37fc6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class RequestResponseTest { + + @Test + public void testSerialization() throws Exception{ + List requestList = Arrays.asList( + createRequestHeader(), + createResponseHeader(), + createConsumerMetadataRequest(), + createConsumerMetadataResponse(), + createFetchRequest(), + createFetchResponse(), + createHeartBeatRequest(), + createHeartBeatResponse(), + createJoinGroupRequest(), + createJoinGroupResponse(), + createListOffsetRequest(), + createListOffsetResponse(), + createMetadataRequest(), + createMetadataResponse(), + createOffsetCommitRequest(), + createOffsetCommitResponse(), + createOffsetFetchRequest(), + createOffsetFetchResponse(), + createProduceRequest(), + createProduceResponse()); + + for (AbstractRequestResponse req: requestList) { + ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf()); + req.writeTo(buffer); + buffer.rewind(); + Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class); + AbstractRequestResponse deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer); + assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized); + assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.", + req.hashCode(), deserialized.hashCode()); + } + } + + private AbstractRequestResponse createRequestHeader() { + return new RequestHeader((short)10, (short)1, "", 10); + } + + private AbstractRequestResponse createResponseHeader() { + return new ResponseHeader(10); + } + + private AbstractRequestResponse createConsumerMetadataRequest() { + return new ConsumerMetadataRequest("test-group"); + } + + private AbstractRequestResponse createConsumerMetadataResponse() { + return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014)); + } + + private AbstractRequestResponse createFetchRequest() { + Map fetchData = new HashMap(); + fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000)); + fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000)); + return new FetchRequest(-1, 100, 100000, fetchData); + } + + private AbstractRequestResponse createFetchResponse() { + Map responseData = new HashMap(); + responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10))); + return new FetchResponse(responseData); + } + + private AbstractRequestResponse createHeartBeatRequest() { + return new HeartbeatRequest("group1", 1, "consumer1"); + } + + private AbstractRequestResponse createHeartBeatResponse() { + return new 
HeartbeatResponse((short)0); + } + + private AbstractRequestResponse createJoinGroupRequest() { + return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1"); + } + + private AbstractRequestResponse createJoinGroupResponse() { + return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1))); + } + + private AbstractRequestResponse createListOffsetRequest() { + Map offsetData = new HashMap(); + offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); + return new ListOffsetRequest(-1, offsetData); + } + + private AbstractRequestResponse createListOffsetResponse() { + Map responseData = new HashMap(); + responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L))); + return new ListOffsetResponse(responseData); + } + + private AbstractRequestResponse createMetadataRequest() { + return new MetadataRequest(Arrays.asList("topic1")); + } + + private AbstractRequestResponse createMetadataResponse() { + Node node = new Node(1, "host1", 1001); + Node[] replicas = new Node[1]; + replicas[0] = node; + Node[] isr = new Node[1]; + isr[0] = node; + Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr))); + return new MetadataResponse(cluster); + } + + private AbstractRequestResponse createOffsetCommitRequest() { + Map commitData = new HashMap(); + commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, "")); + return new OffsetCommitRequest("group1", 100, "consumer1", commitData); + } + + private AbstractRequestResponse createOffsetCommitResponse() { + Map responseData = new HashMap(); + responseData.put(new TopicPartition("test", 0), (short)0); + return new OffsetCommitResponse(responseData); + } + + private AbstractRequestResponse createOffsetFetchRequest() { + return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1))); + } + + private AbstractRequestResponse createOffsetFetchResponse() { + Map responseData = new HashMap(); + responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0)); + return new OffsetFetchResponse(responseData); + } + + private AbstractRequestResponse createProduceRequest() { + Map produceData = new HashMap(); + produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); + return new ProduceRequest((short)0, 5000, produceData); + } + + private AbstractRequestResponse createProduceResponse() { + Map responseData = new HashMap(); + responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000)); + return new ProduceResponse(responseData); + } +} diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala index dfad6e6..6d00ed0 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala @@ -41,9 +41,9 @@ object ConsumerMetadataRequest { case class ConsumerMetadataRequest(group: String, versionId: Short = ConsumerMetadataRequest.CurrentVersion, - override val correlationId: Int = 0, + correlationId: Int = 0, clientId: String = ConsumerMetadataRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) { + extends 
RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) { def sizeInBytes = 2 + /* versionId */ diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala index c72ca14..84f6017 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala @@ -40,8 +40,8 @@ object ConsumerMetadataResponse { } -case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0) - extends RequestOrResponse(correlationId = correlationId) { +case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0) + extends RequestOrResponse() { def sizeInBytes = 4 + /* correlationId */ diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index 7dacb20..5be393a 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -38,9 +38,9 @@ object ControlledShutdownRequest extends Logging { } case class ControlledShutdownRequest(val versionId: Short, - override val correlationId: Int, + val correlationId: Int, val brokerId: Int) - extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){ + extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){ def this(correlationId: Int, brokerId: Int) = this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId) diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala index 46ec3db..5e0a1cf 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala @@ -39,10 +39,10 @@ object ControlledShutdownResponse { } -case class ControlledShutdownResponse(override val correlationId: Int, +case class ControlledShutdownResponse(val correlationId: Int, val errorCode: Short = ErrorMapping.NoError, val partitionsRemaining: Set[TopicAndPartition]) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { def sizeInBytes(): Int ={ var size = 4 /* correlation id */ + diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index a8b73ac..55a5982 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -60,13 +60,13 @@ object FetchRequest { } case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion, - override val correlationId: Int = FetchRequest.DefaultCorrelationId, + correlationId: Int = FetchRequest.DefaultCorrelationId, clientId: String = ConsumerConfig.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) - extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.FetchKey)) { /** * Partitions the request info into a map of maps (one for each topic). 
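An illustrative aside (an editorial sketch, not part of the patch): the serialize/parse round trip that RequestResponseTest verifies reflectively above can be written out directly for any of the new org.apache.kafka.common.requests classes; only constructors and methods shown in this patch are assumed.

    // Build a request; the constructor also populates the backing Struct.
    HeartbeatRequest original = new HeartbeatRequest("group1", 1, "consumer1");
    // Serialize it into a buffer sized from the schema.
    ByteBuffer buffer = ByteBuffer.allocate(original.sizeOf());
    original.writeTo(buffer);
    buffer.rewind();
    // Parse it back; per the test above, parsed is expected to equal original.
    HeartbeatRequest parsed = HeartbeatRequest.parse(buffer);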
diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala new file mode 100644 index 0000000..fb022e8 --- /dev/null +++ b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import org.apache.kafka.common.requests.AbstractRequestResponse + +private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse, + val body: AbstractRequestResponse, + val name: String, + override val requestId: Option[Short] = None) + extends RequestOrResponse(requestId) { + + def writeTo(buffer: ByteBuffer) { + header.writeTo(buffer) + body.writeTo(buffer) + } + + def sizeInBytes(): Int = { + header.sizeOf() + body.sizeOf() + } + + override def toString(): String = { + describe(true) + } + + override def describe(details: Boolean): String = { + val strBuffer = new StringBuilder + strBuffer.append("Name: " + name) + strBuffer.append("; header: " + header.toString) + strBuffer.append("; body: " + body.toString) + strBuffer.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala new file mode 100644 index 0000000..932418b --- /dev/null +++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
+ */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.ErrorMapping +import kafka.network.RequestChannel.Response +import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader} + +object HeartbeatRequestAndHeader { + def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { + val header = RequestHeader.parse(buffer) + val body = HeartbeatRequest.parse(buffer) + new HeartbeatRequestAndHeader(header, body) + } +} + +case class HeartbeatRequestAndHeader(override val header: RequestHeader, override val body: HeartbeatRequest) + extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponseHeader = new ResponseHeader(header.correlationId) + val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(errorResponseHeader, errorResponseBody) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) + } +} diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala new file mode 100644 index 0000000..556f38d --- /dev/null +++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse} +import java.nio.ByteBuffer + +object HeartbeatResponseAndHeader { + def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { + val header = ResponseHeader.parse(buffer) + val body = HeartbeatResponse.parse(buffer) + new HeartbeatResponseAndHeader(header, body) + } +} + +case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse) + extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { +} diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala new file mode 100644 index 0000000..9aea28c --- /dev/null +++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.ErrorMapping +import org.apache.kafka.common.requests._ +import kafka.network.RequestChannel.Response +import scala.Some + +object JoinGroupRequestAndHeader { + def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { + val header = RequestHeader.parse(buffer) + val body = JoinGroupRequest.parse(buffer) + new JoinGroupRequestAndHeader(header, body) + } +} + +case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest) + extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponseHeader = new ResponseHeader(header.correlationId) + val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + val errorJoinGroupResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorJoinGroupResponseAndHeader))) + } +} diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala new file mode 100644 index 0000000..7389ae6 --- /dev/null +++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
+ */ +package kafka.api + +import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader} +import java.nio.ByteBuffer + +object JoinGroupResponseAndHeader { + def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { + val header = ResponseHeader.parse(buffer) + val body = JoinGroupResponse.parse(buffer) + new JoinGroupResponseAndHeader(header, body) + } +} + +case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse) + extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { +} diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 3e40817..4ff7e8f 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -129,13 +129,13 @@ object LeaderAndIsrRequest { } case class LeaderAndIsrRequest (versionId: Short, - override val correlationId: Int, + correlationId: Int, clientId: String, controllerId: Int, controllerEpoch: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker]) - extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String) = { diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala index f636444..22ce48a 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala @@ -41,10 +41,10 @@ object LeaderAndIsrResponse { } -case class LeaderAndIsrResponse(override val correlationId: Int, +case class LeaderAndIsrResponse(correlationId: Int, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { def sizeInBytes(): Int ={ var size = 4 /* correlation id */ + diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 630768a..861a6cf 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Response import scala.collection._ object OffsetCommitRequest extends Logging { - val CurrentVersion: Short = 0 + val CurrentVersion: Short = 1 val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitRequest = { @@ -34,11 +34,23 @@ object OffsetCommitRequest extends Logging { // Read values from the envelope val versionId = buffer.getShort + assert(versionId == 0 || versionId == 1, + "Version " + versionId + " is invalid for OffsetCommitRequest. 
Valid versions are 0 or 1.") + val correlationId = buffer.getInt val clientId = readShortString(buffer) // Read the OffsetRequest val consumerGroupId = readShortString(buffer) + + // version 1 specific fields + var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID + var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID + if (versionId == 1) { + groupGenerationId = buffer.getInt + consumerId = readShortString(buffer) + } + val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topic = readShortString(buffer) @@ -54,16 +66,20 @@ object OffsetCommitRequest extends Logging { (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp)) }) }) - OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId) + OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId) } } case class OffsetCommitRequest(groupId: String, requestInfo: immutable.Map[TopicAndPartition, OffsetAndMetadata], versionId: Short = OffsetCommitRequest.CurrentVersion, - override val correlationId: Int = 0, - clientId: String = OffsetCommitRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) { + correlationId: Int = 0, + clientId: String = OffsetCommitRequest.DefaultClientId, + groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID, + consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID) + extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { + assert(versionId == 0 || versionId == 1, + "Version " + versionId + " is invalid for OffsetCommitRequest. 
Valid versions are 0 or 1.") lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) @@ -84,7 +100,6 @@ case class OffsetCommitRequest(groupId: String, OffsetCommitResponse(commitStatus, correlationId) } - def writeTo(buffer: ByteBuffer) { // Write envelope buffer.putShort(versionId) @@ -93,6 +108,12 @@ case class OffsetCommitRequest(groupId: String, // Write OffsetCommitRequest writeShortString(buffer, groupId) // consumer group + + // version 1 specific data + if (versionId == 1) { + buffer.putInt(groupGenerationId) + writeShortString(buffer, consumerId) + } buffer.putInt(requestInfoGroupedByTopic.size) // number of topics requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError] writeShortString(buffer, t1._1) // topic @@ -110,7 +131,8 @@ case class OffsetCommitRequest(groupId: String, 2 + /* versionId */ 4 + /* correlationId */ shortStringLength(clientId) + - shortStringLength(groupId) + + shortStringLength(groupId) + + (if (versionId == 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets @@ -139,6 +161,8 @@ case class OffsetCommitRequest(groupId: String, offsetCommitRequest.append("; CorrelationId: " + correlationId) offsetCommitRequest.append("; ClientId: " + clientId) offsetCommitRequest.append("; GroupId: " + groupId) + offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId) + offsetCommitRequest.append("; ConsumerId: " + consumerId) if(details) offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(",")) offsetCommitRequest.toString() diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 4946e97..624a1c1 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -42,8 +42,8 @@ object OffsetCommitResponse extends Logging { } case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short], - override val correlationId: Int = 0) - extends RequestOrResponse(correlationId=correlationId) { + correlationId: Int = 0) + extends RequestOrResponse() { lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index a32f858..c7604b9 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -52,9 +52,9 @@ object OffsetFetchRequest extends Logging { case class OffsetFetchRequest(groupId: String, requestInfo: Seq[TopicAndPartition], versionId: Short = OffsetFetchRequest.CurrentVersion, - override val correlationId: Int = 0, + correlationId: Int = 0, clientId: String = OffsetFetchRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic) diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala index c1222f4..e3523f8 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -45,8 +45,8 @@ object OffsetFetchResponse extends Logging { } case class OffsetFetchResponse(requestInfo: 
Map[TopicAndPartition, OffsetMetadataAndError], - override val correlationId: Int = 0) - extends RequestOrResponse(correlationId = correlationId) { + correlationId: Int = 0) + extends RequestOrResponse() { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 7cbc26c..3d483bc 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], versionId: Short = OffsetRequest.CurrentVersion, - override val correlationId: Int = 0, + correlationId: Int = 0, clientId: String = OffsetRequest.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId) - extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId) diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 0e1d6e3..63c0899 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -51,9 +51,9 @@ case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) { } -case class OffsetResponse(override val correlationId: Int, +case class OffsetResponse(correlationId: Int, partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse]) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 0c295a2..b2366e7 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -53,12 +53,12 @@ object ProducerRequest { } case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, - override val correlationId: Int, + correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) - extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { /** * Partitions the data into a map of maps (one for each topic). diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 5a1d801..a286272 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -43,9 +43,9 @@ object ProducerResponse { case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(override val correlationId: Int, +case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { /** * Partitions the status map into a map of maps (one for each topic). 
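Aside: the version gate added to OffsetCommitRequest above is the only wire-format change between v0 and v1: v1 inserts group_generation_id and consumer_id between the group id and the topic data, matching OFFSET_COMMIT_REQUEST_V1 on the client side. A standalone sketch of just the envelope; writeEnvelope is an illustrative name, and writeShortString mirrors the helper used by OffsetCommitRequest.writeTo (a 2-byte length prefix followed by UTF-8 bytes):

import java.nio.ByteBuffer

object OffsetCommitEnvelopeSketch {
  def writeShortString(buffer: ByteBuffer, s: String) {
    val bytes = s.getBytes("UTF-8")
    buffer.putShort(bytes.length.toShort)
    buffer.put(bytes)
  }

  // Everything from the topic count onward is identical in v0 and v1;
  // only the two coordinator fields are version-gated.
  def writeEnvelope(buffer: ByteBuffer, versionId: Short, correlationId: Int,
                    clientId: String, groupId: String,
                    groupGenerationId: Int, consumerId: String) {
    buffer.putShort(versionId)
    buffer.putInt(correlationId)
    writeShortString(buffer, clientId)
    writeShortString(buffer, groupId)
    if (versionId == 1) {
      buffer.putInt(groupGenerationId)     // group_generation_id (INT32)
      writeShortString(buffer, consumerId) // consumer_id (short string)
    }
    // topic count and per-topic offset data would follow here
  }
}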
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index fbfc9d3..c24c034 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -32,6 +32,8 @@ object RequestKeys { val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 val ConsumerMetadataKey: Short = 10 + val JoinGroupKey: Short = 11 + val HeartbeatKey: Short = 12 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -44,7 +46,10 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), + JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom), + HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom) + ) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index 57f87a4..0334343 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -30,7 +30,7 @@ object Request { } -private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging { +private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging { def sizeInBytes: Int diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 68fc138..5e14987 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -54,13 +54,13 @@ object StopReplicaRequest extends Logging { } case class StopReplicaRequest(versionId: Short, - override val correlationId: Int, + correlationId: Int, clientId: String, controllerId: Int, controllerEpoch: Int, deletePartitions: Boolean, partitions: Set[TopicAndPartition]) - extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = { this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index c90ddee..3431f3f 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -42,10 +42,10 @@ object StopReplicaResponse { } -case class StopReplicaResponse(override val correlationId: Int, +case class StopReplicaResponse(val correlationId: Int, val responseMap: Map[TopicAndPartition, Short], val errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { def sizeInBytes(): Int ={ var size = 4 /* correlation id */ + diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index a319f2f..dcd0c68 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging { } case class TopicMetadataRequest(val versionId: Short, - override val correlationId: Int, + val correlationId: Int, val clientId: String, val topics: Seq[String]) - extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){ + extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ def this(topics: Seq[String], correlationId: Int) = this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics) diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index f6b7429..f65b57b 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -34,8 +34,8 @@ object TopicMetadataResponse { } case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata], - override val correlationId: Int) - extends RequestOrResponse(correlationId = correlationId) { + correlationId: Int) + extends RequestOrResponse() { val sizeInBytes: Int = { val brokers = extractBrokers(topicsMetadata).values 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 543e262..530982e 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -55,13 +55,13 @@ object UpdateMetadataRequest { } case class UpdateMetadataRequest (versionId: Short, - override val correlationId: Int, + correlationId: Int, clientId: String, controllerId: Int, controllerEpoch: Int, partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) - extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey), correlationId) { + extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey)) { def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String, partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = { diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala index c583c1f..53f6067 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala @@ -32,9 +32,9 @@ object UpdateMetadataResponse { } } -case class UpdateMetadataResponse(override val correlationId: Int, +case class UpdateMetadataResponse(correlationId: Int, errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse(correlationId = correlationId) { + extends RequestOrResponse() { def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */ def writeTo(buffer: ByteBuffer) { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 8763968..ecbfa0f 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -133,9 +133,9 @@ class RequestSendThread(val controllerId: Int, isSendSuccessful = true } catch { case e: Throwable => // if 
the send was not successful, reconnect to broker and resend the message - error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " + + error(("Controller %d epoch %d failed to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, - RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e) + request.toString, toBroker.toString()), e) channel.disconnect() connectToBroker(toBroker, channel) isSendSuccessful = false @@ -153,8 +153,8 @@ class RequestSendThread(val controllerId: Int, case RequestKeys.UpdateMetadataKey => response = UpdateMetadataResponse.readFrom(receive.buffer) } - stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s" - .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString())) + stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" + .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) if(callback != null) { callback(response) diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala index 08dcc55..27fc1eb 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -21,7 +21,6 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition} class OffsetCommitRequest(groupId: String, requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata], - versionId: Short, correlationId: Int, clientId: String) { val underlying = { @@ -33,7 +32,6 @@ class OffsetCommitRequest(groupId: String, kafka.api.OffsetCommitRequest( groupId = groupId, requestInfo = scalaMap, - versionId = versionId, correlationId = correlationId, clientId = clientId ) diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index 7e6da16..b0b7be1 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -24,10 +24,10 @@ import kafka.common.ErrorMapping import kafka.network.RequestChannel.Response class TopicMetadataRequest(val versionId: Short, - override val correlationId: Int, + val correlationId: Int, val clientId: String, val topics: java.util.List[String]) - extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) { + extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) { val underlying: kafka.api.TopicMetadataRequest = { import scala.collection.JavaConversions._ diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a2117b3..305c2ad 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -23,9 +23,14 @@ import junit.framework.Assert._ import java.nio.ByteBuffer import kafka.message.{Message, ByteBufferMessageSet} import kafka.cluster.Broker -import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping, OffsetMetadataAndError} -import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.utils.SystemTime +import 
org.apache.kafka.common.requests._ +import org.apache.kafka.common.protocol.ApiKeys +import scala.Some +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition object SerializationTestUtils { @@ -146,13 +151,23 @@ object SerializationTestUtils { new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1) } - def createTestOffsetCommitRequest: OffsetCommitRequest = { + def createTestOffsetCommitRequestV1: OffsetCommitRequest = { new OffsetCommitRequest("group 1", collection.immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) )) } + def createTestOffsetCommitRequestV0: OffsetCommitRequest = { + new OffsetCommitRequest( + versionId = 0, + groupId = "group 1", + requestInfo = collection.immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) + )) + } + def createTestOffsetCommitResponse: OffsetCommitResponse = { new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> ErrorMapping.NoError, TopicAndPartition(topic1, 1) -> ErrorMapping.NoError)) @@ -180,6 +195,31 @@ object SerializationTestUtils { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError) } + def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { + val header = new RequestHeader(ApiKeys.HEARTBEAT.id, 0.asInstanceOf[Short], "", 1) + val body = new HeartbeatRequest("group1", 1, "consumer1") + HeartbeatRequestAndHeader(header, body) + } + + def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { + val header = new ResponseHeader(1) + val body = new HeartbeatResponse(0.asInstanceOf[Short]) + HeartbeatResponseAndHeader(header, body) + } + + def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { + import scala.collection.JavaConversions._ + val header = new RequestHeader(ApiKeys.JOIN_GROUP.id, 0.asInstanceOf[Short], "", 1) + val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); + JoinGroupRequestAndHeader(header, body) + } + + def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { + import scala.collection.JavaConversions._ + val header = new ResponseHeader(1) + val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) + JoinGroupResponseAndHeader(header, body) + } } class RequestResponseSerializationTest extends JUnitSuite { @@ -194,27 +234,31 @@ class RequestResponseSerializationTest extends JUnitSuite { private val offsetResponse = SerializationTestUtils.createTestOffsetResponse private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse - private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest + private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0 + private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1 private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse private val 
offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode) + private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader + private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader + private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader + private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader @Test def testSerializationAndDeserialization() { val requestsAndResponses = - collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, - stopReplicaRequest, stopReplicaResponse, - producerRequest, producerResponse, - fetchRequest, - offsetRequest, offsetResponse, - topicMetadataRequest, topicMetadataResponse, - offsetCommitRequest, offsetCommitResponse, - offsetFetchRequest, offsetFetchResponse, - consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator) + collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest, + stopReplicaResponse, producerRequest, producerResponse, + fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest, + topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, + offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, + consumerMetadataRequest, consumerMetadataResponse, + consumerMetadataResponseNoCoordinator, heartbeatRequest, + heartbeatResponse, joinGroupRequest, joinGroupResponse) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes) @@ -222,7 +266,9 @@ class RequestResponseSerializationTest extends JUnitSuite { buffer.rewind() val deserializer = original.getClass.getDeclaredMethod("readFrom", classOf[ByteBuffer]) val deserialized = deserializer.invoke(null, buffer) - assertEquals("The original and deserialized request/response should be the same.", original, deserialized) + assertFalse("All serialized bytes in " + original.getClass.getSimpleName + " should have been consumed", + buffer.hasRemaining) + assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized) } } }
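Aside: the tightened test above also documents the contract every wrapper must honor: a companion readFrom(ByteBuffer) that consumes exactly sizeInBytes bytes and reproduces an equal value. The reflective lookup works because Scala emits a static forwarder for the companion's readFrom on the case class itself. The per-object check is equivalent to this sketch (RoundTripSketch and roundTrip are illustrative names; it sits in package kafka.api because RequestOrResponse is private[kafka]):

package kafka.api

import java.nio.ByteBuffer

object RoundTripSketch {
  def roundTrip(original: RequestOrResponse) {
    val buffer = ByteBuffer.allocate(original.sizeInBytes)
    original.writeTo(buffer)
    buffer.rewind()
    // invoke(null, ...) hits the static forwarder for the companion's readFrom
    val readFrom = original.getClass.getDeclaredMethod("readFrom", classOf[ByteBuffer])
    val deserialized = readFrom.invoke(null, buffer)
    require(!buffer.hasRemaining, "leftover bytes after " + original.getClass.getSimpleName)
    require(original == deserialized, "round trip changed " + original.getClass.getSimpleName)
  }
}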