FLINK-32826

Our job is stuck on requestMemorySegmentBlocking


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.16.0, 1.16.2
    • Fix Version/s: None
    • Component/s: Runtime / Network
    • Labels: None

    Description

      We have a Flink job that often gets stuck requesting a memory segment.

       

      "shardConsumers-Source: xxx[1] -> Calc[2] (1/1)#0-thread-0" Id=107 WAITING on java.util.concurrent.CompletableFuture$Signaller@62e01595
          at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)
          -  waiting on java.util.concurrent.CompletableFuture$Signaller@62e01595
          at java.base@11.0.20/java.util.concurrent.locks.LockSupport.park(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.CompletableFuture$Signaller.block(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.CompletableFuture.waitingGet(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.CompletableFuture.get(Unknown Source)
          at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:383)
          at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:355)
          at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
          at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
          at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
          at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
          at app//org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
          at app//org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
          at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
          at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
          at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
          at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
          at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
          at StreamExecCalc$23.processElement_split2(Unknown Source)
          at StreamExecCalc$23.processElement(Unknown Source)
          at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
          at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
          at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
          at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
          at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
          at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
          at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
          at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer$$Lambda$1780/0x0000000840f89440.accept(Unknown Source)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher$$Lambda$1781/0x0000000840f89840.accept(Unknown Source)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:360)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169)
          at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124)
          at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
          at java.base@11.0.20/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.FutureTask.run(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
          at java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
          at java.base@11.0.20/java.lang.Thread.run(Unknown Source) 
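
      For context, the trace shows the source thread parked inside LocalBufferPool.requestMemorySegmentBlocking, i.e. on the pool's availability future, which is only completed when a buffer is recycled back into the pool. A minimal sketch of that blocking pattern (our paraphrase for illustration, not the actual Flink source):

      import java.util.ArrayDeque;
      import java.util.concurrent.CompletableFuture;

      // Toy model of the LocalBufferPool request path: requesters poll for a
      // free segment and, if the pool is exhausted, park on an availability
      // future that only a recycle() completes. If buffers are never recycled
      // (e.g. they stay pinned in a result subpartition), get() parks forever,
      // which matches the WAITING state in the trace above.
      class ToyBufferPool {
          private final ArrayDeque<byte[]> available = new ArrayDeque<>();
          private CompletableFuture<Void> availableFuture = new CompletableFuture<>();

          byte[] requestBlocking() throws Exception {
              while (true) {
                  byte[] segment;
                  CompletableFuture<Void> toWaitOn;
                  synchronized (this) {
                      segment = available.poll();
                      toWaitOn = availableFuture; // captured together with the failed poll
                  }
                  if (segment != null) {
                      return segment;
                  }
                  toWaitOn.get(); // <- the park seen at LocalBufferPool.java:383
              }
          }

          synchronized void recycle(byte[] segment) {
              available.add(segment);
              availableFuture.complete(null);              // wake blocked requesters
              availableFuture = new CompletableFuture<>(); // re-arm for the next exhaustion
          }
      }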

       

      We also noticed high backpressure, but we couldn't find the cause: the downstream writer thread was simply waiting for mail from its mailbox.

      "xxx[3]: Writer (1/1)#0" Id=91 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929    at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929    at java.base@11.0.20/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)    at java.base@11.0.20/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)    at app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)    at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)    at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)    at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)    at app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836) 

      Some observations:

      • The job logic is pretty simple: it consumes from AWS Kinesis, does some filtering, and writes the results to another Kinesis stream.
      • The job gets stuck after running for 3-4 days.
        • If we restart from the last checkpoint, the job soon gets stuck again.
        • If we restart without a checkpoint, the job recovers, but may get stuck again after a few days.
      • We have several jobs consuming different Kinesis streams, but only this one has the problem. This stream has only one shard, and its data volume is small.
      • At first we were using 1.16.0; after finding LocalBufferPool-related issues such as FLINK-29298 and FLINK-31293, we upgraded to the latest 1.16.2, but the problem was not solved.

      The heap dump of LocalBufferPool:

       

      @LocalBufferPool[
              LOG=@Log4jLogger[
                  FQCN=@String[org.apache.logging.slf4j.Log4jLogger],
                  serialVersionUID=@Long[7869000638091304316],
                  EVENT_MARKER=@Log4jMarker[org.apache.logging.slf4j.Log4jMarker@3f47a99],
                  CONVERTER=null,
                  eventLogger=@Boolean[false],
                  logger=@Logger[org.apache.flink.runtime.io.network.buffer.LocalBufferPool:INFO in 4783da3f],
                  name=@String[org.apache.flink.runtime.io.network.buffer.LocalBufferPool],
              ],
              UNKNOWN_CHANNEL=@Integer[-1],
              networkBufferPool=@NetworkBufferPool[
                  UNBOUNDED_POOL_SIZE=@Integer[2147483647],
                  USAGE_WARNING_THRESHOLD=@Integer[100],
                  LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@16f3a390],
                  totalNumberOfMemorySegments=@Integer[5079],
                  memorySegmentSize=@Integer[32768],
                  availableMemorySegments=@ArrayDeque[isEmpty=false;size=5068],
                  isDestroyed=@Boolean[false],
                  factoryLock=@Object[java.lang.Object@5151b0a4],
                  allBufferPools=@HashSet[isEmpty=false;size=2],
                  resizableBufferPools=@HashSet[isEmpty=false;size=2],
                  numTotalRequiredBuffers=@Integer[3],
                  requestSegmentsTimeout=@Duration[PT30S],
                  availabilityHelper=@AvailabilityHelper[AVAILABLE],
                  lastCheckedUsage=@Integer[0],
                  $assertionsDisabled=@Boolean[true],
              ],
              numberOfRequiredMemorySegments=@Integer[2],
              availableMemorySegments=@ArrayDeque[isEmpty=true;size=0],
              registeredListeners=@ArrayDeque[isEmpty=true;size=0],
              maxNumberOfMemorySegments=@Integer[10],
              currentPoolSize=@Integer[10],
              numberOfRequestedMemorySegments=@Integer[10],
              maxBuffersPerChannel=@Integer[10],
              subpartitionBuffersCount=@int[][
                  @Integer[10],
              ],
              subpartitionBufferRecyclers=@BufferRecycler[][
                  @SubpartitionBufferRecycler[org.apache.flink.runtime.io.network.buffer.LocalBufferPool$SubpartitionBufferRecycler@41b3ff09],
              ],
              unavailableSubpartitionsCount=@Integer[1],
              maxOverdraftBuffersPerGate=@Integer[0],
              isDestroyed=@Boolean[false],
              availabilityHelper=@AvailabilityHelper[
                  availableFuture=@CompletableFuture[java.util.concurrent.CompletableFuture@4570da85[Not completed, 1 dependents]],
              ],
              requestingNotificationOfGlobalPoolAvailable=@Boolean[false],
              $assertionsDisabled=@Boolean[true],
          ] 
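
      Reading the dump: numberOfRequestedMemorySegments equals currentPoolSize (10), the local availableMemorySegments deque is empty, and the single subpartition holds all 10 buffers (subpartitionBuffersCount=[10] with maxBuffersPerChannel=10, which we read as the subpartition having hit its per-channel cap, hence unavailableSubpartitionsCount=1), while the global NetworkBufferPool still has 5068 of 5079 segments free. So this does not look like a memory sizing problem; the buffers seem to be pinned in the result subpartition and never recycled. A small worked check of those numbers (our reading of the pool's availability rules, values copied from the dump):

      // Values copied from the heap dump above.
      public class DumpCheck {
          public static void main(String[] args) {
              int currentPoolSize = 10;     // LocalBufferPool.currentPoolSize
              int requestedSegments = 10;   // numberOfRequestedMemorySegments
              int availableLocal = 0;       // availableMemorySegments.size()
              int maxBuffersPerChannel = 10;
              int subpartitionBuffers = 10; // subpartitionBuffersCount[0]
              int globalAvailable = 5068;   // NetworkBufferPool free segments

              // A request can be served only if a local segment is free or the
              // pool may still grow toward currentPoolSize.
              boolean canServe = availableLocal > 0
                      || requestedSegments < currentPoolSize;      // false
              // The only subpartition has reached its per-channel cap, which
              // matches unavailableSubpartitionsCount=1 in the dump.
              boolean channelSaturated =
                      subpartitionBuffers >= maxBuffersPerChannel; // true
              boolean globalExhausted = globalAvailable == 0;      // false

              System.out.printf("blocked=%b, channelSaturated=%b, sizingIssue=%b%n",
                      !canServe, channelSaturated, globalExhausted);
              // -> blocked=true, channelSaturated=true, sizingIssue=false
          }
      }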

      Could you give us some clues on how to troubleshoot such a problem? If you need more information, please let me know. Thank you!

       


      People

        Assignee: Unassigned
        Reporter: Zhongyi Sun (cndpzc)
        Votes: 0
        Watchers: 4
