Flink / FLINK-20618

Some source operator subtasks get stuck when a Flink job is under critical backpressure


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Not A Bug
    • Affects Version/s: 1.10.0, 1.11.1
    • Fix Version/s: None
    • Component/s: Runtime / Network
    • Labels: None

    Description

      In some critical backpressure situations, some subtasks of the source block while requesting a buffer because the LocalBufferPool is full. As a result, the whole affected task gets stuck, while the other tasks keep running normally.
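      The blocking buffer request described above can be illustrated with a small bounded pool whose blocking request waits on a future. This is a simplified, hypothetical model: `BoundedBufferPool`, `tryRequest`, `requestBlocking` and `recycle` are illustrative names, not Flink's `LocalBufferPool` API. The model only loosely mirrors the `requestMemorySegmentBlocking` → `CompletableFuture.get` frames in the source thread's trace. If no buffer is ever recycled, as happens under sustained backpressure, the caller stays parked indefinitely.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical, simplified bounded buffer pool; not Flink's LocalBufferPool.
class BoundedBufferPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();
    // Completed (and replaced) each time a buffer is recycled.
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    BoundedBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(ByteBuffer.allocate(bufferSize));
        }
    }

    // Non-blocking request: returns null when the pool is exhausted.
    synchronized ByteBuffer tryRequest() {
        return available.poll();
    }

    // Blocking request: parks on a future until some buffer is recycled.
    ByteBuffer requestBlocking() throws InterruptedException, ExecutionException {
        while (true) {
            CompletableFuture<Void> future;
            synchronized (this) {
                ByteBuffer buffer = available.poll();
                if (buffer != null) {
                    return buffer;
                }
                future = availability;
            }
            future.get(); // the caller parks here, as in the source thread's trace
        }
    }

    // Called when downstream releases a buffer; wakes up any parked requesters.
    void recycle(ByteBuffer buffer) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.clear();
            available.add(buffer);
            toComplete = availability;
            availability = new CompletableFuture<>();
        }
        toComplete.complete(null);
    }

    // Usage: with the only buffer held, a second requester stays parked until recycle().
    static boolean requesterParksUntilRecycled() {
        BoundedBufferPool pool = new BoundedBufferPool(1, 32);
        ByteBuffer held = pool.tryRequest(); // the pool is now exhausted
        Thread requester = new Thread(() -> {
            try {
                pool.requestBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }, "requester");
        requester.setDaemon(true);
        requester.start();
        try {
            requester.join(200); // nothing recycled yet, so the requester should still be parked
            boolean parkedWhileExhausted = requester.isAlive();
            pool.recycle(held); // simulate downstream freeing a buffer
            requester.join(5_000);
            return parkedWhileExhausted && !requester.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```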

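      Below are the jstack traces of the two threads of subtask 62/128. The legacy source thread holds the lock <0x00000000d8d50fa8> while it is parked waiting for a buffer. The task's mailbox thread is BLOCKED waiting to acquire the same lock: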
       
      Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition [0x00007f43b8488000]
      java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)
          - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
          at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
          at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
          at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
          at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
          at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
          at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
          at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
          at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
          at StreamExecCalc$33.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
          at SourceConversion$4.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
          - locked <0x00000000d8d50fa8> (a java.lang.Object)
          at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
          - locked <0x00000000d8d50fa8> (a java.lang.Object)
          at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
          at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
       
      Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry [0x00007f443dfd8000]
      java.lang.Thread.State: BLOCKED (on object monitor)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
          - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
          at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
          at java.lang.Thread.run(Thread.java:748)
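      The lock interaction in the two traces can be reproduced with plain JDK threads. This is a minimal sketch built on a simplified model, and several of its names are hypothetical: `StuckSubtaskDemo`, `checkpointLock` and `bufferAvailable` are illustrative only. In the model, a never-completed `CompletableFuture` stands in for the exhausted buffer pool. The two `synchronized` blocks stand in for `StreamSourceContexts$NonTimestampContext.collect` and `SynchronizedStreamTaskActionExecutor.run`. Using `ThreadMXBean`, the sketch checks that the source thread is WAITING (parking) and that the mailbox thread is BLOCKED on a monitor owned by the source thread. These are the same states shown in the traces above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.concurrent.CompletableFuture;

// Hypothetical reproduction of the two thread states in the traces; not Flink code.
class StuckSubtaskDemo {

    // What was observed while the "source" thread held the lock.
    static final class Observation {
        final Thread.State sourceState;
        final Thread.State mailboxState;
        final boolean mailboxBlockedOnSourceLock;

        Observation(Thread.State sourceState, Thread.State mailboxState, boolean blockedOnSource) {
            this.sourceState = sourceState;
            this.mailboxState = mailboxState;
            this.mailboxBlockedOnSourceLock = blockedOnSource;
        }
    }

    static Observation run() {
        final Object checkpointLock = new Object(); // plays the role of <0x00000000d8d50fa8>
        // Never completed while "backpressured": stands in for an exhausted buffer pool.
        final CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) { // like NonTimestampContext.collect taking the lock
                bufferAvailable.join();     // parks while still holding the monitor
            }
        }, "legacy-source");
        Thread mailbox = new Thread(() -> {
            synchronized (checkpointLock) { // like SynchronizedStreamTaskActionExecutor.run
                // a mail action would run here
            }
        }, "mailbox");
        source.setDaemon(true);
        mailbox.setDaemon(true);

        source.start();
        awaitState(source, Thread.State.WAITING);
        mailbox.start();
        awaitState(mailbox, Thread.State.BLOCKED);

        // Ask the JVM which thread owns the monitor the mailbox thread is blocked on.
        ThreadInfo info = ManagementFactory.getThreadMXBean().getThreadInfo(mailbox.getId());
        Observation observation = new Observation(
                source.getState(),
                mailbox.getState(),
                info != null && info.getLockOwnerId() == source.getId());

        bufferAvailable.complete(null); // relieve the "backpressure" so both threads can finish
        joinQuietly(source);
        joinQuietly(mailbox);
        return observation;
    }

    // Spin until the thread reaches the expected state, failing after 5 seconds.
    private static void awaitState(Thread thread, Thread.State expected) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (thread.getState() != expected) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException(thread.getName() + " never reached " + expected);
            }
            Thread.yield();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

      A thread parked in `LockSupport.park` keeps every monitor it holds. As a result, nothing the mailbox thread would run under that lock can make progress until a buffer is recycled downstream.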

      Attachments

        1. 2020-12-16 11-47-37 的屏幕截图.png (screenshot), 57 kB, zlzhang0122
        2. 2020-12-16 11-48-30 的屏幕截图.png (screenshot), 107 kB, zlzhang0122
        3. 2020-12-16 11-53-42 的屏幕截图.png (screenshot), 72 kB, zlzhang0122
        4. 2020-12-16 11-49-01 的屏幕截图.png (screenshot), 112 kB, zlzhang0122
        5. 2020-12-17 11-10-06 的屏幕截图.png (screenshot), 40 kB, zlzhang0122
        6. 2020-12-17 16-45-00 的屏幕截图.png (screenshot), 53 kB, zlzhang0122
        7. stuck_node.txt, 104 kB, zlzhang0122
        8. stuck_node_downstream.txt, 99 kB, zlzhang0122


          People

            Assignee: Unassigned
            Reporter: zlzhang0122
            Votes: 0
            Watchers: 6

            Dates

              Created:
              Updated:
              Resolved: