From 1a59546c2b8e5cb5c064ba98978633cbd1fb0d7e Mon Sep 17 00:00:00 2001 From: jholoman Date: Sun, 15 Mar 2015 00:58:12 -0400 Subject: [PATCH 1/2] Refactor for ConfigDef --- core/src/main/scala/kafka/network/IPFilter.scala | 113 ++++++++++++ .../main/scala/kafka/network/SocketServer.scala | 17 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 38 ++++ core/src/main/scala/kafka/server/KafkaServer.scala | 3 +- .../scala/unit/kafka/network/IpFilterTest.scala | 195 +++++++++++++++++++++ .../unit/kafka/network/SocketServerTest.scala | 85 ++++++--- .../kafka/server/KafkaConfigConfigDefTest.scala | 2 + .../scala/unit/kafka/server/KafkaConfigTest.scala | 65 ++++++- 8 files changed, 485 insertions(+), 33 deletions(-) create mode 100644 core/src/main/scala/kafka/network/IPFilter.scala create mode 100644 core/src/test/scala/unit/kafka/network/IpFilterTest.scala diff --git a/core/src/main/scala/kafka/network/IPFilter.scala b/core/src/main/scala/kafka/network/IPFilter.scala new file mode 100644 index 0000000..3fe76e2 --- /dev/null +++ b/core/src/main/scala/kafka/network/IPFilter.scala @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.network + +import java.math.BigInteger +import java.net.{InetAddress, UnknownHostException} +import kafka.common.KafkaException +import kafka.metrics.KafkaMetricsGroup +import kafka.utils._ + +object IpFilter { + //Can move most of this to configDef if/when that happens + val AllowRule = "allow" + val AllowRuleDescription = "IP not in whitelist" + val DenyRule = "deny" + val DenyRuleDescription = "IP in Blacklist" + val NoRule = "none" +} + +class IpFilter(ipFilterList: List[String], ipFilterRuleType: String) extends Logging with KafkaMetricsGroup { + import IpFilter._ + val ruleType = ipFilterRuleType + val filterList: List[CIDRRange] = { + try { + ipFilterList.map(entry => new CIDRRange(entry)).toList + } + catch { + // The exception will be checked when loading properties + case e: UnknownHostException => throw new IllegalArgumentException("Error processing IP filter List, unable to parse values into CIDR range. ") + } + } + + def check(inetAddress: InetAddress): Boolean = { + val ip: BigInt = new BigInteger(1, inetAddress.getAddress()) + ruleType match { + case AllowRule => { + if (filterList.exists(_.contains(ip))) + true + else throw new IPFilterException(inetAddress, AllowRuleDescription) + } + case DenyRule => { + if (filterList.exists(_.contains(ip))) + throw new IPFilterException(inetAddress, DenyRuleDescription) + else true + } + case _ => true + } + } +} +/* This class will parse an IP range string (eg. 192.168.2.1/28) and convert it + * into the lower and upper bound integer values for addresses within that range + * We use the sign-magnitude representation of the address as a BigInt to support IPV6 addresses. + * We use bit shifting to apply the mask and get the boundaries for the calculations + * http://en.wikipedia.org/wiki/Signed_number_representations + * http://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing#CIDR_notation + */ +class CIDRRange(val ipRange: String) { + + if (ipRange.indexOf("/") < 0) { + throw new UnknownHostException("Not a valid CIDR Range " + ipRange) + } + val inetAddress = { + InetAddress.getByName(ipRange.split("/").apply(0)) + } + private val prefixLen = inetAddress.getAddress.length + + val prefix = ipRange.split("/").apply(1).toInt + + val mask = getMask(prefix, prefixLen) + + /* bitwise "and" the mask for the low address. bitwise "add" the flipped mask for high address */ + val low: BigInt = new BigInt(new BigInteger(1, inetAddress.getAddress)).&(mask) + val high: BigInt = low.+(~mask) + + /* match for IPV4 or IPV6 (4 or 16) + * fill a BigInteger with all ones (-1 = 11111111) for each octet , flip the bits with not() + * bit-shift right by the length of the prefix + */ + def getMask(prefix: Int, prefixLen: Int): BigInt = { + if ((prefix < 0 || prefix > 128) || (prefix > 32 && prefixLen == 4)) { + throw new UnknownHostException("Not a valid prefix length " + prefix) + } + prefixLen match { + case x if x == 4 => new BigInteger(1, Array.fill[Byte](4)(-1)).not().shiftRight(prefix) + case x if x == 16 => new BigInteger(1, Array.fill[Byte](16)(-1)).not().shiftRight(prefix) + } + } + + def contains(ip: BigInt): Boolean = { + ip >= low && ip <= high + } +} + +class IPFilterException(val ip: InetAddress, val ruleTypeDescription: String) extends KafkaException("Rejected connection from %s due to IP filter rules (%s)".format(ip, ruleTypeDescription)) + +class IPFilterConfigException(message: String) extends KafkaException(message) { + def this() = this(null) +} + diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 76ce41a..0e44a7e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -46,8 +46,9 @@ class SocketServer(val brokerId: Int, val recvBufferSize: Int, val maxRequestSize: Int = Int.MaxValue, val maxConnectionsPerIp: Int = Int.MaxValue, + val maxConnectionsPerIpOverrides: Map[String, Int], val connectionsMaxIdleMs: Long, - val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { + val ipFilters: IpFilter) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @@ -83,7 +84,7 @@ class SocketServer(val brokerId: Int, requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections - this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) + this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas, ipFilters) Utils.newThread("kafka-socket-acceptor", acceptor, false).start() acceptor.awaitStartup info("Started") @@ -201,7 +202,8 @@ private[kafka] class Acceptor(val host: String, private val processors: Array[Processor], val sendBufferSize: Int, val recvBufferSize: Int, - connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { + connectionQuotas: ConnectionQuotas, + val ipFilters: IpFilter) extends AbstractServerThread(connectionQuotas) { val serverChannel = openServerSocket(host, port) /** @@ -268,8 +270,10 @@ private[kafka] class Acceptor(val host: String, def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() + val address = socketChannel.socket().getInetAddress try { - connectionQuotas.inc(socketChannel.socket().getInetAddress) + connectionQuotas.inc(address) + ipFilters.check(address) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setSendBufferSize(sendBufferSize) @@ -284,8 +288,11 @@ private[kafka] class Acceptor(val host: String, case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) close(socketChannel) + case e: IPFilterException => + warn(e.getMessage.format(e.ip) ) + close(socketChannel) + } } - } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 46d21c7..e25e3c5 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,6 +21,7 @@ import java.util.Properties import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} +import kafka.network.IpFilter import kafka.utils.Utils import org.apache.kafka.common.config.ConfigDef @@ -49,6 +50,8 @@ object Defaults { val MaxConnectionsPerIp: Int = Int.MaxValue val MaxConnectionsPerIpOverrides: String = "" val ConnectionsMaxIdleMs = 10 * 60 * 1000L + val IpFilterRuleType:String = IpFilter.NoRule + val IpFilterList:String = "" /** ********* Log Configuration ***********/ val NumPartitions = 1 @@ -151,6 +154,9 @@ object KafkaConfig { val MaxConnectionsPerIpProp = "max.connections.per.ip" val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides" val ConnectionsMaxIdleMsProp = "connections.max.idle.ms" + val IpFilterRuleTypeProp = "security.ip.filter.rule.type" + val IpFilterListProp = "security.ip.filter.list" + /** ********* Log Configuration ***********/ val NumPartitionsProp = "num.partitions" val LogDirsProp = "log.dirs" @@ -261,6 +267,9 @@ object KafkaConfig { val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address" val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections" val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this" + val IpFilterRuleTypeDoc = "The type of IP Filtering list to be evaluated, either 'allow' (whitelist) or 'deny' (blacklist)" + val IpFilterListDoc = "IP Whitelist / Blacklist, specified in CIDR notation eg, 192.168.1.1/32, 192.168.2.1/24 " + + "/32 is a single IPv4 address, /128 is a single IPv6 address" /** ********* Log Configuration ***********/ val NumPartitionsDoc = "The default number of log partitions per topic" val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)" @@ -382,6 +391,8 @@ object KafkaConfig { .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(1), MEDIUM, MaxConnectionsPerIpDoc) .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc) .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc) + .define(IpFilterRuleTypeProp, STRING, Defaults.IpFilterRuleType, MEDIUM, IpFilterRuleTypeDoc) + .define(IpFilterListProp, STRING, Defaults.IpFilterList, MEDIUM, IpFilterListDoc) /** ********* Log Configuration ***********/ .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc) @@ -501,6 +512,8 @@ object KafkaConfig { maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int], _maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String], connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long], + _IpFilterRuleType = parsed.get(IpFilterRuleTypeProp).asInstanceOf[String], + _IpFilterList = parsed.get(IpFilterListProp).asInstanceOf[String], /** ********* Log Configuration ***********/ numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int], @@ -640,6 +653,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp, private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides, val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs, + private val _IpFilterRuleType:String = Defaults.IpFilterRuleType, + private val _IpFilterList:String = Defaults.IpFilterList, /** ********* Log Configuration ***********/ val numPartitions: Int = Defaults.NumPartitions, @@ -747,6 +762,23 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val maxConnectionsPerIpOverrides: Map[String, Int] = getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)} + val IpFilters = new IpFilter(Utils.parseCsvList(_IpFilterList).toList, _IpFilterRuleType.toLowerCase()) + + /* + private def checkIpFilterProps(ipf: IpFilter) = { + val validRules = "(" + IpFilter.AllowRule + "|" + IpFilter.DenyRule + "|" + IpFilter.NoRule + ")" + val rgx = new Regex(validRules) + // Check if a valid rule type has been specified + if (!rgx.findFirstIn(ipf.ruleType).isDefined) { + throw new IllegalArgumentException("Invalid IP filter rule type specified: " + ipf.ruleType + ". Should be in " + validRules) + } + // Check that both properties have been set + if ((!(ipf.filterList.isEmpty) && ipf.ruleType == IpFilter.NoRule) || (ipf.filterList.isEmpty && ipf.ruleType != IpFilter.NoRule)) { + throw new IllegalArgumentException("Error processing IP filter list: Both rule type and rules list must be set") + } + } + */ + private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute @@ -779,6 +811,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) + require(_IpFilterRuleType == IpFilter.AllowRule || _IpFilterRuleType == IpFilter.DenyRule || _IpFilterRuleType == IpFilter.NoRule, + "security.ip.filter.rule.type is invalid. Should be in (" + IpFilter.AllowRule + "|" + IpFilter.DenyRule + "|" + IpFilter.NoRule + ")") + require( (_IpFilterList.isEmpty && _IpFilterRuleType == IpFilter.NoRule) || ( !_IpFilterList.isEmpty && _IpFilterRuleType != IpFilter.NoRule), + "security.ip.filter.rule.type and security.ip.filter.list must both specified if defined.") } def toProps: Properties = { @@ -810,6 +846,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString) props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides) props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString) + props.put(IpFilterRuleTypeProp, _IpFilterRuleType.toString) + props.put(IpFilterListProp, _IpFilterList.toString) /** ********* Log Configuration ***********/ props.put(NumPartitionsProp, numPartitions.toString) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dddef93..7bcc7cb 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -125,8 +125,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, + config.maxConnectionsPerIpOverrides, config.connectionsMaxIdleMs, - config.maxConnectionsPerIpOverrides) + config.IpFilters) socketServer.startup() /* start replica manager */ diff --git a/core/src/test/scala/unit/kafka/network/IpFilterTest.scala b/core/src/test/scala/unit/kafka/network/IpFilterTest.scala new file mode 100644 index 0000000..267a26b --- /dev/null +++ b/core/src/test/scala/unit/kafka/network/IpFilterTest.scala @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.network + +import java.math.BigInteger +import java.net._ +import kafka.network.{CIDRRange, IPFilterException, IpFilter} +import org.junit.Assert._ +import org.junit.Test +import kafka.utils.Utils + +class IpFilterTest { + + def getBigInt (ipString: String):BigInt = { + val ipBigInt: BigInt = new BigInteger(1, InetAddress.getByName(ipString).getAddress) + ipBigInt + } + @Test + def testEmptyList(): Unit = { + val range = Utils.parseCsvList("").toList + val ipFilters = new IpFilter(range, IpFilter.NoRule) + val ip = InetAddress.getByName("192.168.2.1") + assertTrue(ipFilters.check(ip)) + } + + @Test + def testBadPrefix: Unit = { + val range = List(List("192.168.2.1/-1"), List("192.168.2.1/64")) + for (l <- range) { + try { + new IpFilter(l, IpFilter.AllowRule) + fail() + } catch { + case e: IllegalArgumentException => + } + } + } + + @Test + // Checks that the IP address is in a given range + def testIpV4Range(): Unit = { + val ipRange: String = "192.168.2.0/25" // 0-127 + val cidr = new CIDRRange(ipRange) + val ip1 = getBigInt("192.168.2.1") + val ip2 = getBigInt("192.168.2.128") + assertTrue(cidr.contains(ip1)) + assertFalse(cidr.contains(ip2)) + } + + @Test + def testIpV6Range1(): Unit = { + val ipRange: String = "fe80:0:0:0:202:b3ff:fe1e:8320/124" + val cidr = new CIDRRange(ipRange) + val ip1 = getBigInt("fe80:0:0:0:202:b3ff:fe1e:8320") + val ip2 = getBigInt("fe80:0:0:0:202:b3ff:fe1e:833f") + assertTrue(cidr.contains(ip1)) + assertFalse(cidr.contains(ip2)) + } + + @Test + def testIPV6Range2(): Unit = { + val ipRange: String = "fe80:0:0:0:202:b3ff:fe1e:8320/64" + val cidr = new CIDRRange(ipRange) + val ip1 = getBigInt("fe80:0000:0000:0000:0202:b3ff:fe1e:8320") + val ip2 = getBigInt("fe80:0000:0000:0000:ffff:ffff:ffff:ffff") + val ip3 = getBigInt("fe80:0:0:3:ffff:ffff:ffff:ffff") + assertTrue(cidr.contains(ip1)) + assertTrue(cidr.contains(ip2)) + assertFalse(cidr.contains(ip3)) + } + + // This is kind of a bogus test but tests the logic for ranges < 32 and IPv6 + @Test + def testIPV6Range3(): Unit = { + val ipRange: String = "fe80:0:0:0:202:b3ff:fe1e:8320/12" + val cidr = new CIDRRange(ipRange) + val ip1 = getBigInt("fe80:0000:0000:0000:0202:b3ff:fe1e:8320") + val ip2 = getBigInt("fe8f:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + val ip3 = getBigInt("fe9f:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + assertTrue(cidr.contains(ip1)) + assertTrue(cidr.contains(ip2)) + assertFalse(cidr.contains(ip3)) + } + + @Test + def testSingleBlackList(): Unit = { + val range1 = List("192.168.2.10/32") + val ipFilters = new IpFilter(range1, IpFilter.DenyRule) + val ip = InetAddress.getByName("192.168.2.10") + try { + ipFilters.check(ip) + fail() + } catch { + case e: IPFilterException => // this is good + } + val ip2 = InetAddress.getByName("192.168.2.1") + assertTrue(ipFilters.check(ip2)) + } + + @Test + def testMultipleBlackListEntries(): Unit = { + val range = List("192.168.2.0/28", "192.168.2.25/28") + val ipFilters = new IpFilter(range, IpFilter.DenyRule) + val ip1 = InetAddress.getByName("192.168.2.3") + val ip2 = InetAddress.getByName("192.168.2.26") + val ip3 = InetAddress.getByName("192.162.1.1") + try { + ipFilters.check(ip1) + fail() + } catch { + case e: IPFilterException => // this is good + } + try { + ipFilters.check(ip2) + fail() + } catch { + case e: IPFilterException => // this is good + } + assertTrue(ipFilters.check(ip3)) + } + + @Test + def testSingleWhiteList(): Unit = { + val range1 = List("192.168.2.10/32") + val ipFilters = new IpFilter(range1, IpFilter.AllowRule) + val ip = InetAddress.getByName("192.168.2.10") + val ip2 = InetAddress.getByName("192.168.2.1") + assertTrue(ipFilters.check(ip)) + try { + ipFilters.check(ip2) + fail() + } catch { + case e: IPFilterException => // this is good + } + } + + @Test + def testMultipleWhiteListEntries(): Unit = { + val range = List("192.168.2.0/24", "10.10.10.0/16") + val ipFilters = new IpFilter(range, IpFilter.AllowRule) + val ip1 = InetAddress.getByName("192.168.2.128") + val ip2 = InetAddress.getByName("192.168.1.128") + val ip3 = InetAddress.getByName("10.10.1.1") + val ip4 = InetAddress.getByName("10.9.1.1") + assertTrue(ipFilters.check(ip1)) + try { + ipFilters.check(ip2) + fail() + } catch { + case e: IPFilterException => // this is good + } + assertTrue(ipFilters.check(ip3)) + try { + ipFilters.check(ip4) + fail() + } catch { + case e: IPFilterException => // this is good + } + } + + @Test + def testRangeFormat(): Unit = { + val ruleType = IpFilter.AllowRule + val range1 = List("192.168.2") + val range2 = List("192.168.1.2/32", "10.A.B.C/AAAA") + val range3 = List("blahblahblah:") + val range4 = List("192aaaa:") + val rangeList: List[List[String]] = List(range1, range2, range2, range4) + + for (l <- rangeList) + try { + val ipFilters = new IpFilter(l, ruleType) + fail() + } catch { + case e: IllegalArgumentException => + } + } +} + + + diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 0af23ab..fb58eb8 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -34,19 +34,22 @@ import scala.collection.Map class SocketServerTest extends JUnitSuite { - val server: SocketServer = new SocketServer(0, - host = null, - port = kafka.utils.TestUtils.choosePort, - numProcessorThreads = 1, - maxQueuedRequests = 50, - sendBufferSize = 300000, - recvBufferSize = 300000, - maxRequestSize = 50, - maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = Map.empty[String,Int]) - server.startup() - + def getSocketServer(brokerId: Int = 0, + host: String = null, + port: Int = kafka.utils.TestUtils.choosePort, + numProcessorThreads: Int = 1, + maxQueuedRequests: Int = 50, + sendBufferSize: Int = 300000, + recvBufferSize: Int = 300000, + maxRequestSize: Int = 50, + maxConnectionsPerIp: Int = 5, + maxConnectionsPerIpOverrides: Map[String, Int] = Map.empty[String, Int], + connectionsMaxIdleMs: Int = 60 * 1000, + ipFilters: IpFilter = new IpFilter(List.empty[String], IpFilter.NoRule)): SocketServer = { + + new SocketServer(brokerId, host, port, numProcessorThreads, maxQueuedRequests, sendBufferSize, recvBufferSize, maxRequestSize, + maxConnectionsPerIp, maxConnectionsPerIpOverrides, connectionsMaxIdleMs, ipFilters) + } def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { val outgoing = new DataOutputStream(socket.getOutputStream) outgoing.writeInt(request.length + 2) @@ -75,10 +78,14 @@ class SocketServerTest extends JUnitSuite { def connect(s:SocketServer = server) = new Socket("localhost", s.port) + val server:SocketServer = getSocketServer() + server.startup() + @After def cleanup() { server.shutdown() } + @Test def simpleRequest() { val socket = connect() @@ -160,17 +167,7 @@ class SocketServerTest extends JUnitSuite { def testMaxConnectionsPerIPOverrides(): Unit = { val overrideNum = 6 val overrides: Map[String, Int] = Map("localhost" -> overrideNum) - val overrideServer: SocketServer = new SocketServer(0, - host = null, - port = kafka.utils.TestUtils.choosePort, - numProcessorThreads = 1, - maxQueuedRequests = 50, - sendBufferSize = 300000, - recvBufferSize = 300000, - maxRequestSize = 50, - maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = overrides) + val overrideServer: SocketServer = getSocketServer(maxConnectionsPerIpOverrides = overrides) overrideServer.startup() // make the maximum allowable number of connections and then leak them val conns = ((0 until overrideNum).map(i => connect(overrideServer))) @@ -180,4 +177,44 @@ class SocketServerTest extends JUnitSuite { assertEquals(-1, conn.getInputStream.read()) overrideServer.shutdown() } + + @Test + def testIPFilterBlackList() = { + val a: IpFilter = new IpFilter(List("localhost/32"), IpFilter.DenyRule) + val ipFilterServer = getSocketServer(ipFilters = a ) + ipFilterServer.startup() + val conn = connect(ipFilterServer) + assertEquals(-1, conn.getInputStream.read()) + ipFilterServer.shutdown() + } + + @Test + def testIPFilterWhiteList() = { + val a: IpFilter = new IpFilter(List("localhost/32"), IpFilter.AllowRule) + val ipFilterServer = getSocketServer(ipFilters = a) + ipFilterServer.startup() + val conn = connect(ipFilterServer) + conn.setSoTimeout(3000) + try { + conn.getInputStream.read() + fail() + } catch { + case e: SocketTimeoutException => // Means the socket is still open + } + ipFilterServer.shutdown() + } + + @Test + def testIPFilterNone() = { + val conn = connect(server) + conn.setSoTimeout(3000) + try { + conn.getInputStream.read() + fail() + } catch { + case e: SocketTimeoutException => //Means the socket is still open + } + } + + } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 191251d..4701f1d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -178,6 +178,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.SocketRequestMaxBytesProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.MaxConnectionsPerIpProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.MaxConnectionsPerIpOverridesProp => expected.setProperty(name, "127.0.0.1:2, 127.0.0.2:3") + case KafkaConfig.IpFilterListProp => expected.setProperty(name, "192.168.2.1/32, 192.168.2.1/28") + case KafkaConfig.IpFilterRuleTypeProp => expected.setProperty(name, "allow") case KafkaConfig.NumPartitionsProp => expected.setProperty(name, "2") case KafkaConfig.LogDirsProp => expected.setProperty(name, "/tmp/logs,/tmp/logs2") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 7f47e6f..e5f9474 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -17,13 +17,12 @@ package kafka.server +import junit.framework.Assert.assertEquals +import kafka.network.IpFilter import org.apache.kafka.common.config.ConfigException import org.junit.Test -import junit.framework.Assert._ import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils -import kafka.message.GZIPCompressionCodec -import kafka.message.NoCompressionCodec class KafkaConfigTest extends JUnit3Suite { @@ -209,4 +208,64 @@ class KafkaConfigTest extends JUnit3Suite { KafkaConfig.fromProps(props) } } + + @Test + def testIpFilterRuleDefaults(): Unit = { + val props = TestUtils.createBrokerConfig(0,8181) + val cfg = KafkaConfig.fromProps(props) + assertEquals(IpFilter.NoRule, cfg.IpFilters.ruleType) + assertEquals(List.empty[String], cfg.IpFilters.filterList) + } + + @Test + def testIpFilterRuleType(): Unit = { + val range: String = "192.168.1.1/32" + val rangeEmpty: String = "" + val ruleTypeBad = "Allowe" // misspelled + + val props1 = TestUtils.createBrokerConfig(0,8181) + props1.put("security.ip.filter.list", rangeEmpty) + props1.put("security.ip.filter.rule.type", IpFilter.NoRule) + val cfg1 = KafkaConfig.fromProps(props1) + assertEquals(cfg1.IpFilters.ruleType, IpFilter.NoRule) + + val props2 = TestUtils.createBrokerConfig(0,8181) + props2.put("security.ip.filter.list", range) + props2.put("security.ip.filter.rule.type", IpFilter.AllowRule) + val cfg2 = KafkaConfig.fromProps(props2) + assertEquals(cfg2.IpFilters.ruleType, IpFilter.AllowRule) + + val props3 = TestUtils.createBrokerConfig(0,8181) + props2.put("security.ip.filter.list", range) + props2.put("security.ip.filter.rule.type", ruleTypeBad) + intercept[IllegalArgumentException] { + KafkaConfig.fromProps(props2) + } + } + + @Test + def testIpFilterBothSpecified() = { + val rangeEmpty = "" + val ruleType = IpFilter.AllowRule + val range = "192.168.2.1/32" + val ruleType2 = IpFilter.NoRule + + val props1 = TestUtils.createBrokerConfig(0,8181) + props1.put("security.ip.filter.list", range) + props1.put("security.ip.filter.rule.type", IpFilter.NoRule) + + intercept[IllegalArgumentException]{ + KafkaConfig.fromProps(props1) + } + + val props2 = TestUtils.createBrokerConfig(0,8181) + props1.put("security.ip.filter.list", rangeEmpty) + props1.put("security.ip.filter.rule.type", IpFilter.AllowRule) + + intercept[IllegalArgumentException]{ + KafkaConfig.fromProps(props1) + } + + } + } -- 2.3.0 From 99f5da33c004af6245287124fa2b5583bb4ae15e Mon Sep 17 00:00:00 2001 From: jholoman Date: Sun, 15 Mar 2015 01:12:40 -0400 Subject: [PATCH 2/2] KAFKA-1810 ConfigDef Refactor --- core/src/main/scala/kafka/network/IPFilter.scala | 9 +++------ core/src/main/scala/kafka/network/SocketServer.scala | 2 +- core/src/test/scala/unit/kafka/network/IpFilterTest.scala | 14 +++++++------- .../test/scala/unit/kafka/network/SocketServerTest.scala | 6 +++--- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/network/IPFilter.scala b/core/src/main/scala/kafka/network/IPFilter.scala index 3fe76e2..4e15543 100644 --- a/core/src/main/scala/kafka/network/IPFilter.scala +++ b/core/src/main/scala/kafka/network/IPFilter.scala @@ -50,11 +50,11 @@ class IpFilter(ipFilterList: List[String], ipFilterRuleType: String) extends Log case AllowRule => { if (filterList.exists(_.contains(ip))) true - else throw new IPFilterException(inetAddress, AllowRuleDescription) + else throw new IpFilterException(inetAddress, AllowRuleDescription) } case DenyRule => { if (filterList.exists(_.contains(ip))) - throw new IPFilterException(inetAddress, DenyRuleDescription) + throw new IpFilterException(inetAddress, DenyRuleDescription) else true } case _ => true @@ -105,9 +105,6 @@ class CIDRRange(val ipRange: String) { } } -class IPFilterException(val ip: InetAddress, val ruleTypeDescription: String) extends KafkaException("Rejected connection from %s due to IP filter rules (%s)".format(ip, ruleTypeDescription)) +class IpFilterException(val ip: InetAddress, val ruleTypeDescription: String) extends KafkaException("Rejected connection from %s due to IP filter rules (%s)".format(ip, ruleTypeDescription)) -class IPFilterConfigException(message: String) extends KafkaException(message) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 0e44a7e..54f4387 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -288,7 +288,7 @@ private[kafka] class Acceptor(val host: String, case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) close(socketChannel) - case e: IPFilterException => + case e: IpFilterException => warn(e.getMessage.format(e.ip) ) close(socketChannel) } diff --git a/core/src/test/scala/unit/kafka/network/IpFilterTest.scala b/core/src/test/scala/unit/kafka/network/IpFilterTest.scala index 267a26b..d75ba83 100644 --- a/core/src/test/scala/unit/kafka/network/IpFilterTest.scala +++ b/core/src/test/scala/unit/kafka/network/IpFilterTest.scala @@ -18,7 +18,7 @@ package unit.kafka.network import java.math.BigInteger import java.net._ -import kafka.network.{CIDRRange, IPFilterException, IpFilter} +import kafka.network.{CIDRRange, IpFilterException, IpFilter} import org.junit.Assert._ import org.junit.Test import kafka.utils.Utils @@ -105,7 +105,7 @@ class IpFilterTest { ipFilters.check(ip) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } val ip2 = InetAddress.getByName("192.168.2.1") assertTrue(ipFilters.check(ip2)) @@ -122,13 +122,13 @@ class IpFilterTest { ipFilters.check(ip1) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } try { ipFilters.check(ip2) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } assertTrue(ipFilters.check(ip3)) } @@ -144,7 +144,7 @@ class IpFilterTest { ipFilters.check(ip2) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } } @@ -161,14 +161,14 @@ class IpFilterTest { ipFilters.check(ip2) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } assertTrue(ipFilters.check(ip3)) try { ipFilters.check(ip4) fail() } catch { - case e: IPFilterException => // this is good + case e: IpFilterException => // this is good } } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index fb58eb8..a3ab1fd 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -179,7 +179,7 @@ class SocketServerTest extends JUnitSuite { } @Test - def testIPFilterBlackList() = { + def testIpFilterBlackList() = { val a: IpFilter = new IpFilter(List("localhost/32"), IpFilter.DenyRule) val ipFilterServer = getSocketServer(ipFilters = a ) ipFilterServer.startup() @@ -189,7 +189,7 @@ class SocketServerTest extends JUnitSuite { } @Test - def testIPFilterWhiteList() = { + def testIpFilterWhiteList() = { val a: IpFilter = new IpFilter(List("localhost/32"), IpFilter.AllowRule) val ipFilterServer = getSocketServer(ipFilters = a) ipFilterServer.startup() @@ -205,7 +205,7 @@ class SocketServerTest extends JUnitSuite { } @Test - def testIPFilterNone() = { + def testIpFilterNone() = { val conn = connect(server) conn.setSoTimeout(3000) try { -- 2.3.0