diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6fe7573..48987bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -17,9 +17,6 @@ package org.apache.kafka.common.protocol; -import java.util.ArrayList; -import java.util.List; - /** * Identifiers for all the Kafka APIs */ @@ -31,7 +28,10 @@ public enum ApiKeys { LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), OFFSET_COMMIT(6, "offset_commit"), - OFFSET_FETCH(7, "offset_fetch"); + OFFSET_FETCH(7, "offset_fetch"), + + TX(11, "transaction"), + TX_COORDINATOR(12, "tx_coordinator"); private static ApiKeys[] codeToType; public static int MAX_API_KEY = -1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3374bd9..efc73f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TransactionCoordinatorNotAvailableException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; /** * This class contains all the client-server errors--those errors that must be sent from the server to the client. These * are thus part of the protocol. The names can be changed but the error code cannot. - * + * * Do not add exceptions that occur only on the client or only on the server here. */ public enum Errors { @@ -51,7 +52,8 @@ public enum Errors { // TODO: errorCode 8, 9, 11 MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), - NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); + NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), + TRANSACTION_COORDINATOR_NOT_AVAILABLE(17, new TransactionCoordinatorNotAvailableException("There is no tx coordinator available to server transaction requests.")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 044b030..df8642f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -76,8 +76,17 @@ public class Protocol { "Host and port information for all brokers."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0))); + public static Schema TX_COORDINATOR_REQUEST_V0 = new Schema(new Field("groupId", + STRING, + "A user specified identifier for the transaction group")); + + public static Schema TX_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error", INT16, "Error code, if any"), + new Field("broker", BROKER, "The broker information")); + public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 }; public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 }; + public static Schema[] TX_COORDINATOR_METADATA_REQUEST = new Schema[] { TX_COORDINATOR_REQUEST_V0 }; + public static Schema[] TX_COORDINATOR_METADATA_RESPONSE = new Schema[] { TX_COORDINATOR_RESPONSE_V0 }; /* Produce api */ @@ -100,10 +109,26 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 }; public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 }; + /* Transaction api */ + + public static Schema TX_REQUEST_V0 = new Schema(new Field("ackTimeoutMs", INT32, "Time to await a response"), + new Field("groupId", STRING, "transaction group Id"), + new Field("txId", INT32, "System generated unique identifier for transaction"), + new Field("txControl", INT16, "Field to indicate type of transaction request"), + new Field("txRequestTimeoutMs", INT32, "Timeout for this transaction (i.e. the transaction should finish within this time)"), + new Field("txPartitions", new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition", new ArrayOf(INT32)))))); + + public static Schema TX_RESPONSE_V0 = new Schema(new Field("txId", INT32, "System generated unique identifier for transaction"), + new Field("error", INT16, "Identify error")); + + + public static Schema[] TRANSACTION_REQUEST = new Schema[] { TX_REQUEST_V0 }; + public static Schema[] TRANSACTION_RESPONSE = new Schema[] { TX_RESPONSE_V0 }; + /* an array of all requests and responses with all schema versions */ public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -120,6 +145,8 @@ public class Protocol { REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.TX.id] = TRANSACTION_REQUEST; + REQUESTS[ApiKeys.TX_COORDINATOR.id] = TX_COORDINATOR_METADATA_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = new Schema[] {}; @@ -129,6 +156,8 @@ public class Protocol { RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.TX.id] = TRANSACTION_RESPONSE; + RESPONSES[ApiKeys.TX_COORDINATOR.id] = TX_COORDINATOR_METADATA_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java new file mode 100644 index 0000000..cbb6a55 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.Map; + +import org.apache.kafka.clients.consumer.OffsetMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class OffsetCommitRequest { + + private final int groupId; + private final Map offsets; + + public OffsetCommitRequest(int groupId, Map offsets) { + this.groupId = groupId; + this.offsets = offsets; + } + + public Struct toStruct() { + Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id)); + body.set("groupId", groupId); + body.set("offsets", offsets); + return body; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java new file mode 100644 index 0000000..406dc93 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionCoordinatorMetadataRequest { + + private final String groupId; + + public TransactionCoordinatorMetadataRequest(String groupId) { + this.groupId = groupId; + } + + public Struct toStruct() { + Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.TX_COORDINATOR.id)); + body.set("groupId", groupId); + return body; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java new file mode 100644 index 0000000..dadd214 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionCoordinatorMetadataResponse { + + private final Node node; + private final Errors error; + + public TransactionCoordinatorMetadataResponse(Node node, Errors error) { + this.node = node; + this.error = error; + } + + public Node getNode(){ + return node; + } + + public Errors getErrors(){ + return error; + } + + public TransactionCoordinatorMetadataResponse(Struct struct) { + short errorCode = struct.getShort("error"); + Errors error = Errors.forCode(errorCode); + Struct nodeStruct = (Struct) struct.get("broker"); + int id = nodeStruct.getInt("node_id"); + String host = nodeStruct.getString("host"); + int port = nodeStruct.getInt("port"); + this.node = new Node(id, host, port); + this.error = error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java new file mode 100644 index 0000000..1b66711 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionRequest { + + private final int ackTimeoutMs; + private final String groupId; + private final int txId; + private final short txControl; + private final int txRequestTimeoutMs; + private final Set txPartitions; + + + public TransactionRequest(String groupId, int txId, short txControl, int txTimeoutMs, + int ackTimeoutMs, Set txPartitions) { + this.groupId = groupId; + this.txId = txId; + this.txControl = txControl; + this.txRequestTimeoutMs = txTimeoutMs; + this.ackTimeoutMs = ackTimeoutMs; + this.txPartitions = Collections.unmodifiableSet(txPartitions); + } + + public Struct toStruct() { + Struct txRequest = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.TX.id)); + txRequest.set("ackTimeoutMs", ackTimeoutMs); + txRequest.set("groupId", groupId); + txRequest.set("txId", txId); + txRequest.set("txControl", txControl); + txRequest.set("txRequestTimeoutMs", txRequestTimeoutMs); + + // topicParts.group_by(topic) + Map> partitionsGroupByTopic = new HashMap>(); + for (TopicPartition entry : txPartitions) { + if (partitionsGroupByTopic.containsKey(entry.topic())) { + partitionsGroupByTopic.get(entry.topic()).add(entry.partition()); + } else { + partitionsGroupByTopic.put(entry.topic(), new HashSet()); + partitionsGroupByTopic.get(entry.topic()).add(entry.partition()); + } + } + + List topicPartitionStruct = new ArrayList(partitionsGroupByTopic.size()); + for(String topic : partitionsGroupByTopic.keySet()) { + Struct topicPartition = txRequest.instance("txPartitions"); + topicPartition.set("topic", topic); + topicPartition.set("partition", partitionsGroupByTopic.get(topic).toArray()); + topicPartitionStruct.add(topicPartition); + } + + txRequest.set("txPartitions", topicPartitionStruct.toArray()); + return txRequest; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java new file mode 100644 index 0000000..92e8b72 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionResponse { + + private final int txId; + private final Errors error; + + public int getTxId(){ + return txId; + } + + public Errors getError(){ + return error; + } + + public TransactionResponse(int txId, Errors error) { + this.txId = txId; + this.error = error; + } + + public TransactionResponse(Struct struct){ + txId = struct.getInt("txId"); + short errorCode = struct.getShort("error"); + error = Errors.forCode(errorCode); + } + + public Struct toStruct() { + Struct txResponse = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.TX.id)); + txResponse.set("txId", txId); + txResponse.set("error", error); + return txResponse; + } +}