Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.0.0, 2.5.0
-
None
Description
A NullPointerException occurs in RangerKafkaAuthorizer during the execution of the callRangerPlugin method, specifically when calling auditHandler.flushAudit() because auditHandler is null. The issue arises from the constructor of RangerKafkaAuthorizer being called twice, while the configure method is called only once. Since auditHandler is initialized in the configure method, the second constructor call without initialization via the configure() method causes auditHandler to remain uninitialized, leading to the following exception.
[2024-09-25 12:40:30,785] DEBUG <== RangerPluginClassLoader.deactivate() (org.apache.ranger.plugin.classloader.RangerPluginClassLoader) [2024-09-25 12:40:30,785] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=console-consumer, correlationId=0, headerVersio n=2) – FindCoordinatorRequestData(key='', keyType=0, coordinatorKeys=[console-consumer-34426]) with context RequestContext(header=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clie ntId=console-consumer, correlationId=0, headerVersion=2), connectionId='10.0.0.59:9094-10.0.0.59:56938-0', clientAddress=/10.0.0.59, principal=User:kafka, listenerName=ListenerName(KERBERO S), securityProtocol=SASL_PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=7.7.0-ccs), fromPrivilegedListener=false, principalSerde=Optional[o rg.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@104e7feb]) (kafka.server.KafkaApis) java.lang.NullPointerException at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.callRangerPlugin(RangerKafkaAuthorizer.java:303) at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.wrappedAuthorization(RangerKafkaAuthorizer.java:288) at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:246) at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:136) at kafka.server.AuthHelper.$anonfun$authorize$1(AuthHelper.scala:53) at kafka.server.AuthHelper.$anonfun$authorize$1$adapted(AuthHelper.scala:50) at scala.Option.forall(Option.scala:420) at kafka.server.AuthHelper.authorize(AuthHelper.scala:50) at kafka.server.KafkaApis.getCoordinator(KafkaApis.scala:1647) at kafka.server.KafkaApis.$anonfun$handleFindCoordinatorRequestV4AndAbove$1(KafkaApis.scala:1601) at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:138) at kafka.server.KafkaApis.handleFindCoordinatorRequestV4AndAbove(KafkaApis.scala:1600) at kafka.server.KafkaApis.handleFindCoordinatorRequest(KafkaApis.scala:1593) at kafka.server.KafkaApis.handle(KafkaApis.scala:194) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:160) at java.base/java.lang.Thread.run(Thread.java:829)
The problematic method
auditHandler.flushAudit(); RangerKafkaAuthorizer.java:303
private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) { try { return rangerPlugin.isAccessAllowed(rangerRequests); } catch (Throwable t) { logger.error("Error while calling isAccessAllowed(). requests={}", rangerRequests, t); return null; } finally { auditHandler.flushAudit(); } }
Console consumer also reports the following error:
[2024-09-25 12:40:30,784] WARN [Consumer clientId=console-consumer, groupId=console-consumer-34426] Error while fetching metadata with correlation id 2 : \{ranger_audits=UNKNOWN_SERVER_ERROR} (org.apache.kafka.clients.NetworkClient)
[2024-09-25 12:40:30,791] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
[2024-09-25 12:40:30,870] WARN [Principal=kafka/governance-test.lan.net]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
Processed a total of 0 messages
Quick Workaround for Testing Purposes:
To ensure single initialization across multiple constructor calls, make auditHandler static and volatile:
private static volatile RangerKafkaAuditHandler auditHandler = null;
Original source:
private static volatile RangerBasePlugin rangerPlugin = null; RangerKafkaAuditHandler auditHandler = null;