Camel / CAMEL-18796

camel-kafka: kafka consumer stops in case of an authentication issue

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.18.0, 3.19.0
    • Fix Version/s: 3.18.5, 3.20.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      I'm running into strange behavior in the camel-kafka component when there is a glitch or temporary authentication issue. Assume we have the following code:

      //usr/bin/env jbang "$0" "$@" ; exit $?
      //
      //DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
      //DEPS org.apache.camel.quarkus:camel-quarkus-kafka
      //DEPS org.apache.camel.quarkus:camel-quarkus-log
      //DEPS org.apache.camel.quarkus:camel-quarkus-direct
      //
      //JAVAC_OPTIONS -parameters
      //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
      //
      
      import org.apache.camel.ExtendedCamelContext;
      import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
      
      public class ck extends EndpointRouteBuilder {
      
          @Override
          public void configure() throws Exception {
              // Global error handler: failed exchanges are routed to direct:dlq
              getCamelContext().adapt(ExtendedCamelContext.class)
                  .setErrorHandlerFactory(
                      deadLetterChannel("direct:dlq")
                  );

              // Kafka consumer endpoint for topic "demo", using SASL/PLAIN over SSL
              var kafka = kafka("demo")
                  .brokers("{{test.kafka.broker}}")
                  .autoOffsetReset("earliest")
                  .securityProtocol("SASL_SSL")
                  .saslMechanism("PLAIN")
                  .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");

              // DLQ route: log whatever the error handler sends here
              from("direct:dlq")
                  .to("log:dlq?showAll=true&multiline=true");

              // Main route: log every record consumed from the topic
              from(kafka)
                  .to("log:kafka?showAll=true&multiline=true");
          }
      }
      

      This route does two things:
      1. sets up a global error handler (failures are sent to a DLQ)
      2. polls data from a Kafka topic

      If for some reason there is a glitch in the authentication machinery, the KafkaConsumer thread is terminated and no further poll or reconnection attempts are made.

      2022-12-05 21:52:48,728 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
      2022-12-05 21:52:53,729 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
      2022-12-05 21:52:58,730 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
      2022-12-05 21:53:03,731 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
      2022-12-05 21:53:08,732 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
      2022-12-05 21:53:09,598 INFO  [org.apa.kaf.com.net.Selector] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77 (channelId=2147483647) (Authentication failed: credentials for user could not be verified)
      2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647 (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443) failed authentication due to: Authentication failed: credentials for user could not be verified
      2022-12-05 21:53:09,605 WARN  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exception org.apache.kafka.common.errors.SaslAuthenticationException caught by thread demo-Thread 0 while polling topic demo from kafka: Authentication failed: credentials for user could not be verified: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
      
      2022-12-05 21:53:09,609 WARN  [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Deferring processing to the exception handler based on polling exception strategy
      2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId: 386B9AF6D607152-0000000000000000 on ExchangeId: 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
      2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq Exchange[386B9AF6D607152-0000000000000000]
      2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> log://dlq?multiline=true&showAll=true Exchange[386B9AF6D607152-0000000000000000]
      2022-12-05 21:53:09,629 INFO  [dlq] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exchange[
        Id: 386B9AF6D607152-0000000000000000
        ExchangePattern: InOnly
        Properties: {CamelErrorHandlerBridge=true, CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified, CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2], CamelToEndpoint=log://dlq?multiline=true&showAll=true}
        Headers: {}
        BodyType: null
        Body: [Body is null]
        CaughtExceptionType: org.apache.kafka.common.errors.SaslAuthenticationException  CaughtExceptionMessage: Authentication failed: credentials for user could not be verified  StackTrace: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
      
      ]
      2022-12-05 21:53:09,635 INFO  [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1 to continue polling next message from topic demo on partition 0
      2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
      2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
      2022-12-05 21:53:09,643 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread demo-Thread 0 receiving from topic demo
      

      However, according to the documentation, if pollOnError is set to ERROR_HANDLER, as in this case (it is the default), the strategy should use Camel’s error handler to process the exception and afterwards continue to poll the next message. That does not seem to be the case here.
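
      To make the expectation concrete, here is a minimal plain-Java sketch of a poll loop that applies the documented pollOnError values. It is only a model written for this report, not camel-kafka's actual code: the class PollLoopSketch, its Result type and the run method are hypothetical names, and the authentication failure is simulated with an IllegalStateException. With ERROR_HANDLER the loop should hand the exception to the error handler and keep polling, whereas the log above looks more like what STOP would produce.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a consumer poll loop. This is NOT camel-kafka's
// implementation; all names here are hypothetical and for illustration only.
final class PollLoopSketch {

    // Mirrors the documented pollOnError values.
    enum PollOnError { DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP }

    // Outcome of one simulated run.
    static final class Result {
        final List<Integer> polled = new ArrayList<>(); // attempts that polled successfully
        int handledErrors;   // exceptions passed to the error handler
        int reconnects;      // simulated reconnections
        boolean terminated;  // true if the loop ended early
    }

    // Runs totalPolls attempts; attempt number failingPoll throws once,
    // imitating a transient authentication failure.
    static Result run(PollOnError strategy, int totalPolls, int failingPoll) {
        Result result = new Result();
        boolean alreadyFailed = false;
        for (int attempt = 1; attempt <= totalPolls; attempt++) {
            try {
                if (attempt == failingPoll && !alreadyFailed) {
                    alreadyFailed = true;
                    throw new IllegalStateException("Authentication failed (simulated)");
                }
                result.polled.add(attempt);
            } catch (IllegalStateException e) {
                switch (strategy) {
                    case ERROR_HANDLER:
                        result.handledErrors++; // process via the error handler...
                        break;                  // ...then continue with the next poll
                    case DISCARD:
                        break;                  // drop the failure and continue
                    case RETRY:
                        attempt--;              // repeat the same attempt
                        break;
                    case RECONNECT:
                        result.reconnects++;    // reconnect, then repeat the attempt
                        attempt--;
                        break;
                    case STOP:
                        result.terminated = true; // give up: no further polls
                        return result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result documented = run(PollOnError.ERROR_HANDLER, 5, 3);
        Result stopped = run(PollOnError.STOP, 5, 3);
        System.out.println("ERROR_HANDLER: polled=" + documented.polled
                + " terminated=" + documented.terminated);
        System.out.println("STOP:          polled=" + stopped.polled
                + " terminated=" + stopped.terminated);
    }
}
```

      In this model, a run with ERROR_HANDLER skips the failing attempt and carries on with the later polls, while STOP ends the loop at the failure, which is closer to the "Terminating KafkaConsumer thread" line in the log.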

      This seems to be somehow related to:

          People

            Assignee: orpiske Otavio Rodolfo Piske
            Reporter: lb Luca Burgazzoli
            Votes: 0
            Watchers: 3
