Details
Bug
Status: Resolved
Major
Resolution: Fixed
3.18.0, 3.19.0
None
Unknown
Description
I'm running into strange behavior in the camel-kafka component when there is a glitch or temporary authentication issue. Assume we have the following code:
//usr/bin/env jbang "$0" "$@" ; exit $?
//
//DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
//DEPS org.apache.camel.quarkus:camel-quarkus-kafka
//DEPS org.apache.camel.quarkus:camel-quarkus-log
//DEPS org.apache.camel.quarkus:camel-quarkus-direct
//
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

public class ck extends EndpointRouteBuilder {
    @Override
    public void configure() throws Exception {
        getCamelContext().adapt(ExtendedCamelContext.class)
            .setErrorHandlerFactory(
                deadLetterChannel("direct:dlq")
            );

        var kafka = kafka("demo")
            .brokers("{{test.kafka.broker}}")
            .autoOffsetReset("earliest")
            .securityProtocol("SASL_SSL")
            .saslMechanism("PLAIN")
            .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");

        from("direct:dlq")
            .to("log:dlq?showAll=true&multiline=true");

        from(kafka)
            .to("log:kafka?showAll=true&multiline=true");
    }
}
This route does two things:
1. sets up a global error handler (a dead letter channel that sends failures to direct:dlq)
2. polls data from a Kafka topic
If there is a glitch in the authentication machinery for any reason, the KafkaConsumer thread is terminated and no further poll or reconnection attempts are made.
2022-12-05 21:52:48,728 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:52:53,729 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:52:58,730 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:03,731 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:08,732 DEBUG [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted on 0 records to process
2022-12-05 21:53:09,598 INFO [org.apa.kaf.com.net.Selector] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77 (channelId=2147483647) (Authentication failed: credentials for user could not be verified)
2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647 (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443) failed authentication due to: Authentication failed: credentials for user could not be verified
2022-12-05 21:53:09,605 WARN [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exception org.apache.kafka.common.errors.SaslAuthenticationException caught by thread demo-Thread 0 while polling topic demo from kafka: Authentication failed: credentials for user could not be verified: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
2022-12-05 21:53:09,609 WARN [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Deferring processing to the exception handler based on polling exception strategy
2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId: 386B9AF6D607152-0000000000000000 on ExchangeId: 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq Exchange[386B9AF6D607152-0000000000000000]
2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) >>>> log://dlq?multiline=true&showAll=true Exchange[386B9AF6D607152-0000000000000000]
2022-12-05 21:53:09,629 INFO [dlq] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Exchange[
  Id: 386B9AF6D607152-0000000000000000
  ExchangePattern: InOnly
  Properties: {CamelErrorHandlerBridge=true, CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified, CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2], CamelToEndpoint=log://dlq?multiline=true&showAll=true}
  Headers: {}
  BodyType: null
  Body: [Body is null]
  CaughtExceptionType: org.apache.kafka.common.errors.SaslAuthenticationException
  CaughtExceptionMessage: Authentication failed: credentials for user could not be verified
  StackTrace:
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: credentials for user could not be verified
]
2022-12-05 21:53:09,635 INFO [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1 to continue polling next message from topic demo on partition 0
2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
2022-12-05 21:53:09,643 INFO [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread demo-Thread 0 receiving from topic demo
However, according to the documentation, if pollOnError is set to ERROR_HANDLER, as it is in this case (it is the default), the strategy should use Camel's error handler to process the exception and afterwards continue to poll the next message. That does not seem to be the case here: the log above shows the consumer being closed and its thread terminated right after the error handler runs.
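To make the expectation concrete, here is a minimal, library-free sketch of the contract I read from the documentation: a failed poll is handed to the error handler and the loop then moves on to the next poll. This is not Camel's implementation; `PollOnErrorSketch`, `Poller` and `pollLoop` are hypothetical names invented for illustration, and the failing second poll only simulates the authentication glitch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PollOnErrorSketch {

    /** Hypothetical stand-in for a single poll: returns records or throws. */
    public interface Poller {
        List<String> poll() throws Exception;
    }

    /**
     * Runs exactly maxPolls polls. A failed poll is passed to errorHandler
     * and the loop continues with the next poll instead of terminating.
     */
    public static List<String> pollLoop(Poller poller, Consumer<Exception> errorHandler, int maxPolls) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            try {
                received.addAll(poller.poll());
            } catch (Exception e) {
                // Expected ERROR_HANDLER-like behavior: handle, then keep polling.
                errorHandler.accept(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<Exception> dlq = new ArrayList<>();

        // Simulated consumer: the second poll fails, the others return one record.
        Poller flaky = () -> {
            if (++calls[0] == 2) {
                throw new IllegalStateException("Authentication failed");
            }
            return List.of("record-" + calls[0]);
        };

        List<String> received = pollLoop(flaky, dlq::add, 3);
        // prints: received=[record-1, record-3] dlq=1
        System.out.println("received=" + received + " dlq=" + dlq.size());
    }
}
```

In the log above, the equivalent of the third poll never happens: the exception reaches the DLQ, but the consumer is then closed and the thread terminates.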
This seems to be somehow related to: