Details
Description
This is probably a side effect of CAMEL-12454: when auto commit enabled is enabled kafka consumer commits offsets as soon as an exchange is complete with no regard to the auto.commit.interval.ms setting, which may cause additional non-balanced load on the kafka cluster (see attached screenshot depicting kafka brokers cpu load right after upgrade from 2.20.3 to 2.23.0).
Here is an example with debugging turned on for org.apache.camel.component.kafka.KafkaConsumer, as you can see Kafka consumer commits every 10-500 ms instead of once per 5 seconds:
2019-01-28 10:46:55.025 INFO 3324 --- [ontext_Worker-3] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [192.168.56.10:9093] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = service_new heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 1000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 30000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = /usr/files/server.jks ssl.keystore.password = [hidden] ssl.keystore.type = JCEKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = /usr/files/truststore.jks ssl.truststore.password = [hidden] ssl.truststore.type = JCEKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2019-01-28 10:46:55.160 INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.2 2019-01-28 10:46:55.160 INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 2a121f7b1d402825 2019-01-28 10:46:55.160 INFO 3324 --- [ontext_Worker-3] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: kafka:topic1,topic2,topic3?brokers=192.168.56.10:9093 2019-01-28 10:46:55.161 INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3 2019-01-28 10:46:55.161 INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3 2019-01-28 10:46:55.313 INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=service_new] Discovered group coordinator 192.168.56.10:9093 (id: 2147483646 rack: null) 2019-01-28 10:46:55.315 INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=service_new] Revoking previously assigned partitions [] 2019-01-28 10:46:55.316 INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=service_new] (Re-)joining group 2019-01-28 10:46:58.469 INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=service_new] Successfully joined group with generation 3 2019-01-28 10:46:58.470 INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=service_new] Setting newly assigned partitions [topic3-1, topic3-0, topic2-1, topic1-0, topic2-0, topic1-1] 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
When downgraded to camel-kafka 2.20.3 commits are done correctly (once every auto.commit.interval.ms) and there are no messages from o.a.camel.component.kafka.KafkaConsumer in the debug log.
Running with debugger showed that this code actually never gets executed in 2.20.3: https://github.com/apache/camel/blob/2.20.x/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L382 and we seem to rely on the auto-commit feature of the Kafka client itself, not the camel-kafka wrapper.
Attachments
Attachments
Issue Links
- is related to
-
CAMEL-10013 Write automatic test validating syntax of every endpoint of the catalog
- Resolved