CAMEL-13140

camel-kafka - consumer does not respect auto.commit.interval.ms with AutoCommitEnabled=true

    Details

    • Estimated Complexity:
      Unknown

      Description

      This is probably a side effect of CAMEL-12454: when auto commit is enabled, the Kafka consumer commits offsets as soon as an exchange completes, with no regard for the auto.commit.interval.ms setting. This may put additional, unevenly distributed load on the Kafka cluster (see the attached screenshot showing Kafka broker CPU load right after the upgrade from 2.20.3 to 2.23.0).

      Here is an example with debug logging turned on for org.apache.camel.component.kafka.KafkaConsumer. As you can see, the Kafka consumer commits every 10-500 ms instead of once per 5 seconds:

      2019-01-28 10:46:55.025  INFO 3324 --- [ontext_Worker-3] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
      	auto.commit.interval.ms = 5000
      	auto.offset.reset = latest
      	bootstrap.servers = [192.168.56.10:9093]
      	check.crcs = true
      	client.id = 
      	connections.max.idle.ms = 540000
      	enable.auto.commit = true
      	exclude.internal.topics = true
      	fetch.max.bytes = 52428800
      	fetch.max.wait.ms = 500
      	fetch.min.bytes = 1
      	group.id = service_new
      	heartbeat.interval.ms = 3000
      	interceptor.classes = null
      	internal.leave.group.on.close = true
      	isolation.level = read_uncommitted
      	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
      	max.partition.fetch.bytes = 1048576
      	max.poll.interval.ms = 300000
      	max.poll.records = 1000
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
      	receive.buffer.bytes = 65536
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 40000
      	retry.backoff.ms = 100
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.mechanism = GSSAPI
      	security.protocol = SSL
      	send.buffer.bytes = 131072
      	session.timeout.ms = 30000
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2]
      	ssl.endpoint.identification.algorithm = null
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = /usr/files/server.jks
      	ssl.keystore.password = [hidden]
      	ssl.keystore.type = JCEKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = /usr/files/truststore.jks
      	ssl.truststore.password = [hidden]
      	ssl.truststore.type = JCEKS
      	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
      
      2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
      2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
      2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: kafka:topic1,topic2,topic3?brokers=192.168.56.10:9093
      2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
      2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
      2019-01-28 10:46:55.313  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Discovered group coordinator 192.168.56.10:9093 (id: 2147483646 rack: null)
      2019-01-28 10:46:55.315  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Revoking previously assigned partitions []
      2019-01-28 10:46:55.316  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] (Re-)joining group
      2019-01-28 10:46:58.469  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Successfully joined group with generation 3
      2019-01-28 10:46:58.470  INFO 3324 --- [uponassignment]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=service_new] Setting newly assigned partitions [topic3-1, topic3-0, topic2-1, topic1-0, topic2-0, topic1-1]
      2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
      2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3221
      2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
      2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3222
      2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
      2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3325
      2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
      2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3223
      2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
      2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3326
      2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
      2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3224
      2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
      2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3327
      2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
      2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3225
      2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
      2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3328
      2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
      2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3226
      2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
      2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3329
      2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
      2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3330
      2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
      2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3227
      2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
      2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 3228
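
The commit cadence can be read off the log mechanically. The following is a minimal sketch, not part of camel-kafka: the class name `CommitGaps` and the method `gapsMillis` are made up for illustration, and it assumes the timestamp occupies the first 23 characters of each trimmed line, as in the excerpt above. Lines sharing a timestamp (the log prints each commit twice) are counted once.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class CommitGaps {
    // Timestamp layout used by the log excerpt, e.g. "2019-01-28 10:47:24.953".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Returns the gaps (in ms) between consecutive distinct "Auto commitSync" timestamps. */
    public static List<Long> gapsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            if (!line.contains("Auto commitSync")) {
                continue; // ignore everything except commit messages
            }
            LocalDateTime current = LocalDateTime.parse(line.trim().substring(0, 23), TS);
            if (previous != null && !current.equals(previous)) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Five commit lines copied from the log above (duplicates omitted).
        String prefix = " DEBUG 3324 --- [uponassignment]] o.a.camel.component.kafka.KafkaConsumer  : "
                + "Auto commitSync topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: ";
        List<String> sample = List.of(
                "2019-01-28 10:47:24.953" + prefix + "3221",
                "2019-01-28 10:47:24.957" + prefix + "3222",
                "2019-01-28 10:47:24.964" + prefix + "3325",
                "2019-01-28 10:47:25.005" + prefix + "3223",
                "2019-01-28 10:47:25.515" + prefix + "3326");
        System.out.println(gapsMillis(sample)); // prints [4, 7, 41, 510]
    }
}
```

Every gap in this sample is far below the configured 5000 ms.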
      

      After downgrading to camel-kafka 2.20.3, commits are done correctly (once every auto.commit.interval.ms) and there are no messages from o.a.camel.component.kafka.KafkaConsumer in the debug log.
      Running with a debugger showed that this code is never actually executed in 2.20.3: https://github.com/apache/camel/blob/2.20.x/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L382. In that version we seem to rely on the auto-commit feature of the Kafka client itself, not the camel-kafka wrapper.
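
To make the expected behavior concrete, here is a minimal sketch of an interval gate: a commit is allowed only once the configured interval has elapsed since the previous one. This is an illustration only, not the Kafka client's or camel-kafka's actual implementation. The class `IntervalGate`, its method `shouldCommit`, and the injected clock are hypothetical names introduced for this example.

```java
import java.util.function.LongSupplier;

public class IntervalGate {
    private final long intervalMs;   // plays the role of auto.commit.interval.ms
    private final LongSupplier clock; // injected so the example can use a fake clock
    private long lastCommitMs;

    public IntervalGate(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.lastCommitMs = clock.getAsLong();
    }

    /** Returns true at most once per interval; a caller would commit only when it does. */
    public boolean shouldCommit() {
        long now = clock.getAsLong();
        if (now - lastCommitMs >= intervalMs) {
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        IntervalGate gate = new IntervalGate(5000, () -> fakeNow[0]);
        int commits = 0;
        // Simulate 40 completed exchanges, one every 500 ms (20 seconds in total).
        for (int i = 0; i < 40; i++) {
            fakeNow[0] += 500;
            if (gate.shouldCommit()) {
                commits++;
            }
        }
        System.out.println(commits); // prints 4
    }
}
```

With a 5000 ms interval, 40 exchanges spread over 20 seconds produce 4 commits, whereas committing on every completed exchange, as the log above shows, would produce 40.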

        Attachments

        1. Kafka_Brokers_CPU.png
          100 kB
          Aleksei Vasilevskii

              People

              • Assignee:
                davsclaus Claus Ibsen
                Reporter:
                avasilevskii Aleksei Vasilevskii