Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22946

Network buffer deadlock introduced by unaligned checkpoint

    XMLWordPrintableJSON

Details

    Description

      We recently encountered deadlock when using unaligned checkpoint. Below are two thread stacks that cause deadlock:

      "Channel state writer Join(xxxxxx) (34/256)#1": at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296) - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200) - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173) at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source) at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75) at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source) at java.lang.Thread.run(Thread.java:745)
      "Join(xxxxxx) (34/256)#1": at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296) - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200) - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:745)

      The root cause of this problem is that unaligned checkpoint makes it possible that under the same input gate, multiple input channels may recycle network buffer at the same time.

      Previously, network buffer recycling would only occur serially between input channels under the same input gate, because each sub-task is process Input data serially, and an input gate belongs to only one sub-task. When unaligned checkpoint is enabled, each input channel will take a snapshot of the input channel when it receives the checkpoint barrier, and the network buffer may be recycled in the process.

      Unfortunately, the current network buffer recycling mechanism does not take into account the situation where multiple input channels perform network buffer recycling at the same time. The following code block is from org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue that causes deadlock when multiple input channels under same input gate perform network buffer recycling at the same time.

      The solution to this problem is quite straightforward. Here are two possible solutions:
      1. Case by case solution. Note that input channel A (locked A) gave the released network buffer to input channel B (waiting to lock B), and input channel B (locked B) gave the released network buffer to input channel A (waiting to lock A) ), so when an input channel releases the network buffer, first check whether it is also waiting for the network buffer, and if it is, directly allocate it to itself, which can avoid the situation that different input channels exchange network buffers.
      2. A straightforward solution. Considering that the input channel occupies the lock during recycle to remove the network buffer from the bufferQueue, the subsequent operations do not need to hold this lock. Therefore, we only need to place Buffer::recycleBuffer outside the bufferQueue lock to avoid deadlock.

      Attachments

        1. Screen Shot 2021-06-09 at 7.02.04 PM.png
          201 kB
          Guokuai Huang
        2. Screen Shot 2021-06-09 at 6.39.47 PM.png
          179 kB
          Guokuai Huang

        Issue Links

          Activity

            People

              guokuai.huang Guokuai Huang
              guokuai.huang Guokuai Huang
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: