Kafka / KAFKA-6762

log-cleaner thread terminates due to java.lang.IllegalStateException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None
    • Environment:
      os: GNU/Linux
      arch: x86_64
      Kernel: 4.9.77
      jvm: OpenJDK 1.8.0

    Description

      We are experiencing problems with the Kafka log-cleaner thread on Kafka 1.0.0. We plan to upgrade this cluster to 1.1.0 next week in order to fix KAFKA-6683, but until then we can only confirm that this happens on 1.0.0.

      log-cleaner thread crashes after a while with the following error:

      [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-31. (kafka.log.LogCleaner)
      [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for __consumer_offsets-31... (kafka.log.LogCleaner)
      [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log __consumer_offsets-31 complete. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to Sat Feb 24 11:04:21 GMT 2018)... (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)
      [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
      java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
              at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
              at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
              at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
              at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
              at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
              at scala.collection.immutable.List.foreach(List.scala:389)
              at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
              at kafka.log.Cleaner.clean(LogCleaner.scala:372)
              at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
              at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
      [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,773] INFO The cleaning for partition __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,773] INFO Compaction for partition __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
      [2018-04-04 14:25:12,774] INFO The cleaning for partition __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down (kafka.log.LogCleaner)
      [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed (kafka.log.LogCleaner)
      
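      For reference, here is a sketch of the buffer-growth logic behind the exception (our reconstruction of kafka.log.Cleaner.growBuffers from the 1.0.0 stack trace, not a verbatim copy of the source). It explains why the buffers grow 262144 -> 524288 -> 1000012 in the log above, and why the next growth attempt throws:

      // A sketch (our reconstruction, not verbatim source) of the method at the top
      // of the stack trace, kafka.log.Cleaner.growBuffers (LogCleaner.scala:622).
      // readBuffer/writeBuffer are the cleaner's java.nio.ByteBuffer I/O buffers and
      // maxIoBufferSize is capped by message.max.bytes (1000012 on this cluster).
      def growBuffers(maxLogMessageSize: Int): Unit = {
        val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
        // If the buffers are already at the cap, a larger record batch can never
        // fit, so the cleaner thread gives up with the error we see:
        if (readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
          throw new IllegalStateException(
            s"This log contains a message larger than maximum allowable size of $maxBufferSize.")
        // Otherwise double the capacity, clamped to the cap:
        // 262144 -> 524288 -> min(1048576, 1000012) = 1000012
        val newSize = math.min(readBuffer.capacity * 2, maxBufferSize)
        readBuffer = ByteBuffer.allocate(newSize)
        writeBuffer = ByteBuffer.allocate(newSize)
      }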

      What we know so far is:

      • We have not yet been able to reproduce it in a consistent manner.
      • It only happens in the PRO cluster and not in the PRE cluster for the same customer (whose message payloads are very similar).
      • Checking our Kafka logs, it only happened on the internal __consumer_offsets-* topics.
      • When we restart the broker process the log-cleaner starts working again, but it dies again after anywhere between 3 minutes and several hours.
      • We worked around it by temporarily increasing the message.max.bytes and replica.fetch.max.bytes values from the default 1000012 (~1 MB) to 10485760 (10 MB); see the snippet after this list.
        • Before settling on 10 MB, we tried matching message.max.bytes to the value of replica.fetch.max.bytes (1048576), but the log-cleaner died with the same error, just with the new limit in the message.
        • This kept the log-cleaner alive and let it compact enough data for used disk space to drop from ~600 GB to ~100 GB.
        • Without this limit change, the log-cleaner dies after a while, and used disk space stays at ~450 GB and starts growing again due to cluster activity.
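
      Concretely, the temporary workaround amounts to the following server.properties change (the values are the ones listed above; since these are static broker configs in 1.0.0, we assume they are applied with a broker restart):

      # Temporary workaround: raise both limits from the ~1 MB default (1000012) to 10 MB
      message.max.bytes=10485760
      replica.fetch.max.bytes=10485760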

      Our server.properties content is as follows, as printed in server.log at broker startup.

      broker.id=11
      delete.topic.enable=true
      advertised.listeners=PLAINTEXT://broker-ip:9092
      num.network.threads=3
      num.io.threads=8
      socket.send.buffer.bytes=102400
      socket.receive.buffer.bytes=102400
      socket.request.max.bytes=104857600
      log.dirs=/var/lib/kafka
      num.partitions=12
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=3
      transaction.state.log.replication.factor=3
      transaction.state.log.min.isr=2
      log.retention.hours=168
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      zookeeper.connect=x.x.x.x:2181/kafka/cluster01
      zookeeper.connection.timeout.ms=6000
      group.initial.rebalance.delay.ms=3000
      auto.create.topics.enable=false
      inter.broker.protocol.version=0.11.0
      log.message.format.version=0.11.0
      broker.rack=eu-west-1b
      default.replication.factor=3
      offsets.retention.minutes=10080
      

      Our full Kafka configuration values, as printed in server.log at startup, are the following.

      [2018-03-28 10:40:29,652] INFO KafkaConfig values:
              advertised.host.name = null
              advertised.listeners = PLAINTEXT://broker_ip:9092
              advertised.port = null
              alter.config.policy.class.name = null
              authorizer.class.name =
              auto.create.topics.enable = false
              auto.leader.rebalance.enable = true
              background.threads = 10
              broker.id = 11
              broker.id.generation.enable = true
              broker.rack = eu-west-1b
              compression.type = producer
              connections.max.idle.ms = 600000
              controlled.shutdown.enable = true
              controlled.shutdown.max.retries = 3
              controlled.shutdown.retry.backoff.ms = 5000
              controller.socket.timeout.ms = 30000
              create.topic.policy.class.name = null
              default.replication.factor = 3
              delete.records.purgatory.purge.interval.requests = 1
              delete.topic.enable = true
              fetch.purgatory.purge.interval.requests = 1000
              group.initial.rebalance.delay.ms = 3000
              group.max.session.timeout.ms = 300000
              group.min.session.timeout.ms = 6000
              host.name =
              inter.broker.listener.name = null
              inter.broker.protocol.version = 0.11.0
              leader.imbalance.check.interval.seconds = 300
              leader.imbalance.per.broker.percentage = 10
              listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
              listeners = null
              log.cleaner.backoff.ms = 15000
              log.cleaner.dedupe.buffer.size = 134217728
              log.cleaner.delete.retention.ms = 86400000
              log.cleaner.enable = true
              log.cleaner.io.buffer.load.factor = 0.9
              log.cleaner.io.buffer.size = 524288
              log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
              log.cleaner.min.cleanable.ratio = 0.5
              log.cleaner.min.compaction.lag.ms = 0
              log.cleaner.threads = 1
              log.cleanup.policy = [delete]
              log.dir = /tmp/kafka-logs
              log.dirs = /var/lib/kafka
              log.flush.interval.messages = 9223372036854775807
              log.flush.interval.ms = null
              log.flush.offset.checkpoint.interval.ms = 60000
              log.flush.scheduler.interval.ms = 9223372036854775807
              log.flush.start.offset.checkpoint.interval.ms = 60000
              log.index.interval.bytes = 4096
              log.index.size.max.bytes = 10485760
              log.message.format.version = 0.11.0
              log.message.timestamp.difference.max.ms = 9223372036854775807
              log.message.timestamp.type = CreateTime
              log.preallocate = false
              log.retention.bytes = -1
              log.retention.check.interval.ms = 300000
              log.retention.hours = 168
              log.retention.minutes = null
              log.retention.ms = null
              log.roll.hours = 168
              log.roll.jitter.hours = 0
              log.roll.jitter.ms = null
              log.roll.ms = null
              log.segment.bytes = 1073741824
              log.segment.delete.delay.ms = 60000
              max.connections.per.ip = 2147483647
              max.connections.per.ip.overrides =
              message.max.bytes = 1000012
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              min.insync.replicas = 1
              num.io.threads = 8
              num.network.threads = 3
              num.partitions = 12
              num.recovery.threads.per.data.dir = 1
              num.replica.fetchers = 1
              offset.metadata.max.bytes = 4096
              offsets.commit.required.acks = -1
              offsets.commit.timeout.ms = 5000
              offsets.load.buffer.size = 5242880
              offsets.retention.check.interval.ms = 600000
              offsets.retention.minutes = 10080
              offsets.topic.compression.codec = 0
              offsets.topic.num.partitions = 50
              offsets.topic.replication.factor = 3
              offsets.topic.segment.bytes = 104857600
              port = 9092
              principal.builder.class = null
              producer.purgatory.purge.interval.requests = 1000
              queued.max.request.bytes = -1
              queued.max.requests = 500
              quota.consumer.default = 9223372036854775807
              quota.producer.default = 9223372036854775807
              quota.window.num = 11
              quota.window.size.seconds = 1
              replica.fetch.backoff.ms = 1000
              replica.fetch.max.bytes = 1048576
              replica.fetch.min.bytes = 1
              replica.fetch.response.max.bytes = 10485760
              replica.fetch.wait.max.ms = 500
              replica.high.watermark.checkpoint.interval.ms = 5000
              replica.lag.time.max.ms = 10000
              replica.socket.receive.buffer.bytes = 65536
              replica.socket.timeout.ms = 30000
              replication.quota.window.num = 11
              replication.quota.window.size.seconds = 1
              request.timeout.ms = 30000
              reserved.broker.max.id = 1000
              sasl.enabled.mechanisms = [GSSAPI]
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.principal.to.local.rules = [DEFAULT]
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.mechanism.inter.broker.protocol = GSSAPI
              security.inter.broker.protocol = PLAINTEXT
              socket.receive.buffer.bytes = 102400
              socket.request.max.bytes = 104857600
              socket.send.buffer.bytes = 102400
              ssl.cipher.suites = null
              ssl.client.auth = none
              ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
              ssl.endpoint.identification.algorithm = null
              ssl.key.password = null
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.location = null
              ssl.keystore.password = null
              ssl.keystore.type = JKS
              ssl.protocol = TLS
              ssl.provider = null
              ssl.secure.random.implementation = null
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.location = null
              ssl.truststore.password = null
              ssl.truststore.type = JKS
              transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
              transaction.max.timeout.ms = 900000
              transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
              transaction.state.log.load.buffer.size = 5242880
              transaction.state.log.min.isr = 2
              transaction.state.log.num.partitions = 50
              transaction.state.log.replication.factor = 3
              transaction.state.log.segment.bytes = 104857600
              transactional.id.expiration.ms = 604800000
              unclean.leader.election.enable = false
              zookeeper.connect = (zookeeper connection string here)
              zookeeper.connection.timeout.ms = 6000
              zookeeper.session.timeout.ms = 6000
              zookeeper.set.acl = false
              zookeeper.sync.time.ms = 2000
       (kafka.server.KafkaConfig)
      [2018-03-28 10:40:29,745] INFO starting (kafka.server.KafkaServer)
      

      The clients (spring-kafka 2.1.4, which provides kafka-clients 1.0.x) have the following configuration:

      • max.partition.fetch.bytes = 1048576

      While it looks like increasing the message size limit might do the trick, we are concerned that this will happen again as soon as customers start publishing messages closer to the message.max.bytes limit. We decided to open this bug report because we think something might be wrong if message.max.bytes is ~1 MB and the log-cleaner throws an exception while compacting __consumer_offsets because it finds messages bigger than that.
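
      In case it helps with triage: one way to double-check that the offending segments really contain batches above the limit would be to dump one with the stock DumpLogSegments tool. The command below is an illustrative sketch; the segment file name is hypothetical, derived from our log.dirs and the offset range printed in the log above.

      # Illustrative sketch, not a command we have run against this exact segment:
      bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
          --files /var/lib/kafka/__consumer_offsets-31/00000000000001612869.log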


      People

        Assignee: Unassigned
        Reporter: Ricardo Bartolome (ricbartm)
        Votes: 0
        Watchers: 10
