From aaa7a0f44ad0bb77fbf9dfaa18b0d6fc48f6a612 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Mon, 16 Mar 2015 08:54:14 -0700
Subject: [PATCH] squashing multi-broker-endpoint patches

---
 .../apache/kafka/common/protocol/ApiVersion.java   |  65 +++++++++++
 .../kafka/common/protocol/SecurityProtocol.java    |  59 ++++++++++
 .../main/scala/kafka/cluster/BrokerEndPoint.scala  |  67 +++++++++++
 core/src/main/scala/kafka/cluster/EndPoint.scala   |  72 ++++++++++++
 .../scala/kafka/cluster/SecurityProtocol.scala     |  28 +++++
 .../BrokerEndPointNotAvailableException.scala      |  22 ++++
 .../test/scala/unit/kafka/cluster/BrokerTest.scala | 129 +++++++++++++++
 system_test/run_all.sh                             |  10 ++
 system_test/run_all_replica.sh                     |  10 ++
 9 files changed, 462 insertions(+)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
 create mode 100644 core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
 create mode 100644 core/src/main/scala/kafka/cluster/EndPoint.scala
 create mode 100644 core/src/main/scala/kafka/cluster/SecurityProtocol.scala
 create mode 100644 core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
 create mode 100644 core/src/test/scala/unit/kafka/cluster/BrokerTest.scala
 create mode 100755 system_test/run_all.sh
 create mode 100755 system_test/run_all_replica.sh

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
new file mode 100644
index 0000000..fdd8585
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
+ * This is only for intra-broker communications - when communicating with clients, the client decides on the API version.
+ *
+ * Note that ORDER MATTERS in the enum.
+ * We consider version A as newer than B if it appears later in the list of constants here.
+ * If you add new versions, add them in the correct chronological release order.
+ */
+public enum ApiVersion {
+    KAFKA_082("0.8.2.X"),
+    KAFKA_083("0.8.3.X");
+
+    private final String version;
+
+    public boolean onOrAfter(ApiVersion other) {
+        return compareTo(other) >= 0;
+    }
+
+    private ApiVersion(final String version) {
+        this.version = version;
+    }
+
+    /* Parse user readable version number. This assumes the convention of 0.8.2.0, 0.8.2.1, 0.8.3.0, 0.9.0.0, etc.
+     * We are assuming that we will never change API in bug-fix versions (i.e. 0.8.2.1 will have same API as 0.8.2.0)
+     * */
+    public static ApiVersion parseConfig(String version) {
+        String[] vals = version.split("\\.");
+        StringBuilder parsed = new StringBuilder();
+        parsed.append("KAFKA_");
+
+        // We only care about the first 3 version digits. Others don't impact the API
+        parsed.append(vals[0]).append(vals[1]).append(vals[2]);
+        return ApiVersion.valueOf(parsed.toString());
+    }
+
+    public static ApiVersion getLatestVersion() {
+        ApiVersion[] values = ApiVersion.values();
+        return values[values.length - 1];
+    }
+
+    @Override
+    public String toString() {
+        return version;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
new file mode 100644
index 0000000..276939a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public enum SecurityProtocol {
+    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
+    TRACE(0, "TRACE"),
+    /** Un-authenticated, non-encrypted channel */
+    PLAINTEXT(1, "PLAINTEXT");
+
+    private static Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<Short, SecurityProtocol>();
+    private static List<String> names = new ArrayList<String>();
+
+    static {
+        for (SecurityProtocol proto: SecurityProtocol.values()) {
+            codeToSecurityProtocol.put(proto.id, proto);
+            names.add(proto.name);
+        }
+    }
+
+    /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
+    public final short id;
+
+    /** A name of the security protocol. This may be used by client configuration. */
+    public final String name;
+
+    private SecurityProtocol(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+    public static String getName(int id) {
+        return codeToSecurityProtocol.get((short) id).name;
+    }
+
+    public static List<String> getNames() {
+        return names;
+    }
+
+}
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
new file mode 100644
index 0000000..22dba18
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import org.apache.kafka.common.utils.Utils._
+
+object BrokerEndpoint {
+  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = {
+
+    // BrokerEndPoint URI is host:port or [ipv6_host]:port
+    // Note that unlike EndPoint (or listener) this URI has no security information.
+    val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+
+    connectionString match {
+      case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt)
+      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+    }
+  }
+
+  def readFrom(buffer: ByteBuffer): BrokerEndpoint = {
+    val brokerId = buffer.getInt()
+    val host = readShortString(buffer)
+    val port = buffer.getInt()
+    BrokerEndpoint(brokerId, host, port)
+  }
+}
+
+/**
+ * BrokerEndpoint is used to connect to specific host:port pair.
+ * It is typically used by clients (or brokers when connecting to other brokers)
+ * and contains no information about the security protocol used on the connection.
+ * Clients should know which security protocol to use from configuration.
+ * This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed.
+ */
+case class BrokerEndpoint(id: Int, host: String, port: Int) {
+
+  def connectionString(): String = formatAddress(host, port)
+
+  def writeTo(buffer: ByteBuffer): Unit = {
+    buffer.putInt(id)
+    writeShortString(buffer, host)
+    buffer.putInt(port)
+  }
+
+  def sizeInBytes: Int =
+    4 + /* broker Id */
+    4 + /* port */
+    shortStringLength(host)
+}
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
new file mode 100644
index 0000000..1d853b1
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import kafka.cluster.SecurityProtocol._
+
+object EndPoint {
+
+  def readFrom(buffer: ByteBuffer): EndPoint = {
+    val port = buffer.getInt()
+    val host = readShortString(buffer)
+    val protocol = buffer.getShort()
+    EndPoint(host, port, SecurityProtocol(protocol))
+  }
+
+  /**
+   * Create EndPoint object from connectionString
+   * @param connectionString the format is protocol://host:port or protocol://[ipv6 host]:port
+   *                         for example: PLAINTEXT://myhost:9092 or PLAINTEXT://[::1]:9092
+   *                         Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface
+   * @return
+   */
+  def createEndPoint(connectionString: String): EndPoint = {
+    val uriParseExp = """^(.*)://\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+    connectionString match {
+      case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.withName(protocol))
+      case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.withName(protocol))
+      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+    }
+  }
+}
+
+/**
+ * Part of the broker definition - matching host/port pair to a protocol
+ */
+case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
+
+  override def toString: String = {
+    val hostStr = if (host == null || host.contains(":")) "[" + host + "]" else host
+    protocolType + "://" + hostStr + ":" + port
+  }
+
+  def writeTo(buffer: ByteBuffer): Unit = {
+    buffer.putInt(port)
+    writeShortString(buffer, host)
+    buffer.putShort(protocolType.id.toShort)
+  }
+
+  def sizeInBytes: Int =
+    4 + /* port */
+    shortStringLength(host) +
+    2 /* protocol id */
+}
diff --git a/core/src/main/scala/kafka/cluster/SecurityProtocol.scala b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala
new file mode 100644
index 0000000..a732385
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+
+object SecurityProtocol extends Enumeration {
+
+  type SecurityProtocol = Value
+  /* TRACE is used for testing non-default protocol */
+  val TRACE = Value
+  val PLAINTEXT = Value
+}
+
diff --git a/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
new file mode 100644
index 0000000..455d8c6
--- /dev/null
+++ b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerEndPointNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala
new file mode 100644
index 0000000..3df8aca
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.utils.Logging
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection.mutable
+
+class BrokerTest extends JUnit3Suite with Logging {
+
+  @Test
+  def testSerDe() = {
+
+    val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
+    val origBroker = new Broker(1, listEndPoints)
+    val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
+
+    origBroker.writeTo(brokerBytes)
+
+    val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
+    assert(origBroker == newBroker)
+  }
+
+  @Test
+  def testHashAndEquals() = {
+    val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
+    val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT)
+    val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1))
+    val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2))
+    val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3))
+    val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4))
+
+    assert(broker1 == broker2)
+    assert(broker1 != broker3)
+    assert(broker1 != broker4)
+    assert(broker1.hashCode() == broker2.hashCode())
+    assert(broker1.hashCode() != broker3.hashCode())
+    assert(broker1.hashCode() != broker4.hashCode())
+
+    val hashmap = new mutable.HashMap[Broker, Int]()
+    hashmap.put(broker1, 1)
+    assert(hashmap.getOrElse(broker1, -1) == 1)
+  }
+
+  @Test
+  def testFromJSON() = {
+    val brokerInfoStr = "{\"version\":2," +
+                        "\"host\":\"localhost\"," +
+                        "\"port\":9092," +
+                        "\"jmx_port\":9999," +
+                        "\"timestamp\":\"1416974968782\"," +
+                        "\"endpoints\":[\"PLAINTEXT://localhost:9092\"]}"
+    val broker = Broker.createBroker(1, brokerInfoStr)
+    assert(broker.id == 1)
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost")
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092)
+  }
+
+  @Test
+  def testFromOldJSON() = {
+    val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
+    val broker = Broker.createBroker(1, brokerInfoStr)
+    assert(broker.id == 1)
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243")
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091)
+  }
+
+  @Test
+  def testBrokerEndpointFromURI() = {
+    var connectionString = "localhost:9092"
+    var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    assert(endpoint.host == "localhost")
+    assert(endpoint.port == 9092)
+    // also test for ipv6
+    connectionString = "[::1]:9092"
+    endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    assert(endpoint.host == "::1")
+    assert(endpoint.port == 9092)
+  }
+
+  @Test
+  def testEndpointFromURI() = {
+    var connectionString = "PLAINTEXT://localhost:9092"
+    var endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == "localhost")
+    assert(endpoint.port == 9092)
+    assert(endpoint.toString == "PLAINTEXT://localhost:9092")
+    // also test for default bind
+    connectionString = "PLAINTEXT://:9092"
+    endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == null)
+    assert(endpoint.port == 9092)
+    assert(endpoint.toString == "PLAINTEXT://[null]:9092")
+    // also test for ipv6
+    connectionString = "PLAINTEXT://[::1]:9092"
+    endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == "::1")
+    assert(endpoint.port == 9092)
+    assert(endpoint.toString == "PLAINTEXT://[::1]:9092")
+  }
+
+
+
+
+
+
+}
diff --git a/system_test/run_all.sh b/system_test/run_all.sh
new file mode 100755
index 0000000..60cbc59
--- /dev/null
+++ b/system_test/run_all.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+my_ts=`date +"%s"`
+
+cp testcase_to_run.json testcase_to_run.json_${my_ts}
+cp testcase_to_run_all.json testcase_to_run.json
+
+python -B system_test_runner.py
+
+
diff --git a/system_test/run_all_replica.sh b/system_test/run_all_replica.sh
new file mode 100755
index 0000000..22aaa60
--- /dev/null
+++ b/system_test/run_all_replica.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+my_ts=`date +"%s"`
+
+cp testcase_to_run.json testcase_to_run.json_${my_ts}
+cp testcase_to_run_all_replica.json testcase_to_run.json
+
+python -B system_test_runner.py
+
+
-- 
1.9.5 (Apple Git-50.3)