The backlog of subpartition should indicate how many buffers are consumable, then the consumer could feedback the corresponding credits for transporting these buffers. But in current PipelinedSubpartitionimplementation, the backlog is increased by 1 when a BufferConsumer is added into PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from PipelinedSubpartition. So the backlog only reflects how many buffers are retained in PipelinedSubpartition, which is not always equivalent to the number of consumable buffers.
The backlog inconsistency might result in floating buffers misdistribution on consumer side, because the consumer would request floating buffers based on backlog value, then one floating buffer might not be used in RemoteInputChannel long time after requesting.
Considering the solution, the last buffer in PipelinedSubpartition could only be consumable in the case of flush triggered or partition finished. So we could calculate the backlog precisely based on partition flushed/finished conditions.