KAFKA-14102

(SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages

Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      We have two web applications (A and B) that consume messages from the same Kafka server, so they share the same configuration:

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
      sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
      jaas.enabled=true

       

      A and B are deployed together in one Tomcat server (that is, they run in the same JVM process), and the startup order is A -> B. B then cannot consume messages and fails with the following exception:

      [2022-07-22 02:52:45,184] [ INFO] 6 [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] o.a.k.c.n.SaslChannelBuilder             -  - [Consumer clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
      Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
      at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182) ~[kafka-clients-3.0.1.jar:?]
      at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219) ~[kafka-clients-3.0.1.jar:?]
      ... suppressed 2 lines
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.common.network.Selector.connect(Selector.java:256) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:?]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522) ~[spring-kafka-2.8.6.jar:2.8.6]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512) ~[spring-kafka-2.8.6.jar:2.8.6]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340) ~[spring-kafka-2.8.6.jar:2.8.6]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]

       

      I debugged the issue and found that the key error is:

      Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler 

      Application A starts up first and registers OAuthBearerSaslClientProvider in the static block of the OAuthBearerLoginModule class, so OAuthBearerSaslClientProvider is loaded and initialized by the ParallelWebappClassLoader for Application A.

      static {
          OAuthBearerSaslClientProvider.initialize(); // not part of public API
          OAuthBearerSaslServerProvider.initialize(); // not part of public API
      } 

      Application B then starts up and tries to register OAuthBearerSaslClientProvider again with the same code, but this has no effect because the provider was already registered by Application A.
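
      The "second registration has no effect" behaviour can be reproduced with plain JDK classes. This is a minimal sketch, assuming that OAuthBearerSaslClientProvider.initialize() ends up calling java.security.Security.addProvider, which is JVM-wide and keyed by provider name. DemoProvider, its provider name, and the app.a / app.b factory class names are hypothetical stand-ins, not Kafka classes.

```java
import java.security.Provider;
import java.security.Security;

public class ProviderRegistrationDemo {
    static final String NAME = "Demo SASL Client Provider"; // hypothetical provider name

    /** Hypothetical stand-in for a SASL client provider; not Kafka's class. */
    static final class DemoProvider extends Provider {
        private static final long serialVersionUID = 1L;

        @SuppressWarnings("deprecation") // this constructor also exists on Java 8
        DemoProvider(String factoryClassName) {
            super(NAME, 1.0, "illustration only");
            // Maps a SASL mechanism to the factory class that should serve it.
            put("SaslClientFactory.DEMO", factoryClassName);
        }
    }

    /** Registers "application A" first, then "application B"; returns both results. */
    static int[] registerTwice() {
        Security.removeProvider(NAME); // start from a clean state
        int first = Security.addProvider(new DemoProvider("app.a.Factory"));
        int second = Security.addProvider(new DemoProvider("app.b.Factory"));
        return new int[] {first, second};
    }

    /** Returns the factory class name that is actually registered JVM-wide. */
    static String registeredFactory() {
        return Security.getProvider(NAME).getProperty("SaslClientFactory.DEMO");
    }

    public static void main(String[] args) {
        int[] results = registerTwice();
        System.out.println("first addProvider:  " + results[0]);
        System.out.println("second addProvider: " + results[1]);
        System.out.println("registered factory: " + registeredFactory());
    }
}
```

      The second addProvider call returns -1 because a provider with that name is already installed, and the registered factory stays the one supplied by the first caller.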

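      When Application B tries to create the SaslClient, the AuthenticateCallbackHandler class is the one loaded by the class loader for Application A, while the OAuthBearerSaslClientCallbackHandler class is loaded by the class loader for Application B. Because the two classes come from different class loaders, the "must be castable" check fails.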
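
      The class loader mismatch can be illustrated without Kafka or Tomcat. This is a minimal sketch, assuming each web application has its own isolated class loader: the same class file, defined by two unrelated loaders, yields two distinct types with the same name, so an instanceof or cast check between them fails. WebappLoader and the generated Handler interface are hypothetical stand-ins for ParallelWebappClassLoader and AuthenticateCallbackHandler.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ClassLoaderIdentityDemo {
    /** Builds the class file of an empty public interface in the default package. */
    static byte[] emptyInterface(String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(0xCAFEBABE);              // magic
            out.writeShort(0);                     // minor version
            out.writeShort(52);                    // major version (Java 8)
            out.writeShort(5);                     // constant pool count (entries 1..4)
            out.writeByte(7); out.writeShort(2);   // #1 Class -> #2
            out.writeByte(1); out.writeUTF(name);  // #2 Utf8: this interface's name
            out.writeByte(7); out.writeShort(4);   // #3 Class -> #4
            out.writeByte(1); out.writeUTF("java/lang/Object"); // #4 Utf8
            out.writeShort(0x0601);                // public | interface | abstract
            out.writeShort(1);                     // this_class = #1
            out.writeShort(3);                     // super_class = #3
            out.writeShort(0);                     // interfaces
            out.writeShort(0);                     // fields
            out.writeShort(0);                     // methods
            out.writeShort(0);                     // attributes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Hypothetical stand-in for a per-application class loader. */
    static final class WebappLoader extends ClassLoader {
        WebappLoader() {
            super(null); // no shared parent besides the bootstrap loader
        }

        Class<?> define(String name, byte[] classFile) {
            return defineClass(name, classFile, 0, classFile.length);
        }
    }

    /** Defines the interface in a fresh, isolated loader. */
    static Class<?> loadIsolated(String name) {
        return new WebappLoader().define(name, emptyInterface(name));
    }

    public static void main(String[] args) {
        Class<?> fromA = loadIsolated("Handler"); // "application A"
        Class<?> fromB = loadIsolated("Handler"); // "application B"
        System.out.println("same name:  " + fromA.getName().equals(fromB.getName()));
        System.out.println("same class: " + (fromA == fromB));
        System.out.println("assignable: " + fromA.isAssignableFrom(fromB));
    }
}
```

      Both classes report the name Handler, yet they are different Class objects and neither is assignable to the other, which matches the situation where a handler built from Application B's classes is checked against Application A's copy of the interface.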


      Attachments

        1. image-2022-10-12-08-39-01-004.png
          0.3 kB
          Shuo Chen
        2. image-2022-10-12-08-43-57-597.png
          0.3 kB
          Shuo Chen

          People

            Assignee: Unassigned
            Reporter: Shuo Chen (shuochen)