KAFKA-15525

Segment uploads stop working following a broker failure


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.6.0
    • Fix Version/s: None
    • Component/s: Tiered-Storage
    • Labels: None

    Description

      I have a tiered-storage-enabled cluster with a TS-enabled topic, to which I continuously produce and from which I continuously consume.

      Here are the topic settings I’m using: 

      local.retention.ms=120000
      remote.storage.enable=true
      retention.ms=10800000
      segment.bytes=512000000
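
      For reference, a minimal sketch of creating a topic with these settings through the Java AdminClient; the bootstrap address, partition count, and replication factor below are placeholders, not values from this report:

      import java.util.Collections;
      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.NewTopic;

      public class CreateTieredTopic {
          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

              try (Admin admin = Admin.create(props)) {
                  // Topic-level settings from the description above.
                  NewTopic topic = new NewTopic("loadgen", 6, (short) 3) // partitions/RF are placeholders
                          .configs(Map.of(
                                  "remote.storage.enable", "true",
                                  "local.retention.ms", "120000",
                                  "retention.ms", "10800000",
                                  "segment.bytes", "512000000"));
                  admin.createTopics(Collections.singleton(topic)).all().get();
              }
          }
      }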
      

      Here are my broker settings:

      remote.log.storage.system.enable=true
      remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
      remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
      remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
      remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
      remote.log.manager.task.interval.ms=5000
      remote.log.manager.thread.pool.size=10
      remote.log.reader.threads=10
      remote.log.reader.max.pending.tasks=100
      rlmm.config.remote.log.metadata.topic.replication.factor=1
      rlmm.config.remote.log.metadata.topic.num.partitions=50
      rlmm.config.remote.log.metadata.topic.retention.ms=-1
      rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
      rsm.config.chunk.cache.path=/data/tiered-storage-cache
      rsm.config.chunk.cache.size=1073741824
      rsm.config.metrics.recording.level=DEBUG
      rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
      rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
      rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
      rsm.config.storage.s3.region=us-east-1
      rsm.config.chunk.size=102400
      rsm.config.storage.s3.multipart.upload.part.size=16777216 
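
      As a sanity check, the broker-side remote.log.* settings can be read back with the AdminClient. A minimal sketch follows; the broker id and bootstrap address are placeholders, and the plugin-prefixed rsm.config.*/rlmm.config.* keys may not be reported this way:

      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.Config;
      import org.apache.kafka.common.config.ConfigResource;

      public class ShowRemoteLogBrokerConfig {
          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

              try (Admin admin = Admin.create(props)) {
                  ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // placeholder broker id
                  Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
                  config.entries().stream()
                        .filter(e -> e.name().startsWith("remote.log."))
                        .forEach(e -> System.out.println(e.name() + "=" + e.value()));
              }
          }
      }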

      When a broker in the cluster gets rotated (replaced or restarted), some brokers start throwing this error repeatedly:

      [RemoteLogManager=10000 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error occurred while copying log segments of partition: yTypIvtBRY2l3sD4-8M7fA:loadgen-3 
      
      java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
          at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
          at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
          at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
          at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
          at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
          at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
          at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
          at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
          at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
          at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
          at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
          at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
          at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
          at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
          at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
      Caused by: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
          at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:121)
          at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:89)
          at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:186)
          ... 7 more
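
      For context, the failing path is TopicBasedRemoteLogMetadataManager.storeRemoteLogMetadata, which publishes the segment metadata to the internal __remote_log_metadata topic and then waits for its own consumer to read back up to the produced offset. Roughly, that wait looks like the sketch below; this is simplified and illustrative only, not the actual ConsumerManager code, and the poll interval and timeout handling are assumptions:

      import java.util.concurrent.TimeoutException;
      import java.util.concurrent.atomic.AtomicLong;

      // Illustrative sketch of the "wait until the metadata consumer catches up" step
      // that the stack trace points at; names and values here are placeholders.
      public class CatchUpWaitSketch {
          // Offset up to which the metadata consumer has read (updated elsewhere in real code).
          private final AtomicLong consumedOffset = new AtomicLong(0);

          void waitTillConsumptionCatchesUp(long targetOffset, long timeoutMs)
                  throws TimeoutException, InterruptedException {
              long deadline = System.currentTimeMillis() + timeoutMs;
              while (consumedOffset.get() < targetOffset) {
                  if (System.currentTimeMillis() >= deadline) {
                      // Surfaces in the broker log as
                      // "Timed out in catching up with the expected offset by consumer."
                      throw new TimeoutException(
                              "Timed out in catching up with the expected offset by consumer.");
                  }
                  Thread.sleep(100); // illustrative poll interval
              }
          }
      }

      As long as the metadata consumer makes no progress after the broker rotation, every segment copy attempt hits this timeout, which would match the repeated errors above.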

      I observed a few times that a rolling restart of the cluster resolved the issue.



          People

            Assignee: Unassigned
            Reporter: Francois Visconte
