From 20106eff55ecc2f370e4f3e64f5985913cab5f5a Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Wed, 20 May 2015 12:03:15 -0700 Subject: [PATCH 1/3] KAFKA-2210: Kafka authorizer public entities and changes to KafkaAPI and KafkaServer to allow custom authorizer implementation. --- .../kafka/common/AuthorizationException.scala | 25 ++++ .../src/main/scala/kafka/common/ErrorMapping.scala | 8 +- core/src/main/scala/kafka/security/auth/Acl.scala | 117 ++++++++++++++++ .../scala/kafka/security/auth/Authorizer.scala | 83 +++++++++++ .../scala/kafka/security/auth/KafkaPrincipal.scala | 66 +++++++++ .../main/scala/kafka/security/auth/Operation.java | 45 ++++++ .../scala/kafka/security/auth/PermissionType.java | 22 +++ .../main/scala/kafka/security/auth/Resource.scala | 60 ++++++++ .../scala/kafka/security/auth/ResourceType.java | 40 ++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 156 +++++++++++++++++---- core/src/main/scala/kafka/server/KafkaConfig.scala | 43 +++++- core/src/main/scala/kafka/server/KafkaServer.scala | 44 +++--- core/src/test/resources/acl.json | 39 ++++++ .../scala/unit/kafka/security/auth/AclTest.scala | 70 +++++++++ .../kafka/security/auth/KafkaPrincipalTest.scala | 43 ++++++ .../unit/kafka/security/auth/ResourceTest.scala | 43 ++++++ .../kafka/server/KafkaConfigConfigDefTest.scala | 5 + 17 files changed, 857 insertions(+), 52 deletions(-) create mode 100644 core/src/main/scala/kafka/common/AuthorizationException.scala create mode 100644 core/src/main/scala/kafka/security/auth/Acl.scala create mode 100644 core/src/main/scala/kafka/security/auth/Authorizer.scala create mode 100644 core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala create mode 100644 core/src/main/scala/kafka/security/auth/Operation.java create mode 100644 core/src/main/scala/kafka/security/auth/PermissionType.java create mode 100644 core/src/main/scala/kafka/security/auth/Resource.scala create mode 100644 core/src/main/scala/kafka/security/auth/ResourceType.java create mode 100644 core/src/test/resources/acl.json create mode 100644 core/src/test/scala/unit/kafka/security/auth/AclTest.scala create mode 100644 core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala create mode 100644 core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala new file mode 100644 index 0000000..12ee0fe --- /dev/null +++ b/core/src/main/scala/kafka/common/AuthorizationException.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.common + +/** + * Exception thrown when a principal is not authorized to perform an operation. 
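+ * The matching wire error code is ErrorMapping.AuthorizationCode (24), added in this patch.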
+ * @param message + */ +class AuthorizationException(message: String) extends RuntimeException(message) { + def this() = this(null) +} diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index c75c685..5ad3ae5 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -17,8 +17,10 @@ package kafka.common -import kafka.message.InvalidMessageException import java.nio.ByteBuffer + +import kafka.message.InvalidMessageException + import scala.Predef._ /** @@ -51,6 +53,7 @@ object ErrorMapping { val NotEnoughReplicasAfterAppendCode: Short = 20 // 21: InvalidRequiredAcks // 22: IllegalConsumerGeneration + val AuthorizationCode: Short = 24; private val exceptionToCode = Map[Class[Throwable], Short]( @@ -72,7 +75,8 @@ object ErrorMapping { classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode, classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode, classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode, - classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode + classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode, + classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode ).withDefaultValue(UnknownCode) /* invert the mapping */ diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala new file mode 100644 index 0000000..c062509 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.auth + +import kafka.utils.Json + +object Acl { + val wildCardPrincipal: KafkaPrincipal = new KafkaPrincipal("user", "*") + val wildCardHost: String = "*" + val allowAllAcl = new Acl(Set[KafkaPrincipal](wildCardPrincipal), PermissionType.ALLOW, Set[String](wildCardHost), Set[Operation](Operation.ALL)) + val PRINCIPAL_KEY = "principal" + val PERMISSION_TYPE_KEY = "permissionType" + val OPERATIONS_KEY = "operations" + val HOSTS_KEY = "hosts" + val VERSION_KEY = "version" + val CURRENT_VERSION = 1 + val ACLS_KEY = "acls" + + def fromJson(aclJson: String): Set[Acl] = { + if(aclJson == null || aclJson.isEmpty) { + return collection.immutable.Set.empty[Acl] + } + var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]() + Json.parseFull(aclJson) match { + case Some(m) => + val aclMap = m.asInstanceOf[Map[String, Any]] + //the acl json version. 
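+ //only CURRENT_VERSION is readable; any other version fails the require below with an IllegalArgumentException.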
+ require(aclMap.get(VERSION_KEY).get == CURRENT_VERSION) + val aclSet: List[Map[String, Any]] = aclMap.get(ACLS_KEY).get.asInstanceOf[List[Map[String, Any]]] + aclSet.foreach(item => { + val principals: List[KafkaPrincipal] = item(PRINCIPAL_KEY).asInstanceOf[List[String]].map(principal => KafkaPrincipal.fromString(principal)) + val permissionType: PermissionType = PermissionType.valueOf(item(PERMISSION_TYPE_KEY).asInstanceOf[String]) + val operations: List[Operation] = item(OPERATIONS_KEY).asInstanceOf[List[String]].map(operation => Operation.fromString(operation)) + val hosts: List[String] = item(HOSTS_KEY).asInstanceOf[List[String]] + acls += new Acl(principals.toSet, permissionType, hosts.toSet, operations.toSet) + }) + case None => + } + return acls.toSet + } + + def toJsonCompatibleMap(acls: Set[Acl]): Map[String,Any] = { + acls match { + case aclSet: Set[Acl] => Map(Acl.VERSION_KEY -> Acl.CURRENT_VERSION, Acl.ACLS_KEY -> aclSet.map(acl => acl.toMap).toList) + case _ => null + } + } +} + +/** + * An instance of this class will represent an acl that can express following statement. + *
+ * Principal P has permissionType PT on operations O1 and O2 from hosts H1 and H2.
+ *
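+ * For example, Acl.allowAllAcl renders as: user:* has ALLOW permission for operations: ALL from hosts: *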
+ * @param principals A value of *:* indicates all users. + * @param permissionType + * @param hosts A value of * indicates all hosts. + * @param operations A value of ALL indicates all operations. + */ +class Acl(val principals: Set[KafkaPrincipal],val permissionType: PermissionType,val hosts: Set[String],val operations: Set[Operation]) { + + /** + * TODO: Ideally we would have a symmetric toJson method but our current json library fails to decode double parsed json strings so + * convert to map which then gets converted to json. + * Convert an acl instance to a map + * @return Map representation of the Acl. + */ + def toMap() : Map[String, Any] = { + val map: collection.mutable.HashMap[String, Any] = new collection.mutable.HashMap[String, Any]() + map.put(Acl.PRINCIPAL_KEY, principals.map(principal => principal.toString)) + map.put(Acl.PERMISSION_TYPE_KEY, permissionType.name()) + map.put(Acl.OPERATIONS_KEY, operations.map(operation => operation.name())) + map.put(Acl.HOSTS_KEY, hosts) + + map.toMap + } + + override def equals(that: Any): Boolean = { + if(!(that.isInstanceOf[Acl])) + return false + val other = that.asInstanceOf[Acl] + if(permissionType.equals(other.permissionType) && operations.equals(other.operations) && principals.equals(other.principals) + && hosts.map(host => host.toLowerCase()).equals(other.hosts.map(host=> host.toLowerCase()))) { + return true + } + false + } + + + override def hashCode(): Int = { + 31 + + principals.foldLeft(0)((r: Int, c: KafkaPrincipal) => r + c.hashCode()) + + operations.foldLeft(0)((r: Int, c: Operation) => r + c.hashCode()) + + hosts.foldLeft(0)((r: Int, c: String) => r + c.toLowerCase().hashCode()) + } + + override def toString() : String = { + return "%s has %s permission for operations: %s from hosts: %s".format(principals.mkString(","), permissionType.name(), operations.mkString(","), hosts.mkString(",")) + } + +} + diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala new file mode 100644 index 0000000..72ab803 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.auth + +import kafka.network.RequestChannel.Session +import kafka.server.KafkaConfig + +/** + * Top level interface that all plugable authorizer must implement. Kafka server will read "authorizer.class" config + * value at startup time, create an instance of the specified class and call initialize method. + * authorizer.class must be a class that implements this interface. + * If authorizer.class has no value specified no authorization will be performed. 
+ * + * From that point onwards, every client request will first be routed to authorize method and the request will only be + * authorized if the method returns true. + */ +trait Authorizer { + /** + * Guaranteed to be called before any authorize call is made. + */ + def initialize(kafkaConfig: KafkaConfig): Unit + + /** + * @param session The session being authenticated. + * @param operation Type of operation client is trying to perform on resource. + * @param resource Resource the client is trying to access. + * @return + */ + def authorize(session: Session, operation: Operation, resource: Resource): Boolean + + /** + * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new + * acls will be added to existing acls. + * @param acls set of acls to add to existing acls + * @param resource the resource to which these acls should be attached. + */ + def addAcls(acls: Set[Acl], resource: Resource): Unit + + /** + * remove these acls from the resource. + * @param acls set of acls to be removed. + * @param resource resource from which the acls should be removed. + * @return true if some acl got removed, false if no acl was removed. + */ + def removeAcls(acls: Set[Acl], resource: Resource): Boolean + + /** + * remove a resource along with all of its acls from acl store. + * @param resource + * @return + */ + def removeAcls(resource: Resource): Boolean + + /** + * get set of acls for this resource + * @param resource + * @return empty set if no acls are found, otherwise the acls for the resource. + */ + def getAcls(resource: Resource): Set[Acl] + + /** + * get the acls for this principal. + * @param principal + * @return empty set if no acls exist for this principal, otherwise the acls for the principal. + */ + def getAcls(principal: KafkaPrincipal): Set[Acl] +} + diff --git a/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala b/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala new file mode 100644 index 0000000..21f7d44 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.auth + +import java.security.Principal + +object KafkaPrincipal { + val seperator: String = ":" + val userType: String = "User" + + def fromString(str: String) : KafkaPrincipal = { + val arr: Array[String] = str.split(seperator) + + if(arr.length != 2) { + throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str) + } + + new KafkaPrincipal(arr(0), arr(1)) + } +} + +/** + * + * @param principalType type of principal user,unixgroup, ldapgroup. 
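+ *                      only the "User" type (KafkaPrincipal.userType) is defined in this patch.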
+ * @param name name of the principal + */ +class KafkaPrincipal(val principalType: String,val name: String) extends Principal { + + override def getName: String = { + name + } + + override def toString: String = { + principalType + KafkaPrincipal.seperator + name + } + + override def equals(that: Any): Boolean = { + if(!(that.isInstanceOf[KafkaPrincipal])) + return false + val other: KafkaPrincipal = that.asInstanceOf[KafkaPrincipal] + if(principalType.equalsIgnoreCase(other.principalType) && name.equalsIgnoreCase(other.name)) + return true + false + } + + override def hashCode(): Int = { + 31 + principalType.toLowerCase.hashCode + name.toLowerCase.hashCode + } +} + + + diff --git a/core/src/main/scala/kafka/security/auth/Operation.java b/core/src/main/scala/kafka/security/auth/Operation.java new file mode 100644 index 0000000..517c5e0 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/Operation.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.auth; + +/** + * Different operations a client may perform on kafka resources. + */ +public enum Operation { + READ, + WRITE, + CREATE, + DELETE, + ALTER, + DESCRIBE, + CLUSTER_ACTION, + ALL; + + /** + * method defined for case insensitive check. the default valueOf() method is case sensitive + */ + public static Operation fromString(String operationName) { + if(operationName != null) { + for(Operation operation: Operation.values()) { + if(operationName.equalsIgnoreCase(operation.name())) { + return operation; + } + } + } + throw new IllegalArgumentException("no matching enum value found for " + operationName); + } +} diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.java b/core/src/main/scala/kafka/security/auth/PermissionType.java new file mode 100644 index 0000000..b844d41 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/PermissionType.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.security.auth; + +public enum PermissionType { + ALLOW, + DENY +} diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala new file mode 100644 index 0000000..37fd88b --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/Resource.scala @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.auth + +object Resource { + val separator: String = ":" + val clusterResourceName: String = "kafka-cluster" + val clusterResource: Resource = new Resource(ResourceType.CLUSTER,Resource.clusterResourceName) + + def fromString(str: String) : Resource = { + val arr: Array[String] = str.split(separator) + + if(arr.length != 2) { + throw new IllegalArgumentException("expected a string in format resourceType:name but got " + str + ".allowed resource types are" + ResourceType.values()) + } + + new Resource(ResourceType.fromString(arr(0)), arr(1)) + } +} + +/** + * + * @param resourceType type of resource. + * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type + * it will be a constant string kafka-cluster. + */ +class Resource(val resourceType: ResourceType,val name: String) { + + override def toString: String = { + resourceType.name() + Resource.separator + name + } + + override def equals(that: Any): Boolean = { + if(!(that.isInstanceOf[Resource])) + return false + val other: Resource = that.asInstanceOf[Resource] + if(resourceType.equals(other.resourceType) && name.equalsIgnoreCase(other.name)) + return true + false + } + + override def hashCode(): Int = { + 31 + resourceType.hashCode + name.toLowerCase.hashCode + } +} + diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.java b/core/src/main/scala/kafka/security/auth/ResourceType.java new file mode 100644 index 0000000..70ed1a6 --- /dev/null +++ b/core/src/main/scala/kafka/security/auth/ResourceType.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.auth; + +/** + * ResourceTypes. + */ +public enum ResourceType { + CLUSTER, + TOPIC, + CONSUMER_GROUP; + + /** + * method defined for case insensitive check. the default valueOf() method is case sensitive + */ + public static ResourceType fromString(String resourceType) { + if(resourceType != null) { + for(ResourceType rType: ResourceType.values()) { + if(resourceType.equalsIgnoreCase(rType.name())) { + return rType; + } + } + } + throw new IllegalArgumentException("no matching enum value found for " + resourceType); + } +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 387e387..af7630d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,23 +17,24 @@ package kafka.server -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} -import org.apache.kafka.common.TopicPartition -import kafka.api._ import kafka.admin.AdminUtils +import kafka.api._ import kafka.common._ import kafka.controller.KafkaController import kafka.coordinator.ConsumerCoordinator import kafka.log._ -import kafka.network._ +import kafka.message.MessageSet import kafka.network.RequestChannel.Response -import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging} +import kafka.network._ +import kafka.security.auth.{Authorizer, Operation, Resource, ResourceType} +import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, ResponseHeader} import scala.collection._ -import org.I0Itec.zkclient.ZkClient - /** * Logic to handle the various Kafka requests */ @@ -45,7 +46,8 @@ class KafkaApis(val requestChannel: RequestChannel, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, - val metadataCache: MetadataCache) extends Logging { + val metadataCache: MetadataCache, + val authorizer: Option[Authorizer]) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) @@ -96,6 +98,13 @@ class KafkaApis(val requestChannel: RequestChannel, // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] + + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) { + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, Map.empty, ErrorMapping.AuthorizationCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) + return + } + try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) @@ -112,6 +121,13 @@ class KafkaApis(val requestChannel: RequestChannel, // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers 
so they // stop serving data to clients for the topic being deleted val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] + + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) { + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, Map.empty, ErrorMapping.AuthorizationCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) + return + } + val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) @@ -120,6 +136,18 @@ class KafkaApis(val requestChannel: RequestChannel, def handleUpdateMetadataRequest(request: RequestChannel.Request) { val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] + + if(authorizer.isDefined) { + val unauthorizedTopicAndPartition = updateMetadataRequest.partitionStateInfos.filterKeys( + topicAndPartition => !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)).keys + //In this case the response does not allow to selectively report success/failure so if authorization fails, we fail the entire request. + if (unauthorizedTopicAndPartition.nonEmpty) { + val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId, ErrorMapping.AuthorizationCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) + return + } + } + replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) @@ -131,6 +159,13 @@ class KafkaApis(val requestChannel: RequestChannel, // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] + + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) { + val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.AuthorizationCode, Set.empty) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) + return + } + val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) @@ -144,26 +179,32 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetCommitRequest.requestInfo.partition( + mapEntry => !authorizer.isDefined || (authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC,mapEntry._1.topic)) && + authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP,offsetCommitRequest.groupId)))) + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: 
immutable.Map[TopicAndPartition, Short]) { - commitStatus.foreach { case (topicAndPartition, errorCode) => + val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode) + + mergedCommitStatus.foreach { case (topicAndPartition, errorCode) => // we only print warnings for known errors here; only replica manager could see an unknown // exception while trying to write the offset message to the local log, and it will log // an error message and write the error code in this case; hence it can be ignored here if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, - topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) + topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) } } - val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) + val response = OffsetCommitResponse(mergedCommitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } if (offsetCommitRequest.versionId == 0) { // for version 0 always store offsets to ZK - val responseInfo = offsetCommitRequest.requestInfo.map { + val responseInfo = authorizedRequestInfo.map { case (topicAndPartition, metaAndError) => { val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) try { @@ -205,7 +246,7 @@ class KafkaApis(val requestChannel: RequestChannel, val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata => + val offsetData = authorizedRequestInfo.mapValues(offsetAndMetadata => offsetAndMetadata.copy( commitTimestamp = currentTimestamp, expireTimestamp = { @@ -233,10 +274,15 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition( + mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.WRITE, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { var errorInResponse = false - responseStatus.foreach { case (topicAndPartition, status) => + val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1)) + + mergedResponseStatus.foreach { case (topicAndPartition, status) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { @@ -259,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.noOperation(request.processor, request) } } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) + val response = ProducerResponse(produceRequest.correlationId, mergedResponseStatus) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } } @@ -273,7 +319,7 @@ class KafkaApis(val 
requestChannel: RequestChannel, produceRequest.ackTimeoutMs.toLong, produceRequest.requiredAcks, internalTopicsAllowed, - produceRequest.data, + authorizedRequestInfo, sendResponseCallback) // if the request is put into the purgatory, it will have a held reference @@ -288,9 +334,16 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] + val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition( + mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty)) + // the callback for sending a fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { - responsePartitionData.foreach { case (topicAndPartition, data) => + val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus + + mergedResponseStatus.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { @@ -313,7 +366,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, - fetchRequest.requestInfo, + authorizedRequestInfo, sendResponseCallback) } @@ -322,7 +375,13 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] - val responseMap = offsetRequest.requestInfo.map(elem => { + + val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition( + mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil)) + + val responseMap = authorizedRequestInfo.map(elem => { val (topicAndPartition, partitionOffsetRequestInfo) = elem try { // ensure leader exists @@ -362,7 +421,9 @@ class KafkaApis(val requestChannel: RequestChannel, (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } }) - val response = OffsetResponse(offsetRequest.correlationId, responseMap) + + val mergedResponseMap = responseMap ++ unauthorizedResponseStatus + val response = OffsetResponse(offsetRequest.correlationId, mergedResponseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -462,22 +523,35 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol) + val topics = metadataRequest.topics.toSet + + val (authorizedTopics, unauthorizedTopics) = topics.partition(topic => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new 
Resource(ResourceType.TOPIC,topic))) + + val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode)) + + val topicMetadata = getTopicMetadata(authorizedTopics, request.securityProtocol) val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) - val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId) + val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata ++ unauthorizedTopicMetaData, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } /* * Handle an offset fetch request */ + def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] + val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition( + topicAndPartition => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,topicAndPartition.topic))) + + val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode) + val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap + val response = if (offsetFetchRequest.versionId == 0) { // version 0 reads offsets from ZK - val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => { + val responseInfo = authorizedTopicPartitions.map( topicAndPartition => { val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic) try { if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) { @@ -496,10 +570,12 @@ class KafkaApis(val requestChannel: RequestChannel, } }) - OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId) + val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition => + (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode))) + OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId) } else { // version 1 reads offsets from Kafka - val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => + val (unknownTopicPartitions, knownTopicPartitions) = authorizedTopicPartitions.partition(topicAndPartition => metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty ) val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap @@ -508,7 +584,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap else Map.empty[TopicAndPartition, OffsetMetadataAndError] - val status = unknownStatus ++ knownStatus + val status = unknownStatus ++ knownStatus ++ unauthorizedStatus OffsetFetchResponse(status, offsetFetchRequest.correlationId) } @@ -526,6 +602,13 @@ class KafkaApis(val 
requestChannel: RequestChannel, val partition = offsetManager.partitionFor(consumerMetadataRequest.group) + //TODO: this can in turn create the topic, so we should check the create permissions if the config is enabled and topic is non existent. + if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC, OffsetManager.OffsetsTopicName))) { + val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(errorResponse))) + return + } + // get metadata (and create the topic if necessary) val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head @@ -549,10 +632,17 @@ class KafkaApis(val requestChannel: RequestChannel, val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest] val respHeader = new ResponseHeader(request.header.correlationId) + val (authorizedTopics, unauthorizedTopics) = joinGroupRequest.topics().partition( + topic => (!authorizer.isDefined || authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC, topic)) + && authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP, joinGroupRequest.groupId())))) + + val unauthorizedTopicPartition = unauthorizedTopics.map(topic => new TopicPartition(topic, -1)) + // the callback for sending a join-group response def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) { - val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList) + val partitionList = (partitions.map(tp => new TopicPartition(tp.topic, tp.partition)) ++ unauthorizedTopicPartition).toBuffer + val error = if (errorCode == ErrorMapping.NoError && unauthorizedTopicPartition.nonEmpty) ErrorMapping.AuthorizationCode else errorCode + val responseBody = new JoinGroupResponse(error, generationId, consumerId, partitionList) trace("Sending join group response %s for correlation id %d to client %s." 
.format(responseBody, request.header.correlationId, request.header.clientId)) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) @@ -562,7 +652,7 @@ class KafkaApis(val requestChannel: RequestChannel, coordinator.handleJoinGroup( joinGroupRequest.groupId(), joinGroupRequest.consumerId(), - joinGroupRequest.topics().toSet, + authorizedTopics.toSet, joinGroupRequest.sessionTimeout(), joinGroupRequest.strategy(), sendResponseCallback) @@ -572,6 +662,12 @@ class KafkaApis(val requestChannel: RequestChannel, val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] val respHeader = new ResponseHeader(request.header.correlationId) + if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP, heartbeatRequest.groupId()))) { + val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, heartbeatResponse))) + return + } + // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { val responseBody = new HeartbeatResponse(errorCode) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6f25afd..ee0c820 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -42,6 +42,12 @@ object Defaults { val BackgroundThreads = 10 val QueuedMaxRequests = 500 + /************* Authorizer Configuration ***********/ + val AuthorizerClassName = "" + val SuperUser = "" + val PrincipalToLocal = "" + val AuthorizerConfigPath = "" + /** ********* Socket Server Configuration ***********/ val Port = 9092 val HostName: String = new String("") @@ -147,6 +153,11 @@ object KafkaConfig { val NumIoThreadsProp = "num.io.threads" val BackgroundThreadsProp = "background.threads" val QueuedMaxRequestsProp = "queued.max.requests" + /************* Authorizer Configuration ***********/ + val AuthorizerClassNameProp = "authorizer.class.name" + val SuperUserProp = "super.users" + val PrincipalToLocalProp = "principal.to.local.class" + val AuthorizerConfigPathProp = "authorizer.config.path" /** ********* Socket Server Configuration ***********/ val PortProp = "port" val HostNameProp = "host.name" @@ -257,6 +268,11 @@ object KafkaConfig { val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests" val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks" val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads" + /************* Authorizer Configuration ***********/ + val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization" + val SuperUserDoc = "Comma seperated list of users that will have super user access to the cluster and all the topics." + val PrincipalToLocalDoc = "Name of the class that converts a principal to local user." + val AuthorizerConfigPathDoc = "Path to a authorizer configuration property file that will be used by the authorizer implementation." /** ********* Socket Server Configuration ***********/ val PortDoc = "the port to listen and accept connections on" val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. 
If this is not set, it will bind to all interfaces" @@ -372,8 +388,7 @@ object KafkaConfig { val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + - "'producer' which means retain the original compression codec set by the producer." - + "'producer' which means retain the original compression codec set by the producer." private val configDef = { import ConfigDef.Range._ @@ -398,6 +413,12 @@ object KafkaConfig { .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc) .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc) + /************* Authorizer Configuration ***********/ + .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc) + .define(SuperUserProp, STRING, Defaults.SuperUser, LOW, SuperUserDoc) + .define(PrincipalToLocalProp, STRING, Defaults.PrincipalToLocal, LOW, PrincipalToLocalDoc) + .define(AuthorizerConfigPathProp, STRING, Defaults.AuthorizerConfigPath, LOW, AuthorizerConfigPathDoc) + /** ********* Socket Server Configuration ***********/ .define(PortProp, INT, Defaults.Port, HIGH, PortDoc) .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc) @@ -523,6 +544,12 @@ object KafkaConfig { backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int], queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int], + /************* Authorizer Configuration ***********/ + authorizerClassName = parsed.get(AuthorizerClassNameProp).asInstanceOf[String], + superUser = parsed.get(SuperUserProp).asInstanceOf[String], + principalToLocal = parsed.get(PrincipalToLocalProp).asInstanceOf[String], + authorizerConfigPath = parsed.get(AuthorizerConfigPathProp).asInstanceOf[String], + /** ********* Socket Server Configuration ***********/ port = parsed.get(PortProp).asInstanceOf[Int], hostName = parsed.get(HostNameProp).asInstanceOf[String], @@ -668,6 +695,12 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val backgroundThreads: Int = Defaults.BackgroundThreads, val queuedMaxRequests: Int = Defaults.QueuedMaxRequests, + /************* Authorizer Configuration ***********/ + val authorizerClassName: String = Defaults.AuthorizerClassName, + val superUser: String = Defaults.SuperUser, + val principalToLocal: String = Defaults.PrincipalToLocal, + val authorizerConfigPath: String = Defaults.AuthorizerConfigPath, + /** ********* Socket Server Configuration ***********/ val port: Int = Defaults.Port, val hostName: String = Defaults.HostName, @@ -894,6 +927,12 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(BackgroundThreadsProp, backgroundThreads.toString) props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString) + /************* Authorizer Configuration ***********/ + props.put(AuthorizerClassNameProp, authorizerClassName.toString) + props.put(SuperUserProp, superUser.toString) + props.put(PrincipalToLocalProp, principalToLocal.toString) + props.put(AuthorizerConfigPathProp, authorizerConfigPath.toString) + /** ********* Socket Server Configuration ***********/ props.put(PortProp, port.toString) props.put(HostNameProp, hostName) diff --git 
a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e66710d..436de74 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -17,25 +17,25 @@ package kafka.server -import kafka.admin._ -import kafka.log.LogConfig -import kafka.log.CleanerConfig -import kafka.log.LogManager -import kafka.utils._ -import java.util.concurrent._ -import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import java.util.concurrent._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import collection.mutable -import org.I0Itec.zkclient.ZkClient -import kafka.controller.{ControllerStats, KafkaController} -import kafka.cluster.{EndPoint, Broker} -import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} -import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} -import kafka.network.{Receive, BlockingChannel, SocketServer} -import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +import kafka.admin._ +import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} +import kafka.cluster.{Broker, EndPoint} +import kafka.common.{ErrorMapping, GenerateBrokerIdException, InconsistentBrokerIdException} +import kafka.controller.{ControllerStats, KafkaController} import kafka.coordinator.ConsumerCoordinator +import kafka.log.{CleanerConfig, LogConfig, LogManager} +import kafka.metrics.KafkaMetricsGroup +import kafka.network.{BlockingChannel, Receive, SocketServer} +import kafka.security.auth.Authorizer +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient + +import scala.collection.mutable /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -71,8 +71,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var kafkaHealthcheck: KafkaHealthcheck = null val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - - var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" @@ -144,9 +142,19 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg consumerCoordinator = new ConsumerCoordinator(config, zkClient, offsetManager) consumerCoordinator.startup() + + /* Get the authorizer and initialize it if one is specified.*/ + val authorizer: Option[Authorizer] = if(config.authorizerClassName != null && !config.authorizerClassName.isEmpty) { + val authZ: Authorizer = CoreUtils.createObject(config.authorizerClassName) + authZ.initialize(config) + Option(authZ) + } else { + None + } + /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache) + kafkaController, zkClient, config.brokerId, config, metadataCache, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) diff --git a/core/src/test/resources/acl.json b/core/src/test/resources/acl.json new file mode 100644 index 0000000..97710b3 --- /dev/null +++ b/core/src/test/resources/acl.json @@ -0,0 +1,39 @@ +{ + "version": 1, + "acls": [ + { + "hosts": [ + "host1", + "host2" + ], + "permissionType": "DENY", + "operations": [ + "READ", + "WRITE" + ], + "principal": ["user:alice", "user:bob"] + }, + { + "hosts": [ + 
"*" + ], + "permissionType": "ALLOW", + "operations": [ + "READ", + "WRITE" + ], + "principal": ["user:bob"] + }, + { + "hosts": [ + "host1", + "host2" + ], + "permissionType": "DENY", + "operations": [ + "read" + ], + "principal": ["user:bob"] + } + ] +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala new file mode 100644 index 0000000..a48fbce --- /dev/null +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.security.auth + +import kafka.security.auth.{Acl, KafkaPrincipal, Operation, PermissionType} +import kafka.utils.Json +import org.junit.Assert +import org.scalatest.junit.JUnit3Suite + +class AclTest extends JUnit3Suite { + + def testAclJsonConversion(): Unit = { + val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice"), new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl2: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("*"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ)) + + val acls: Set[Acl] = Set[Acl](acl1, acl2, acl3) + val jsonAcls: String = Json.encode(Acl.toJsonCompatibleMap(acls)) + Assert.assertEquals(acls, Acl.fromJson(jsonAcls)) + + //test json by reading from a local file. + val path: String = Thread.currentThread().getContextClassLoader.getResource("acl.json").getPath + val source = scala.io.Source.fromFile(path) + Assert.assertEquals(acls, Acl.fromJson(source.mkString)) + source.close() + } + + def testEqualsAndHashCode(): Unit = { + //check equals is not sensitive to case or order for principal,hosts or operations. 
+ val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob"), new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl2: Acl = new Acl(Set(new KafkaPrincipal("USER", "ALICE"), new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("HOST2", "HOST1"), Set[Operation](Operation.WRITE, Operation.READ)) + + Assert.assertEquals(acl1, acl2) + Assert.assertEquals(acl1.hashCode(), acl2.hashCode()) + + //if user does not match returns false + val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl4: Acl = new Acl(Set(new KafkaPrincipal("USER", "Bob")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE)) + Assert.assertFalse(acl3.equals(acl4)) + + //if permission does not match return false + val acl5: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.DENY, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl6: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE)) + Assert.assertFalse(acl5.equals(acl6)) + + //if hosts do not match return false + val acl7: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host10", "HOST2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl8: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE)) + Assert.assertFalse(acl7.equals(acl8)) + + //if Opoerations do not match return false + val acl9: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl10: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.DESCRIBE)) + Assert.assertFalse(acl9.equals(acl10)) + } +} diff --git a/core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala b/core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala new file mode 100644 index 0000000..d6ecb06 --- /dev/null +++ b/core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unit.kafka.security.auth + +import kafka.security.auth.KafkaPrincipal +import org.junit.Assert +import org.scalatest.junit.JUnit3Suite + +class KafkaPrincipalTest extends JUnit3Suite { + + def testEqualsAndHashCode(): Unit = { + //check equals is not sensitive to case. + val principal1:KafkaPrincipal = KafkaPrincipal.fromString("user:test") + val principal2:KafkaPrincipal = KafkaPrincipal.fromString("USER:TEST") + + Assert.assertEquals(principal1, principal2) + Assert.assertEquals(principal1.hashCode(), principal2.hashCode()) + + //if name does not match returns false + val principal3:KafkaPrincipal = KafkaPrincipal.fromString("user:test") + val principal4:KafkaPrincipal = KafkaPrincipal.fromString("user:test1") + Assert.assertFalse(principal3.equals(principal4)) + + //if type does not match return false + val principal5:KafkaPrincipal = KafkaPrincipal.fromString("user:test") + val principal6:KafkaPrincipal = KafkaPrincipal.fromString("group:test") + Assert.assertFalse(principal5.equals(principal6)) + } +} diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala new file mode 100644 index 0000000..0a21416 --- /dev/null +++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.security.auth + +import kafka.security.auth.Resource +import kafka.security.auth.ResourceType +import org.junit.Assert +import org.scalatest.junit.JUnit3Suite + +class ResourceTest extends JUnit3Suite { + + def testEqualsAndHashCode(): Unit = { + //check equals is not sensitive to case. 
+ val resource1: Resource = Resource.fromString(ResourceType.TOPIC.name().toLowerCase + ":test") + val resource2: Resource = Resource.fromString(ResourceType.TOPIC.name().toUpperCase() + ":TEST") + Assert.assertEquals(resource1, resource2) + Assert.assertEquals(resource1.hashCode(), resource2.hashCode()) + + val resource3: Resource = Resource.fromString(ResourceType.TOPIC.name() + ":test") + val resource4: Resource = Resource.fromString(ResourceType.TOPIC.name() + ":TEST1") + //if name does not match returns false + Assert.assertFalse(resource3.equals(resource4)) + + //if type does not match return false + val resource5: Resource = Resource.fromString(ResourceType.TOPIC.name() + ":test") + val resource6: Resource = Resource.fromString(ResourceType.CONSUMER_GROUP.name() + ":test") + Assert.assertFalse(resource5.equals(resource6)) + } +} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 71f48c0..776eeea 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -276,6 +276,11 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.AuthorizerClassNameProp => // ignore string + case KafkaConfig.SuperUserProp => //ignore string + case KafkaConfig.PrincipalToLocalProp => //ignore string + case KafkaConfig.AuthorizerConfigPathProp => //ignore string + case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.HostNameProp => // ignore string case KafkaConfig.AdvertisedHostNameProp => //ignore string -- 2.1.3.36.g8e36a6d From b2015d666a60a737f639a180105439d31346f116 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Wed, 3 Jun 2015 16:32:08 -0700 Subject: [PATCH 2/3] Addressing review comments from Jun. 
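
Summary of the changes made in response to the review:

* The SCREAMING_SNAKE_CASE constants in Acl, KafkaPrincipal and Resource are
  renamed to the camelCase/PascalCase style used elsewhere in core.
* The JSON key for principals becomes "principals", since the field holds a
  list; acl.json and the Acl javadoc example are updated to match.
* The CLUSTER_ACTION checks in KafkaApis now fail fast by throwing
  AuthorizationException instead of hand-building a per-request error
  response.
* Paths that can auto-create a topic (TopicMetadataRequest, and the offsets
  topic behind ConsumerMetadataRequest) additionally require CREATE on the
  cluster resource.
* OffsetRequest.handleError now returns Nil rather than null for the offsets
  field of an error response.

Every fail-fast check below has the same shape; a minimal sketch of the
pattern follows (authorizeOrThrow is a hypothetical helper shown for
illustration only, the patch itself inlines the check in each handler):

    private def authorizeOrThrow(request: RequestChannel.Request,
                                 operation: Operation,
                                 resource: Resource): Unit = {
      // No configured authorizer means every request is allowed.
      if (authorizer.isDefined && !authorizer.get.authorize(request.session, operation, resource))
        throw new AuthorizationException("Request " + request + " is not authorized.")
    }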
---
 core/src/main/scala/kafka/api/OffsetRequest.scala | 2 +-
 core/src/main/scala/kafka/security/auth/Acl.scala | 62 ++++++++++++-----
 .../scala/kafka/security/auth/KafkaPrincipal.scala | 11 +--
 .../main/scala/kafka/security/auth/Resource.scala | 12 ++--
 core/src/main/scala/kafka/server/KafkaApis.scala | 79 ++++++++++++----------
 core/src/test/resources/acl.json | 6 +-
 .../scala/unit/kafka/security/auth/AclTest.scala | 24 +++----
 7 files changed, 115 insertions(+), 81 deletions(-)

diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 3d483bc..5c2084b 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -114,7 +114,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
 val partitionOffsetResponseMap = requestInfo.map {
 case (topicAndPartition, partitionOffsetRequest) =>
- (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil))
 }
 val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
 requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index c062509..809c6e3 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -23,14 +23,40 @@ object Acl {
 val wildCardPrincipal: KafkaPrincipal = new KafkaPrincipal("user", "*")
 val wildCardHost: String = "*"
 val allowAllAcl = new Acl(Set[KafkaPrincipal](wildCardPrincipal), PermissionType.ALLOW, Set[String](wildCardHost), Set[Operation](Operation.ALL))
- val PRINCIPAL_KEY = "principal"
- val PERMISSION_TYPE_KEY = "permissionType"
- val OPERATIONS_KEY = "operations"
- val HOSTS_KEY = "hosts"
- val VERSION_KEY = "version"
- val CURRENT_VERSION = 1
- val ACLS_KEY = "acls"
+ val principalKey = "principals"
+ val permissionTypeKey = "permissionType"
+ val operationKey = "operations"
+ val hostsKey = "hosts"
+ val versionKey = "version"
+ val currentVersion = 1
+ val aclsKey = "acls"
+ /**
+ * Parses a JSON string into a set of Acls.
+ *
+ * @param aclJson a versioned JSON document holding a list of ACLs, for example:
+ *
+ *

+ {
+ "version": 1,
+ "acls": [
+ {
+ "hosts": [
+ "host1",
+ "host2"
+ ],
+ "permissionType": "DENY",
+ "operations": [
+ "READ",
+ "WRITE"
+ ],
+ "principals": ["user:alice", "user:bob"]
+ }
+ ]
+ }
+ *

+ *
+ * @return the set of Acls parsed from the given JSON string
+ */
 def fromJson(aclJson: String): Set[Acl] = {
 if(aclJson == null || aclJson.isEmpty) {
 return collection.immutable.Set.empty[Acl]
@@ -40,13 +66,13 @@ object Acl {
 case Some(m) =>
 val aclMap = m.asInstanceOf[Map[String, Any]]
 //the acl json version.
- require(aclMap.get(VERSION_KEY).get == CURRENT_VERSION)
- val aclSet: List[Map[String, Any]] = aclMap.get(ACLS_KEY).get.asInstanceOf[List[Map[String, Any]]]
+ require(aclMap(versionKey) == currentVersion)
+ val aclSet: List[Map[String, Any]] = aclMap.get(aclsKey).get.asInstanceOf[List[Map[String, Any]]]
 aclSet.foreach(item => {
- val principals: List[KafkaPrincipal] = item(PRINCIPAL_KEY).asInstanceOf[List[String]].map(principal => KafkaPrincipal.fromString(principal))
- val permissionType: PermissionType = PermissionType.valueOf(item(PERMISSION_TYPE_KEY).asInstanceOf[String])
- val operations: List[Operation] = item(OPERATIONS_KEY).asInstanceOf[List[String]].map(operation => Operation.fromString(operation))
- val hosts: List[String] = item(HOSTS_KEY).asInstanceOf[List[String]]
+ val principals: List[KafkaPrincipal] = item(principalKey).asInstanceOf[List[String]].map(principal => KafkaPrincipal.fromString(principal))
+ val permissionType: PermissionType = PermissionType.valueOf(item(permissionTypeKey).asInstanceOf[String])
+ val operations: List[Operation] = item(operationKey).asInstanceOf[List[String]].map(operation => Operation.fromString(operation))
+ val hosts: List[String] = item(hostsKey).asInstanceOf[List[String]]
 acls += new Acl(principals.toSet, permissionType, hosts.toSet, operations.toSet)
 })
 case None =>
@@ -56,7 +82,7 @@ object Acl {
 def toJsonCompatibleMap(acls: Set[Acl]): Map[String,Any] = {
 acls match {
- case aclSet: Set[Acl] => Map(Acl.VERSION_KEY -> Acl.CURRENT_VERSION, Acl.ACLS_KEY -> aclSet.map(acl => acl.toMap).toList)
+ case aclSet: Set[Acl] => Map(Acl.versionKey -> Acl.currentVersion, Acl.aclsKey -> aclSet.map(acl => acl.toMap).toList)
 case _ => null
 }
 }
@@ -82,10 +108,10 @@ class Acl(val principals: Set[KafkaPrincipal],val permissionType: PermissionType
 */
 def toMap() : Map[String, Any] = {
 val map: collection.mutable.HashMap[String, Any] = new collection.mutable.HashMap[String, Any]()
- map.put(Acl.PRINCIPAL_KEY, principals.map(principal => principal.toString))
- map.put(Acl.PERMISSION_TYPE_KEY, permissionType.name())
- map.put(Acl.OPERATIONS_KEY, operations.map(operation => operation.name()))
- map.put(Acl.HOSTS_KEY, hosts)
+ map.put(Acl.principalKey, principals.map(principal => principal.toString))
+ map.put(Acl.permissionTypeKey, permissionType.name())
+ map.put(Acl.operationKey, operations.map(operation => operation.name()))
+ map.put(Acl.hostsKey, hosts)
 map.toMap
 }
diff --git a/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala b/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala
index 21f7d44..246d940 100644
--- a/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala
+++ b/core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala
@@ -19,11 +19,11 @@ package kafka.security.auth
 import java.security.Principal
 object KafkaPrincipal {
- val seperator: String = ":"
- val userType: String = "User"
+ val Separator: String = ":"
+ val UserType: String = "User"
 def fromString(str: String) : KafkaPrincipal = {
- val arr: Array[String] = str.split(seperator)
+ val arr: Array[String] = str.split(Separator)
 if(arr.length != 2) {
 throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str)
@@ -40,12 +40,15 @@ object KafkaPrincipal
{
 */
 class KafkaPrincipal(val principalType: String,val name: String) extends Principal {
+ if(principalType == null || name == null)
+ throw new IllegalArgumentException("principalType and name cannot be null")
+
 override def getName: String = {
 name
 }
 override def toString: String = {
- principalType + KafkaPrincipal.seperator + name
+ principalType + KafkaPrincipal.Separator + name
 }
 override def equals(that: Any): Boolean = {
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 37fd88b..df81385 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -17,15 +17,15 @@ package kafka.security.auth
 object Resource {
- val separator: String = ":"
- val clusterResourceName: String = "kafka-cluster"
- val clusterResource: Resource = new Resource(ResourceType.CLUSTER,Resource.clusterResourceName)
+ val Separator: String = ":"
+ val ClusterResourceName: String = "kafka-cluster"
+ val ClusterResource: Resource = new Resource(ResourceType.CLUSTER,Resource.ClusterResourceName)
 def fromString(str: String) : Resource = {
- val arr: Array[String] = str.split(separator)
+ val arr: Array[String] = str.split(Separator)
 if(arr.length != 2) {
- throw new IllegalArgumentException("expected a string in format resourceType:name but got " + str + ".allowed resource types are" + ResourceType.values())
+ throw new IllegalArgumentException("Expected a string in format ResourceType:Name but got " + str + ". Allowed resource types are " + ResourceType.values())
 }
 new Resource(ResourceType.fromString(arr(0)), arr(1))
@@ -41,7 +41,7 @@ object Resource {
 class Resource(val resourceType: ResourceType,val name: String) {
 override def toString: String = {
- resourceType.name() + Resource.separator + name
+ resourceType.name() + Resource.Separator + name
 }
 override def equals(that: Any): Boolean = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index af7630d..46aa210 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,8 @@ package kafka.server
+import kafka.common.AuthorizationException
+
 import kafka.admin.AdminUtils
 import kafka.api._
 import kafka.common._
@@ -99,10 +101,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 // stop serving data to clients for the topic being deleted
 val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
- if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) {
- val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, Map.empty, ErrorMapping.AuthorizationCode)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
- return
+ if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.ClusterResource)) {
+ throw new AuthorizationException("Request " + request + " is not authorized.")
 }
 try {
@@ -122,10 +122,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 // stop serving data to clients for the topic being deleted
 val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
- if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) {
- val stopReplicaResponse = new
StopReplicaResponse(stopReplicaRequest.correlationId, Map.empty, ErrorMapping.AuthorizationCode) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) - return + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.ClusterResource)) { + throw new AuthorizationException("Request " + request + " is not authorized.") } val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) @@ -137,15 +135,8 @@ class KafkaApis(val requestChannel: RequestChannel, def handleUpdateMetadataRequest(request: RequestChannel.Request) { val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] - if(authorizer.isDefined) { - val unauthorizedTopicAndPartition = updateMetadataRequest.partitionStateInfos.filterKeys( - topicAndPartition => !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)).keys - //In this case the response does not allow to selectively report success/failure so if authorization fails, we fail the entire request. - if (unauthorizedTopicAndPartition.nonEmpty) { - val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId, ErrorMapping.AuthorizationCode) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) - return - } + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.ClusterResource)) { + throw new AuthorizationException("Request " + request + " is not authorized.") } replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) @@ -160,10 +151,8 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] - if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.clusterResource)) { - val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.AuthorizationCode, Set.empty) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) - return + if(authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CLUSTER_ACTION, Resource.ClusterResource)) { + throw new AuthorizationException("Request " + request + " is not authorized.") } val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) @@ -179,9 +168,10 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetCommitRequest.requestInfo.partition( - mapEntry => !authorizer.isDefined || (authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC,mapEntry._1.topic)) && - authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP,offsetCommitRequest.groupId)))) + val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetCommitRequest.requestInfo.partition { + case (topicAndPartition, _) => !authorizer.isDefined || (authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC,topicAndPartition.topic)) && + authorizer.get.authorize(request.session, Operation.READ, new 
Resource(ResourceType.CONSUMER_GROUP,offsetCommitRequest.groupId))) + } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { @@ -274,8 +264,9 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition( - mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.WRITE, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition { + case (topicAndPartition, _) => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.WRITE, new Resource(ResourceType.TOPIC,topicAndPartition.topic)) + } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { @@ -334,8 +325,9 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition( - mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition { + case (topicAndPartition, _) => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.TOPIC, topicAndPartition.topic)) + } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty)) @@ -376,8 +368,9 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition( - mapEntry => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,mapEntry._1.topic))) + val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition { + case (topicAndPartition, _) => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC, topicAndPartition.topic)) + } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil)) @@ -525,7 +518,16 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] val topics = metadataRequest.topics.toSet - val (authorizedTopics, unauthorizedTopics) = topics.partition(topic => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,topic))) + var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,topic))) + + val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol) + if(config.autoCreateTopicsEnable && topicResponses.size != 
authorizedTopics.size) { + val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet + if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CREATE, Resource.ClusterResource)) { + authorizedTopics --= nonExistentTopics + unauthorizedTopics ++= nonExistentTopics + } + } val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode)) @@ -544,7 +546,8 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition( - topicAndPartition => !authorizer.isDefined || authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC,topicAndPartition.topic))) + topicAndPartition => !authorizer.isDefined || (authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC, topicAndPartition.topic)) + && authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP, offsetFetchRequest.groupId)))) val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode) val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap @@ -602,11 +605,13 @@ class KafkaApis(val requestChannel: RequestChannel, val partition = offsetManager.partitionFor(consumerMetadataRequest.group) - //TODO: this can in turn create the topic, so we should check the create permissions if the config is enabled and topic is non existent. - if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.DESCRIBE, new Resource(ResourceType.TOPIC, OffsetManager.OffsetsTopicName))) { - val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(errorResponse))) - return + if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.READ, new Resource(ResourceType.CONSUMER_GROUP, consumerMetadataRequest.group))) { + throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.") + } + + if(metadataCache.getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).isEmpty && + authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CREATE, Resource.ClusterResource)) { + throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.") } // get metadata (and create the topic if necessary) diff --git a/core/src/test/resources/acl.json b/core/src/test/resources/acl.json index 97710b3..ae7fbf8 100644 --- a/core/src/test/resources/acl.json +++ b/core/src/test/resources/acl.json @@ -11,7 +11,7 @@ "READ", "WRITE" ], - "principal": ["user:alice", "user:bob"] + "principals": ["user:alice", "user:bob"] }, { "hosts": [ @@ -22,7 +22,7 @@ "READ", "WRITE" ], - "principal": ["user:bob"] + "principals": ["user:bob"] }, { "hosts": [ @@ -33,7 +33,7 @@ "operations": [ "read" ], - "principal": ["user:bob"] + "principals": ["user:bob"] } ] } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index a48fbce..9bb504a 100644 --- 
a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -24,9 +24,9 @@ import org.scalatest.junit.JUnit3Suite class AclTest extends JUnit3Suite { def testAclJsonConversion(): Unit = { - val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice"), new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ, Operation.WRITE)) - val acl2: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("*"), Set[Operation](Operation.READ, Operation.WRITE)) - val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ)) + val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice"), new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl2: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.ALLOW, Set[String]("*"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.DENY, Set[String]("host1","host2"), Set[Operation](Operation.READ)) val acls: Set[Acl] = Set[Acl](acl1, acl2, acl3) val jsonAcls: String = Json.encode(Acl.toJsonCompatibleMap(acls)) @@ -41,30 +41,30 @@ class AclTest extends JUnit3Suite { def testEqualsAndHashCode(): Unit = { //check equals is not sensitive to case or order for principal,hosts or operations. - val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob"), new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) - val acl2: Acl = new Acl(Set(new KafkaPrincipal("USER", "ALICE"), new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("HOST2", "HOST1"), Set[Operation](Operation.WRITE, Operation.READ)) + val acl1: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "bob"), new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl2: Acl = new Acl(Set(new KafkaPrincipal("USER", "ALICE"), new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.ALLOW, Set[String]("HOST2", "HOST1"), Set[Operation](Operation.WRITE, Operation.READ)) Assert.assertEquals(acl1, acl2) Assert.assertEquals(acl1.hashCode(), acl2.hashCode()) //if user does not match returns false - val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) + val acl3: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) val acl4: Acl = new Acl(Set(new KafkaPrincipal("USER", "Bob")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE)) Assert.assertFalse(acl3.equals(acl4)) //if permission does not match return false - val acl5: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.DENY, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE)) - 
val acl6: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
+ val acl5: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.DENY, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE))
+ val acl6: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
 Assert.assertFalse(acl5.equals(acl6))

 //if hosts do not match return false
- val acl7: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("host10", "HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
- val acl8: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
+ val acl7: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.ALLOW, Set[String]("host10", "HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
+ val acl8: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "alice")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.WRITE))
 Assert.assertFalse(acl7.equals(acl8))

 //if operations do not match return false
- val acl9: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE))
- val acl10: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.userType, "bob")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.DESCRIBE))
+ val acl9: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.ALLOW, Set[String]("host1", "host2"), Set[Operation](Operation.READ, Operation.WRITE))
+ val acl10: Acl = new Acl(Set(new KafkaPrincipal(KafkaPrincipal.UserType, "bob")), PermissionType.ALLOW, Set[String]("HOST1","HOST2"), Set[Operation](Operation.READ, Operation.DESCRIBE))
 Assert.assertFalse(acl9.equals(acl10))
 }
 }
--
2.1.3.36.g8e36a6d

From c7d01517732288fd970f0aba28758dd797b6c48f Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Thu, 4 Jun 2015 16:06:33 -0700
Subject: [PATCH 3/3] Adding CREATE check for offset topic only if the topic does not exist already.
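
A ConsumerMetadataRequest can implicitly create the offsets topic, so CREATE
on the cluster resource should only be demanded while that topic does not yet
exist; once OffsetManager.OffsetsTopicName is present the request creates
nothing and the READ check on the consumer group is sufficient. The nested
ifs in the diff below are equivalent to this single-condition sketch (names
as in the patch):

    val topicResponses = metadataCache.getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol)
    // Only a principal holding CREATE on the cluster may trigger implicit creation of the offsets topic.
    if (topicResponses.isEmpty &&
        authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CREATE, Resource.ClusterResource))
      throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.")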
--- core/src/main/scala/kafka/server/KafkaApis.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 46aa210..700bb70 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -609,9 +609,11 @@ class KafkaApis(val requestChannel: RequestChannel, throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.") } - if(metadataCache.getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).isEmpty && - authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CREATE, Resource.ClusterResource)) { - throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.") + val topicResponses = metadataCache.getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol) + if(topicResponses.isEmpty) { + if (authorizer.isDefined && !authorizer.get.authorize(request.session, Operation.CREATE, Resource.ClusterResource)) { + throw new AuthorizationException("Request " + consumerMetadataRequest + " is not authorized.") + } } // get metadata (and create the topic if necessary) -- 2.1.3.36.g8e36a6d
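
A closing note for implementers: the Authorizer trait added in PATCH 1/3 is
the plug-in point configured through KafkaConfig.AuthorizerClassNameProp (see
the KafkaConfigConfigDefTest change above). Its full definition is not
reproduced in this series, so judging only from the call sites in KafkaApis
(authorize(session, operation, resource) returning Boolean), a trivial
allow-all implementation would look roughly like the sketch below. The import
path for Session and the absence of any configuration hooks are assumptions,
not part of the committed interface.

    import kafka.network.RequestChannel.Session // assumed location of Session
    import kafka.security.auth.{Authorizer, Operation, Resource}

    // Sketch only: permits every operation on every resource, matching the
    // broker's behavior when no authorizer is configured at all.
    class AllowAllAuthorizer extends Authorizer {
      override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = true
    }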