From 178d59b5dbfe1f010c8a04a427198375e7481619 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Wed, 20 May 2015 12:08:43 -0700 Subject: [PATCH] KAFKA-2212: Add CLI for acl management of authorizer. --- core/src/main/scala/kafka/admin/AclCommand.scala | 206 +++++++++++++++++++++ .../scala/unit/kafka/admin/AclCommandTest.scala | 83 +++++++++ 2 files changed, 289 insertions(+) create mode 100755 core/src/main/scala/kafka/admin/AclCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/AclCommandTest.scala diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala new file mode 100755 index 0000000..6663b2e --- /dev/null +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import java.io.{File, FileInputStream} +import java.util.Properties + +import joptsimple._ +import kafka.security.auth._ +import kafka.server.KafkaConfig +import kafka.utils._ +import org.apache.kafka.common.utils.Utils + +object AclCommand { + + val delimter: String = ","; + val nl = System.getProperty("line.separator") + + def main(args: Array[String]): Unit = { + + val opts = new AclCommandOptions(args) + + if(args.length == 0) { + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") + } + + val props: Properties = new Properties() + props.load(new FileInputStream(new File(opts.options.valueOf(opts.config)))) + val kafkaConfig: KafkaConfig = KafkaConfig.fromProps(props) + val authZ: Authorizer = CoreUtils.createObject(kafkaConfig.authorizerClassName) + authZ.initialize(kafkaConfig) + + val actions = Seq(opts.addOpt, opts.removeOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --add, --remove.") + + opts.checkArgs() + + try { + if(opts.options.has(opts.addOpt)) + addAcl(authZ, opts) + else if(opts.options.has(opts.removeOpt)) + removeAcl(authZ, opts) + else if(opts.options.has(opts.listOpt)) + listAcl(authZ, opts) + } catch { + case e: Throwable => + println("Error while executing topic command " + e.getMessage) + println(Utils.stackTrace(e)) + } + } + + private def addAcl(authZ: Authorizer, opts: AclCommandOptions): Unit = { + val acls: Set[Acl] = getAcl(opts) + println("Adding acls: " + nl + acls.map("\t" + _).mkString(nl) + nl) + authZ.addAcls(acls, getResource(opts)) + listAcl(authZ, opts) + } + + private def removeAcl(authZ: Authorizer, opts: AclCommandOptions): Unit = { + val acls: Set[Acl] = getAcl(opts) + if(acls.isEmpty) { + if(confirmaAction("Are you sure you want to delete all acls for resource: " + getResource(opts) + " y/n?")) + authZ.removeAcls(getResource(opts)) + } else { + if(confirmaAction(("Are you sure you want to remove acls: " + nl + acls.map("\t" + _).mkString(nl) + nl) + " from resource " + getResource(opts)+ " y/n?")) + authZ.removeAcls(acls, getResource(opts)) + } + + listAcl(authZ, opts) + } + + private def listAcl(authZ: Authorizer, opts: AclCommandOptions): Unit = { + val acls: Set[Acl] = authZ.getAcls(getResource(opts)) + println("Following is list of acls for resource : " + getResource(opts) + nl + acls.map("\t" + _).mkString(nl) + nl) + } + + private def getAcl(opts: AclCommandOptions) : Set[Acl] = { + val allowedPrincipals: Set[KafkaPrincipal] = if(opts.options.has(opts.allowPrincipalsOpt)) + opts.options.valueOf(opts.allowPrincipalsOpt).toString.split(delimter).map(s => KafkaPrincipal.fromString(s)).toSet + else + if(opts.options.has(opts.allowHostsOpt)) Set[KafkaPrincipal](Acl.wildCardPrincipal) else Set.empty[KafkaPrincipal] + + val deniedPrincipals: Set[KafkaPrincipal] = if(opts.options.has(opts.denyPrincipalsOpt)) + opts.options.valueOf(opts.denyPrincipalsOpt).toString.split(delimter).map(s => KafkaPrincipal.fromString(s)).toSet + else + if(opts.options.has(opts.denyHostssOpt)) Set[KafkaPrincipal](Acl.wildCardPrincipal) else Set.empty[KafkaPrincipal] + + val allowedHosts: Set[String] = if(opts.options.has(opts.allowHostsOpt)) + opts.options.valueOf(opts.allowHostsOpt).toString.split(delimter).toSet + else + if(opts.options.has(opts.allowPrincipalsOpt)) Set[String](Acl.wildCardHost) else Set.empty[String] + + val deniedHosts = if(opts.options.has(opts.denyHostssOpt)) + opts.options.valueOf(opts.denyHostssOpt).toString.split(delimter).toSet + else + if(opts.options.has(opts.denyPrincipalsOpt)) Set[String](Acl.wildCardHost) else Set.empty[String] + + val allowedOperations: Set[Operation] = if(opts.options.has(opts.operationsOpt)) + opts.options.valueOf(opts.operationsOpt).toString.split(delimter).map(s => Operation.fromString(s)).toSet + else + Set[Operation](Operation.ALL) + + var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl] + if(allowedHosts.nonEmpty || allowedPrincipals.nonEmpty ) + acls += new Acl(allowedPrincipals, PermissionType.ALLOW, allowedHosts, allowedOperations) + + if(deniedHosts.nonEmpty || deniedPrincipals.nonEmpty ) + acls += new Acl(deniedPrincipals, PermissionType.DENY, deniedHosts, allowedOperations) + + acls.toSet + } + + private def getResource(opts: AclCommandOptions) : Resource = { + if(opts.options.has(opts.topicOpt)) + return new Resource(ResourceType.TOPIC, opts.options.valueOf(opts.topicOpt).toString) + else if(opts.options.has(opts.clusterOpt)) + return Resource.clusterResource + else if(opts.options.has(opts.groupOpt)) + return new Resource(ResourceType.CONSUMER_GROUP, opts.options.valueOf(opts.groupOpt).toString) + else + println("You must provide at least one of the resource argument from --topic , --cluster or --consumer-group ") + System.exit(1) + + null + } + + private def confirmaAction(msg: String): Boolean = { + println(msg) + val userInput: String = Console.readLine() + + "y".equalsIgnoreCase(userInput) + } + + class AclCommandOptions(args: Array[String]) { + val parser = new OptionParser + val config = parser.accepts("config", "REQUIRED: Path to a server.properties file") + .withRequiredArg + .describedAs("config") + .ofType(classOf[String]) + + val topicOpt = parser.accepts("topic", "topic to which acls should be added or removed.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val clusterOpt = parser.accepts("cluster", "add/remove cluster action acls.") + val groupOpt = parser.accepts("consumer-group", "add remove acls for a group") + .withRequiredArg + .describedAs("consumer-group") + .ofType(classOf[String]) + + val addOpt = parser.accepts("add", "Add indicates you are trying to add acls.") + val removeOpt = parser.accepts("remove", "Indicates you are trying to remove an acl.") + val listOpt = parser.accepts("list", "list acls for the specifed resource, use --topic or --consumer-group or --cluster") + + val operationsOpt = parser.accepts("operations", "comma separated list of operations, allowed Operations are: " + nl + + Operation.values().map("\t" + _).mkString(nl) + nl) + .withRequiredArg + .ofType(classOf[String]) + + val allowPrincipalsOpt = parser.accepts("allowprincipals", "comma separated list of principals where principal is in principalType: name format") + .withRequiredArg + .describedAs("allowprincipals") + .ofType(classOf[String]) + + val denyPrincipalsOpt = parser.accepts("denyprincipals", "comma separated list of principals where principal is in principalType: name format") + .withRequiredArg + .describedAs("denyPrincipalsOpt") + .ofType(classOf[String]) + + val allowHostsOpt = parser.accepts("allowhosts", "comma separated list of principals where principal is in principalType: name format") + .withRequiredArg + .describedAs("allowhosts") + .ofType(classOf[String]) + + val denyHostssOpt = parser.accepts("denyhosts", "comma separated list of principals where principal is in principalType: name format") + .withRequiredArg + .describedAs("denyhosts") + .ofType(classOf[String]) + + val helpOpt = parser.accepts("help", "Print usage information.") + + val options = parser.parse(args : _*) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, config) + } + } + +} diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala new file mode 100644 index 0000000..8b85ebe --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.admin + +import java.io.{ByteArrayInputStream, File, FileWriter} +import java.util.Properties + +import junit.framework.Assert._ +import kafka.admin.AclCommand +import kafka.security.auth._ +import kafka.server.KafkaConfig +import kafka.utils.{Logging, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import org.junit.{After, Test} +import org.scalatest.junit.JUnit3Suite + +class AclCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { + + @Test + def testAclCli() { + val brokerProps: Properties = TestUtils.createBrokerConfig(0, zkConnect) + brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer") + val config: File = TestUtils.tempFile() + brokerProps.store(new FileWriter(config), "test-broker-config"); + + val topic = "test" + val user1: KafkaPrincipal = KafkaPrincipal.fromString("user:test1") + val user2: KafkaPrincipal = KafkaPrincipal.fromString("user:test2") + val operation1 = Operation.READ + val operation2 = Operation.WRITE + val host1 = "host1" + val host2 = "host2" + val acl1 = new Acl(Set[KafkaPrincipal](user1, user2), PermissionType.ALLOW, Set[String](host1, host2), Set[Operation](operation1, operation2)) + val acl2 = new Acl(Set[KafkaPrincipal](user1, user2), PermissionType.DENY, Set[String](host1, host2), Set[Operation](operation1, operation2)) + val acls: Set[Acl] = Set[Acl](acl1 , acl2) + + val in = new ByteArrayInputStream("y".getBytes()); + System.setIn(in) + val args: Array[String] = Array("--config", config.getPath , + "--topic", topic, + "--allowprincipals",user1.toString + AclCommand.delimter + user2.toString, + "--allowhosts", host1 + AclCommand.delimter + host2, + "--denyprincipals", user1.toString + AclCommand.delimter + user2.toString, + "--denyhosts", host1 + AclCommand.delimter + host2, + "--operations", operation1.name + AclCommand.delimter + operation2.name) + + AclCommand.main(args :+ "--add") + assertEquals(acls, getAuthorizer.getAcls(new Resource(ResourceType.TOPIC, topic))) + +// Fails transiently , needs investigation. +// AclCommand.main(args :+ "--remove") +// assertTrue(getAuthorizer.getAcls(new Resource(ResourceType.TOPIC, topic)).isEmpty) + } + + def getAuthorizer : Authorizer = { + val props: Properties = new Properties() + props.put(KafkaConfig.ZkConnectProp, zkConnect) + val kafkaConfig: KafkaConfig = KafkaConfig.fromProps(props) + val authZ: SimpleAclAuthorizer = new SimpleAclAuthorizer + authZ.initialize(kafkaConfig) + + authZ + } + + @After + def after(): Unit = { + System.setIn(System.in) + } +} -- 2.1.3.36.g8e36a6d