FLINK-12544

Deadlock while releasing memory and requesting a segment concurrently in SpillableSubpartition

    Description

      This was reported by a Flink user. The original jstack output is as follows:

       

      "CoGroup (2/2)":
                      at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
                      - waiting to lock <0x000000062bf859b8> (a java.lang.Object)
                      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                      at java.lang.Thread.run(Thread.java:745)
      "CoGroup (1/2)":
                      at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
                      - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
                      at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
                      at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
                      at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
                      at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
                      at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
                      at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
                      at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
                      - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                      at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                      at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
                      at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
                      at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
                      - locked <0x000000063c785350> (a java.lang.Object)
                      at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
                      at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
                      at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
                      - locked <0x000000062bf859b8> (a java.lang.Object)
                      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                      at java.lang.Thread.run(Thread.java:745)
      "DataSource  (1/1)":
                      at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
                      - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                      at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                      at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
                      - locked <0x000000063fdf4888> (a java.util.ArrayDeque)
                      at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
                      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
                      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
                      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
                      at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
                      at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
                      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
                      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
                      at java.lang.Thread.run(Thread.java:745)
      

      Based on the stack above, the deadlock arises in the following scenario:

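      • taskA ("DataSource  (1/1)" in the stack above): emit -> requestBufferBuilderBlocking -> acquires the lock in LocalBufferPool (<0x000000063fdf4888>) -> SpillableSubpartition#releaseMemory -> waits for the lock in SpillableSubpartition (<0x000000063fdf4ac8>)
      • submission of taskB ("CoGroup (1/2)" in the stack above): triggers releaseMemory for taskA -> SpillableSubpartition#releaseMemory -> acquires the lock in SpillableSubpartition (<0x000000063fdf4ac8>) -> SpillableSubpartition#spillFinishedBufferConsumers -> BufferConsumer#close -> LocalBufferPool#recycle -> waits for the lock in LocalBufferPool (<0x000000063fdf4888>)

      Each thread holds the lock the other one needs, so neither can proceed. "CoGroup (2/2)" is in turn blocked on the registerTask lock (<0x000000062bf859b8>) held by "CoGroup (1/2)".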
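
The lock-order inversion in this scenario can be sketched with a small, simplified model. This is only an illustration under stated assumptions: `LockOrderSketch`, `Pool`, `Subpartition`, `watch`, `releaseMemoryNested` and the other names are hypothetical stand-ins, not Flink classes or the fix adopted for this issue, and spilling to disk is left out entirely. The sketch contrasts the hazardous shape (recycling while still holding the subpartition monitor) with one common way to break such a cycle (draining under the monitor, recycling after it is released). `Thread.holdsLock` is used purely as a diagnostic to count nested acquisitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the two monitors involved; all names are hypothetical. */
class LockOrderSketch {

    /** Stands in for a buffer pool that guards its free list with its own monitor. */
    static final class Pool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();
        private Object watched;      // diagnostic only: a foreign monitor to check for
        private int nestedRecycles;  // recycle() calls made while 'watched' was held

        void watch(Object monitor) {
            this.watched = monitor;
        }

        void recycle(byte[] segment) {
            // Was the caller still holding the watched (subpartition) monitor?
            boolean nested = watched != null && Thread.holdsLock(watched);
            synchronized (free) {
                free.add(segment);
                if (nested) {
                    nestedRecycles++;
                }
            }
        }

        int available() {
            synchronized (free) {
                return free.size();
            }
        }

        int nestedRecycles() {
            synchronized (free) {
                return nestedRecycles;
            }
        }
    }

    /** Stands in for a subpartition that guards its queued buffers with a second monitor. */
    static final class Subpartition {
        private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();
        private final Pool pool;

        Subpartition(Pool pool) {
            this.pool = pool;
        }

        Object lock() {
            return buffers;
        }

        void add(byte[] segment) {
            synchronized (buffers) {
                buffers.add(segment);
            }
        }

        /** Hazardous shape: takes the pool monitor while holding the buffers monitor. */
        int releaseMemoryNested() {
            synchronized (buffers) {
                int released = 0;
                byte[] segment;
                while ((segment = buffers.poll()) != null) {
                    pool.recycle(segment);
                    released++;
                }
                return released;
            }
        }

        /** Safer shape: drain under the buffers monitor, recycle after releasing it. */
        int releaseMemory() {
            List<byte[]> drained;
            synchronized (buffers) {
                drained = new ArrayList<>(buffers);
                buffers.clear();
            }
            for (byte[] segment : drained) {
                pool.recycle(segment);
            }
            return drained.size();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        Subpartition sub = new Subpartition(pool);
        pool.watch(sub.lock());

        sub.add(new byte[1]);
        sub.releaseMemoryNested();
        System.out.println("nested recycles after hazardous path: " + pool.nestedRecycles());

        sub.add(new byte[1]);
        sub.releaseMemory();
        System.out.println("nested recycles after safer path: " + pool.nestedRecycles());
    }
}
```

In the hazardous variant, a second thread that already holds the pool monitor and then asks the subpartition to release memory would complete the cycle shown in the stack above. The safer variant never holds both monitors at once, so whichever order another thread takes them in cannot produce a cycle with it.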

       

            People

              zjwang Zhijiang