From a8846c6b9df5a94ff8088c8dbddd00aa264308bf Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Wed, 20 May 2015 12:04:43 -0700 Subject: [PATCH] KAFKA-2211: Out of box implementation for authorizer interface. --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 268 +++++++++++++++++++++ .../test/resources/authorizer-config.properties | 1 + .../security/auth/SimpleAclAuthorizerTest.scala | 179 ++++++++++++++ 3 files changed, 448 insertions(+) create mode 100644 core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala create mode 100644 core/src/test/resources/authorizer-config.properties create mode 100644 core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala new file mode 100644 index 0000000..75d570e --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.auth + +import java.io.FileInputStream +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.network.RequestChannel.Session +import kafka.server.KafkaConfig +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils._ +import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import org.apache.zookeeper.ZooDefs +import org.apache.zookeeper.data.{ACL, Id} + +class SimpleAclAuthorizer extends Authorizer with Logging { + + private val zkUrlProp: String = "zookeeper.url" + private val allowEveryoneProp: String = "allow.everyone" + + private var superUsers: Set[KafkaPrincipal] = Set.empty + private var zkClient: ZkClient = null + private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1 , threadNamePrefix = "authorizer") + private val aclZkPath: String = "/kafka-acl" + private val aclChangedZkPath: String = "/kafka-acl-changed" + private val aclCache: scala.collection.mutable.HashMap[Resource, Set[Acl]] = new scala.collection.mutable.HashMap[Resource, Set[Acl]] + private val lock = new ReentrantReadWriteLock() + private val authorizerConfig: Properties = new Properties(); + + /** + * Guaranteed to be called before any authorize call is made. + */ + override def initialize(kafkaConfig: KafkaConfig): Unit = { + superUsers = kafkaConfig.superUser match { + case null => Set.empty[KafkaPrincipal] + case (str: String) => if(str != null && !str.isEmpty) str.split(",").map(s => KafkaPrincipal.fromString(s.trim)).toSet else Set.empty + } + + if(kafkaConfig.authorizerConfigPath != null && !kafkaConfig.authorizerConfigPath.isEmpty) { + authorizerConfig.load(new FileInputStream(kafkaConfig.authorizerConfigPath.trim)) + } + + val zkUrl: String = if (authorizerConfig.containsKey(zkUrlProp)) + authorizerConfig.getProperty(zkUrlProp) + else + kafkaConfig.zkConnect + zkClient = new ZkClient(zkUrl, kafkaConfig.zkConnectionTimeoutMs, kafkaConfig.zkConnectionTimeoutMs, ZKStringSerializer) + zkClient.subscribeDataChanges(aclChangedZkPath, ZkListener) + + if(!ZkUtils.pathExists(zkClient, aclZkPath)) { + ZkUtils.createPersistentPath(zkClient, aclZkPath) + } + + //we still invalidate the cache every hour in case we missed any watch notifications due to re-connections. + scheduler.startup() + scheduler.schedule("sync-acls", syncAcls, delay = 0l, period = 1l, unit = TimeUnit.HOURS) + } + + override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { + if (session == null || session.principal == null || session.host == null) { + debug("session, session.principal and session.host can not be null, programming error so failing open.") + return true + } + + val principal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, session.principal.getName) + val remoteAddress: String = session.host + + trace("principal = %s , session = %s resource = %s operation = %s".format(principal, session, resource, operation)) + + if (superUsers.contains(principal)) { + debug("principal = %s is a super user, allowing operation without checking acls.".format(principal)) + return true + } + + if (resource == null || resource.resourceType == null || resource.name == null || resource.name.isEmpty) { + debug("resource is null or empty for operation = %s , session = %s. programming error so failing open.".format(operation, session)) + return true + } + + val acls: Set[Acl] = getAcls(resource) + if (acls == null || acls.isEmpty) { + val authorized: Boolean = if(authorizerConfig.containsKey(allowEveryoneProp)) + authorizerConfig.getProperty(allowEveryoneProp).toBoolean + else + false + + debug("No acl found for resource %s , authorized = %s".format(resource, authorized)) + return authorized + } + + /** + * if you are allowed to read or write we allow describe by default + */ + val ops: Set[Operation] = if(Operation.DESCRIBE.equals(operation)) { + Set[Operation] (operation, Operation.READ , Operation.WRITE) + } else { + Set[Operation](operation) + } + + //first check if there is any Deny acl match that would disallow this operation. + if(aclMatch(session, Set(operation), resource, principal, remoteAddress, PermissionType.DENY, acls)) + return false + + + //now check if there is any allow acl that will allow this operation. + if(aclMatch(session, ops, resource, principal, remoteAddress, PermissionType.ALLOW, acls)) + return true + + //We have some acls defined and they do not specify any allow ACL for the current session, reject request. + debug("session = %s is not allowed to perform operation = %s on resource = %s".format(session, operation, resource)) + false + } + + private def aclMatch(session: Session, operation: Set[Operation], resource: Resource, principal: KafkaPrincipal, remoteAddress: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = { + for (acl: Acl <- acls) { + if (acl.permissionType.equals(permissionType) + && (acl.principals.contains(principal) || acl.principals.contains(Acl.wildCardPrincipal)) + && (acl.operations.intersect(operation).nonEmpty || acl.operations.contains(Operation.ALL)) + && (acl.hosts.contains(remoteAddress) || acl.hosts.contains(Acl.wildCardHost))) { + debug("operation = %s on resource = %s to session = %s is %s based on acl = %s".format(operation, resource, session, permissionType.name(), acl)) + return true + } + } + false + } + + def syncAcls(): Unit = { + debug("Syncing the acl cache for all ") + var resources: collection.Set[Resource] = Set.empty + inReadLock(lock) { + resources = aclCache.keySet + } + + for (resource: Resource <- resources) { + val resourceAcls: Set[Acl] = getAcls(resource) + inWriteLock(lock) { + aclCache.put(resource, resourceAcls) + } + } + } + + override def addAcls(acls: Set[Acl], resource: Resource): Unit = { + if(acls == null || acls.isEmpty) { + return + } + + val updatedAcls: Set[Acl] = getAcls(resource) ++ acls + val path: String = toResourcePath(resource) + + if(ZkUtils.pathExists(zkClient, path)) { + ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) + } else { + ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) + } + + inWriteLock(lock) { + aclCache.put(resource, updatedAcls) + } + + updateAclChangedFlag(resource) + } + + override def removeAcls(acls: Set[Acl], resource: Resource): Boolean = { + if(!ZkUtils.pathExists(zkClient, toResourcePath(resource))) { + return false + } + + val existingAcls: Set[Acl] = getAcls(resource) + val filteredAcls: Set[Acl] = existingAcls.filter((acl: Acl) => !acls.contains(acl)) + if(existingAcls.equals(filteredAcls)) { + return false + } + + val path: String = toResourcePath(resource) + ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls))) + + inWriteLock(lock) { + aclCache.put(resource, filteredAcls) + } + + updateAclChangedFlag(resource) + + true + } + + override def removeAcls(resource: Resource): Boolean = { + if(ZkUtils.pathExists(zkClient, toResourcePath(resource))) { + ZkUtils.deletePath(zkClient, toResourcePath(resource)) + inWriteLock(lock) { + aclCache.remove(resource) + } + updateAclChangedFlag(resource) + return true + } + false + } + + override def getAcls(resource: Resource): Set[Acl] = { + inReadLock(lock) { + if(aclCache.contains(resource)) { + return aclCache.get(resource).get + } + } + + val aclJson: Option[String] = ZkUtils.readDataMaybeNull(zkClient, toResourcePath(resource))._1 + val acls: Set[Acl] = aclJson map ((x: String) => Acl.fromJson(x)) getOrElse Set.empty + + inWriteLock(lock) { + aclCache.put(resource, acls) + } + acls + } + + override def getAcls(principal: KafkaPrincipal): Set[Acl] = { + val resources: collection.Set[Resource] = aclCache.keySet + val acls: scala.collection.mutable.HashSet[Acl] = new scala.collection.mutable.HashSet[Acl] + for(resource: Resource <- resources) { + val resourceAcls: Set[Acl] = getAcls(resource) + acls ++= resourceAcls.filter((acl: Acl) => acl.principals.contains(principal)) + } + acls.toSet + } + + def toResourcePath(resource: Resource) : String = { + aclZkPath + "/" + resource.resourceType + "/" + resource.name + } + + def updateAclChangedFlag(resource: Resource): Unit = { + if(ZkUtils.pathExists(zkClient, aclChangedZkPath)) { + ZkUtils.updatePersistentPath(zkClient, aclChangedZkPath, resource.toString) + } else { + ZkUtils.createPersistentPath(zkClient, aclChangedZkPath, resource.toString) + } + } + + object ZkListener extends IZkDataListener { + override def handleDataChange(dataPath: String, data: Object): Unit = { + val resource: Resource = Resource.fromString(data.toString) + //invalidate the cache entry + inWriteLock(lock) { + aclCache.remove(resource) + } + + //repopulate the cache which is a side effect of calling getAcls with a resource that is not part of cache. + getAcls(resource) + } + + override def handleDataDeleted(dataPath: String): Unit = { + //no op. + } + } +} diff --git a/core/src/test/resources/authorizer-config.properties b/core/src/test/resources/authorizer-config.properties new file mode 100644 index 0000000..d5764e4 --- /dev/null +++ b/core/src/test/resources/authorizer-config.properties @@ -0,0 +1 @@ +allow.everyone=true \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala new file mode 100644 index 0000000..fd9800a --- /dev/null +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.security.auth + +import java.security.Principal +import java.util.UUID + +import com.sun.security.auth.UserPrincipal +import kafka.network.RequestChannel.Session +import kafka.security.auth.{KafkaPrincipal, _} +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.junit.Assert._ +import org.scalatest.junit.JUnit3Suite + + +class SimpleAclAuthorizerTest extends JUnit3Suite with ZooKeeperTestHarness { + + val simpleAclAuthorizer: SimpleAclAuthorizer = new SimpleAclAuthorizer + val testPrincipal: Principal = Acl.wildCardPrincipal + val testHostName: String = "test.host.com" + var session: Session = new Session(testPrincipal, testHostName) + var resource: Resource = null + val superUsers: String = "user:superuser1, user:superuser2" + val username:String = "alice" + + override def setUp() { + super.setUp() + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(KafkaConfig.SuperUserProp, superUsers) + + val cfg = KafkaConfig.fromProps(props) + simpleAclAuthorizer.initialize(cfg) + resource = new Resource(ResourceType.TOPIC, UUID.randomUUID().toString) + } + + def testTopicAcl(): Unit = { + val user1: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, username) + val user2: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, "bob") + val user3: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, "batman") + val host1: String = "host1" + val host2: String = "host2" + + //user1 has READ access from host1 and host2. + val acl1: Acl = new Acl(Set(user1), PermissionType.ALLOW, Set[String](host1, host2), Set[Operation](Operation.READ)) + + //user1 does not have READ access from host1. + val acl2: Acl = new Acl(Set(user1), PermissionType.DENY, Set[String](host1), Set[Operation](Operation.READ)) + + //user1 has Write access from host1 only. + val acl3: Acl = new Acl(Set(user1), PermissionType.ALLOW, Set[String](host1), Set[Operation](Operation.WRITE)) + + //user1 has DESCRIBE access from all hosts. + val acl4: Acl = new Acl(Set(user1), PermissionType.ALLOW, Set[String](Acl.wildCardHost), Set[Operation](Operation.DESCRIBE)) + + //user2 has READ access from all hosts. + val acl5: Acl = new Acl(Set(user2), PermissionType.ALLOW, Set[String](Acl.wildCardHost), Set[Operation](Operation.READ)) + + //user3 has WRITE access from all hosts. + val acl6: Acl = new Acl(Set(user3), PermissionType.ALLOW, Set[String](Acl.wildCardHost), Set[Operation](Operation.WRITE)) + + simpleAclAuthorizer.addAcls(Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6), resource) + + val host1Session: Session = new Session(user1, host1) + val host2Session: Session = new Session(user1, host2) + + assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session, Operation.READ, resource)) + assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session, Operation.READ, resource)) + assertTrue("User1 should have WRITE access from host1", simpleAclAuthorizer.authorize(host1Session, Operation.WRITE, resource)) + assertFalse("User1 should not have WRITE access from host2 as no allow acl is defined", simpleAclAuthorizer.authorize(host2Session, Operation.WRITE, resource)) + assertTrue("User1 should not have DESCRIBE access from host1", simpleAclAuthorizer.authorize(host1Session, Operation.DESCRIBE, resource)) + assertTrue("User1 should have DESCRIBE access from host2", simpleAclAuthorizer.authorize(host2Session, Operation.DESCRIBE, resource)) + assertFalse("User1 should not have edit access from host1", simpleAclAuthorizer.authorize(host1Session, Operation.ALTER, resource)) + assertFalse("User1 should not have edit access from host2", simpleAclAuthorizer.authorize(host2Session, Operation.ALTER, resource)) + + //test if user has READ and write access they also get describe access + + val user2Session: Session = new Session(user2, host1) + val user3Session: Session = new Session(user3, host1) + assertTrue("User2 should have DESCRIBE access from host1", simpleAclAuthorizer.authorize(user2Session, Operation.DESCRIBE, resource)) + assertTrue("User3 should have DESCRIBE access from host2", simpleAclAuthorizer.authorize(user3Session, Operation.DESCRIBE, resource)) + assertTrue("User2 should have READ access from host1", simpleAclAuthorizer.authorize(user2Session, Operation.READ, resource)) + assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session, Operation.WRITE, resource)) + } + + def testDenyTakesPrecedence(): Unit = { + val user: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, username) + val host: String = "random-host" + val session: Session = new Session(user, host) + + val allowAll: Acl = Acl.allowAllAcl + val denyAcl: Acl = new Acl(Set(user), PermissionType.DENY, Set[String](host), Set[Operation](Operation.ALL)) + simpleAclAuthorizer.addAcls(Set[Acl](allowAll, denyAcl), resource) + + assertFalse("deny should take precedence over allow.", simpleAclAuthorizer.authorize(session, Operation.READ, resource)) + } + + def testAllowAllAccess(): Unit = { + val allowAllAcl: Acl = Acl.allowAllAcl + simpleAclAuthorizer.addAcls(Set[Acl](allowAllAcl), resource) + + val session: Session = new Session(new UserPrincipal("random"), "random.host") + assertTrue("allow all acl should allow access to all.", simpleAclAuthorizer.authorize(session, Operation.READ, resource)) + } + + def testSuperUserHasAccess(): Unit = { + val denyAllAcl: Acl = new Acl(Set(Acl.wildCardPrincipal), PermissionType.DENY, Set[String](Acl.wildCardHost), Set[Operation](Operation.ALL)) + simpleAclAuthorizer.addAcls(Set[Acl](denyAllAcl), resource) + + val session1: Session = new Session(new UserPrincipal("superuser1"), "random.host") + val session2: Session = new Session(new UserPrincipal("superuser2"), "random.host") + + assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session1, Operation.READ, resource)) + assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Operation.READ, resource)) + } + + + def testNoAclFound(): Unit = { + assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Operation.READ, resource)) + } + + def testNoAclFoundOverride(): Unit = { + val props = TestUtils.createBrokerConfig(1, zkConnect) + props.put(KafkaConfig.AuthorizerConfigPathProp, Thread.currentThread().getContextClassLoader.getResource("authorizer-config.properties").getPath) + + val cfg = KafkaConfig.fromProps(props) + val testAuthoizer: SimpleAclAuthorizer = new SimpleAclAuthorizer + testAuthoizer.initialize(cfg) + assertTrue("when acls = null or [], authorizer should fail open with allow.everyone = true.", testAuthoizer.authorize(session, Operation.READ, resource)) + } + + def testFailOpenOnProgrammingErrors(): Unit = { + assertTrue("null session should fail open.", simpleAclAuthorizer.authorize(null, Operation.READ, resource)) + assertTrue("null principal should fail open.", simpleAclAuthorizer.authorize(new Session(null, testHostName), Operation.READ, resource)) + assertTrue("null host should fail open.", simpleAclAuthorizer.authorize(new Session(testPrincipal, null), Operation.READ, resource)) + } + + def testAclManagementAPIs(): Unit = { + val user1: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, username) + val user2: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.userType, "bob") + val host1: String = "host1" + val host2: String = "host2" + + val acl1: Acl = new Acl(Set(user1, user2), PermissionType.ALLOW, Set[String](host1, host2), Set[Operation](Operation.READ, Operation.WRITE)) + simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource) + assertEquals(Set(acl1), simpleAclAuthorizer.getAcls(resource)) + + //test addAcl is additive + val acl2: Acl = new Acl(Set(user2), PermissionType.ALLOW, Set[String](Acl.wildCardHost), Set[Operation](Operation.READ)) + simpleAclAuthorizer.addAcls(Set[Acl](acl2), resource) + assertEquals(Set(acl1,acl2), simpleAclAuthorizer.getAcls(resource)) + + //Following assertions fails transiently due to consistency issues. + //test remove a single acl from existing acls. + val acl3: Acl = new Acl(Set(user2), PermissionType.ALLOW, Set[String](Acl.wildCardHost), Set[Operation](Operation.READ)) + simpleAclAuthorizer.removeAcls(Set(acl3), resource) + assertEquals(Set(acl1), simpleAclAuthorizer.getAcls(resource)) + + //test remove all acls for resource + simpleAclAuthorizer.removeAcls( resource) + assertTrue(simpleAclAuthorizer.getAcls(resource).isEmpty) + } +} -- 2.1.3.36.g8e36a6d