Kafka / KAFKA-6762

log-cleaner thread terminates due to java.lang.IllegalStateException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:
      None
    • Environment:
      os: GNU/Linux
      arch: x86_64
      Kernel: 4.9.77
      jvm: OpenJDK 1.8.0

      Description

      We are experiencing problems with the Kafka log-cleaner thread on Kafka 1.0.0. We plan to upgrade this cluster to 1.1.0 next week in order to fix KAFKA-6683, but until then we can only confirm that the problem happens on 1.0.0.

      The log-cleaner thread crashes after a while with the following error:

      [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-31. (kafka.log.LogCleaner)
      [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for __consumer_offsets-31... (kafka.log.LogCleaner)
      [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log __consumer_offsets-31 complete. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to Sat Feb 24 11:04:21 GMT 2018)... (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
      java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
              at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
              at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
              at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
              at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
              at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
              at scala.collection.immutable.List.foreach(List.scala:389)
              at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
              at kafka.log.Cleaner.clean(LogCleaner.scala:372)
              at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
              at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
      [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,773] INFO The cleaning for partition __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,773] INFO Compaction for partition __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,774] INFO The cleaning for partition __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed (kafka.log.LogCleaner)
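
The buffer sizes in the log above (262144, 524288, 1000012) are consistent with a doubling strategy that is capped at a maximum size and gives up once the cap is reached. The Python sketch below is a simplified model inferred from those log lines only, not a copy of Kafka's implementation. The names `grow_buffer` and `growth_steps` are hypothetical, and the starting size assumes that log.cleaner.io.buffer.size (524288) is split evenly between a read buffer and a write buffer.

```python
def grow_buffer(capacity: int, max_size: int) -> int:
    """Return the next buffer capacity, or raise once the cap has been reached."""
    if capacity >= max_size:
        # Mirrors the wording of the exception seen in the log above.
        raise RuntimeError(
            "This log contains a message larger than maximum "
            f"allowable size of {max_size}."
        )
    # Double the buffer, but never beyond the cap.
    return min(capacity * 2, max_size)


def growth_steps(initial: int, max_size: int) -> list:
    """Collect every capacity the model passes through before it gives up."""
    steps = [initial]
    while True:
        try:
            steps.append(grow_buffer(steps[-1], max_size))
        except RuntimeError:
            return steps


# Assumption: each buffer starts at half of log.cleaner.io.buffer.size.
print(growth_steps(524288 // 2, 1000012))
```

With the values from this report the model passes through 262144, 524288 and 1000012 before raising, which matches the two "Growing cleaner I/O buffers" lines and the limit quoted in the exception.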
      

      What we know so far:

      • We have not yet been able to reproduce it consistently.
      • It only happens in the PRO cluster and not in the PRE cluster for the same customer (whose message payloads are very similar).
      • According to our Kafka logs, it has only happened on the internal topic partitions __consumer_offsets-*.
      • When we restart the broker process, the log-cleaner starts working again, but it can take anywhere between 3 minutes and several hours to die again.
      • We worked around it by temporarily increasing message.max.bytes and replica.fetch.max.bytes to 10485760 (10MB), up from 1000012 (~1MB) and 1048576 (1MB) respectively.
        • Before setting message.max.bytes to 10MB, we tried setting it to the value of replica.fetch.max.bytes (1048576), but the log-cleaner died with the same error, only with a different limit in the message.
        • The 10MB setting kept the log-cleaner alive long enough to compact the data, and used disk space went from ~600GB to ~100GB.
        • Without this limit change, the log-cleaner dies after a while, and used disk space stays at ~450GB and then starts growing again due to cluster activity.
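
To make the two size limits involved in the workaround easy to compare, they can be read out of a properties file with a few lines of Python. This is a minimal sketch: `parse_properties` and `size_limits` are hypothetical helper names, the fallback values are the ones shown in the KafkaConfig dump in this report, and the sample text only illustrates the temporary 10MB override.

```python
# Fallbacks taken from the KafkaConfig values printed in this report.
DEFAULTS = {
    "message.max.bytes": 1000012,
    "replica.fetch.max.bytes": 1048576,
}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def size_limits(props: dict) -> dict:
    """Return the effective size limits, falling back to DEFAULTS."""
    return {key: int(props.get(key, default)) for key, default in DEFAULTS.items()}


# Illustrative fragment with the temporary workaround applied.
workaround = """
broker.id=11
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
"""
print(size_limits(parse_properties(workaround)))
```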

      Our server.properties content is as follows, as printed in server.log at broker startup.

      broker.id=11
      delete.topic.enable=true
      advertised.listeners=PLAINTEXT://broker-ip:9092
      num.network.threads=3
      num.io.threads=8
      socket.send.buffer.bytes=102400
      socket.receive.buffer.bytes=102400
      socket.request.max.bytes=104857600
      log.dirs=/var/lib/kafka
      num.partitions=12
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=3
      transaction.state.log.replication.factor=3
      transaction.state.log.min.isr=2
      log.retention.hours=168
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      zookeeper.connect=x.x.x.x:2181/kafka/cluster01
      zookeeper.connection.timeout.ms=6000
      group.initial.rebalance.delay.ms=3000
      auto.create.topics.enable=false
      inter.broker.protocol.version=0.11.0
      log.message.format.version=0.11.0
      broker.rack=eu-west-1b
      default.replication.factor=3
      offsets.retention.minutes=10080
      

      Our Kafka configuration values, as printed in server.log at startup, are the following.

      [2018-03-28 10:40:29,652] INFO KafkaConfig values:
              advertised.host.name = null
              advertised.listeners = PLAINTEXT://broker_ip:9092
              advertised.port = null
              alter.config.policy.class.name = null
              authorizer.class.name =
              auto.create.topics.enable = false
              auto.leader.rebalance.enable = true
              background.threads = 10
              broker.id = 11
              broker.id.generation.enable = true
              broker.rack = eu-west-1b
              compression.type = producer
              connections.max.idle.ms = 600000
              controlled.shutdown.enable = true
              controlled.shutdown.max.retries = 3
              controlled.shutdown.retry.backoff.ms = 5000
              controller.socket.timeout.ms = 30000
              create.topic.policy.class.name = null
              default.replication.factor = 3
              delete.records.purgatory.purge.interval.requests = 1
              delete.topic.enable = true
              fetch.purgatory.purge.interval.requests = 1000
              group.initial.rebalance.delay.ms = 3000
              group.max.session.timeout.ms = 300000
              group.min.session.timeout.ms = 6000
              host.name =
              inter.broker.listener.name = null
              inter.broker.protocol.version = 0.11.0
              leader.imbalance.check.interval.seconds = 300
              leader.imbalance.per.broker.percentage = 10
              listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
              listeners = null
              log.cleaner.backoff.ms = 15000
              log.cleaner.dedupe.buffer.size = 134217728
              log.cleaner.delete.retention.ms = 86400000
              log.cleaner.enable = true
              log.cleaner.io.buffer.load.factor = 0.9
              log.cleaner.io.buffer.size = 524288
              log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
              log.cleaner.min.cleanable.ratio = 0.5
              log.cleaner.min.compaction.lag.ms = 0
              log.cleaner.threads = 1
              log.cleanup.policy = [delete]
              log.dir = /tmp/kafka-logs
              log.dirs = /var/lib/kafka
              log.flush.interval.messages = 9223372036854775807
              log.flush.interval.ms = null
              log.flush.offset.checkpoint.interval.ms = 60000
              log.flush.scheduler.interval.ms = 9223372036854775807
              log.flush.start.offset.checkpoint.interval.ms = 60000
              log.index.interval.bytes = 4096
              log.index.size.max.bytes = 10485760
              log.message.format.version = 0.11.0
              log.message.timestamp.difference.max.ms = 9223372036854775807
              log.message.timestamp.type = CreateTime
              log.preallocate = false
              log.retention.bytes = -1
              log.retention.check.interval.ms = 300000
              log.retention.hours = 168
              log.retention.minutes = null
              log.retention.ms = null
              log.roll.hours = 168
              log.roll.jitter.hours = 0
              log.roll.jitter.ms = null
              log.roll.ms = null
              log.segment.bytes = 1073741824
              log.segment.delete.delay.ms = 60000
              max.connections.per.ip = 2147483647
              max.connections.per.ip.overrides =
              message.max.bytes = 1000012
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              min.insync.replicas = 1
              num.io.threads = 8
              num.network.threads = 3
              num.partitions = 12
              num.recovery.threads.per.data.dir = 1
              num.replica.fetchers = 1
              offset.metadata.max.bytes = 4096
              offsets.commit.required.acks = -1
              offsets.commit.timeout.ms = 5000
              offsets.load.buffer.size = 5242880
              offsets.retention.check.interval.ms = 600000
              offsets.retention.minutes = 10080
              offsets.topic.compression.codec = 0
              offsets.topic.num.partitions = 50
              offsets.topic.replication.factor = 3
              offsets.topic.segment.bytes = 104857600
              port = 9092
              principal.builder.class = null
              producer.purgatory.purge.interval.requests = 1000
              queued.max.request.bytes = -1
              queued.max.requests = 500
              quota.consumer.default = 9223372036854775807
              quota.producer.default = 9223372036854775807
              quota.window.num = 11
              quota.window.size.seconds = 1
              replica.fetch.backoff.ms = 1000
              replica.fetch.max.bytes = 1048576
              replica.fetch.min.bytes = 1
              replica.fetch.response.max.bytes = 10485760
              replica.fetch.wait.max.ms = 500
              replica.high.watermark.checkpoint.interval.ms = 5000
              replica.lag.time.max.ms = 10000
              replica.socket.receive.buffer.bytes = 65536
              replica.socket.timeout.ms = 30000
              replication.quota.window.num = 11
              replication.quota.window.size.seconds = 1
              request.timeout.ms = 30000
              reserved.broker.max.id = 1000
              sasl.enabled.mechanisms = [GSSAPI]
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.principal.to.local.rules = [DEFAULT]
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.mechanism.inter.broker.protocol = GSSAPI
              security.inter.broker.protocol = PLAINTEXT
              socket.receive.buffer.bytes = 102400
              socket.request.max.bytes = 104857600
              socket.send.buffer.bytes = 102400
              ssl.cipher.suites = null
              ssl.client.auth = none
              ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
              ssl.endpoint.identification.algorithm = null
              ssl.key.password = null
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.location = null
              ssl.keystore.password = null
              ssl.keystore.type = JKS
              ssl.protocol = TLS
              ssl.provider = null
              ssl.secure.random.implementation = null
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.location = null
              ssl.truststore.password = null
              ssl.truststore.type = JKS
              transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
              transaction.max.timeout.ms = 900000
              transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
              transaction.state.log.load.buffer.size = 5242880
              transaction.state.log.min.isr = 2
              transaction.state.log.num.partitions = 50
              transaction.state.log.replication.factor = 3
              transaction.state.log.segment.bytes = 104857600
              transactional.id.expiration.ms = 604800000
              unclean.leader.election.enable = false
              zookeeper.connect = (zookeeper connection string here)
              zookeeper.connection.timeout.ms = 6000
              zookeeper.session.timeout.ms = 6000
              zookeeper.set.acl = false
              zookeeper.sync.time.ms = 2000
       (kafka.server.KafkaConfig)
      [2018-03-28 10:40:29,745] INFO starting (kafka.server.KafkaServer)
      

      The clients (spring-kafka 2.1.4, which provides kafka-clients 1.0.x) have the following configuration:

      • max.partition.fetch.bytes = 1048576

      While it looks like increasing the message size limit might do the trick, we are concerned that this will happen again as soon as customers start publishing messages closer to the message.max.bytes limit. We decided to open this bug report because we think something is wrong if message.max.bytes is ~1MB and the log-cleaner throws an exception while compacting __consumer_offsets because it finds messages bigger than that limit.
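
One way to check whether a segment really contains entries above the limit is to walk the segment file and compare each entry's on-disk size against it. The sketch below is an illustration under stated assumptions, not a Kafka tool: it assumes each entry in a .log segment starts with an 8-byte big-endian offset followed by a 4-byte big-endian size, and the function name `oversized_entries` is hypothetical. The demo runs on synthetic bytes rather than a real segment.

```python
import io
import struct

# Assumed framing: 8-byte offset + 4-byte size in front of every entry.
LOG_OVERHEAD = 12


def oversized_entries(stream, limit):
    """Yield (offset, total_size) for entries whose on-disk size exceeds limit."""
    while True:
        header = stream.read(LOG_OVERHEAD)
        if len(header) < LOG_OVERHEAD:
            return  # end of file, or a truncated trailing header
        offset, size = struct.unpack(">qi", header)
        if size <= 0:
            return  # zero padding or corrupt data: stop scanning
        total = LOG_OVERHEAD + size
        if total > limit:
            yield offset, total
        # Skip the payload and move on to the next header.
        stream.seek(size, io.SEEK_CUR)


# Synthetic stream: one 112-byte entry and one 2012-byte entry.
demo = io.BytesIO(
    struct.pack(">qi", 0, 100) + b"\x00" * 100
    + struct.pack(">qi", 1, 2000) + b"\x00" * 2000
)
print(list(oversized_entries(demo, limit=1000)))
```

Against a real segment, the stream would be the segment file opened in binary mode and the limit would be the value quoted in the exception (1000012 in this report). Whether the reported size should include the 12-byte prefix when compared with the cleaner's limit is an assumption of this sketch.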

      People

      • Assignee: Unassigned
      • Reporter: Ricardo Bartolome (ricbartm)