When the flushAways is enabled (namely set buffer timeout to 0), there might be cases like:
- The subpartition emit an event which blocks the channel
- The subpartition produce more records. However, this records would not be notified since isBlocked = true.
- When the downstream tasks resume the subpartition later, the subpartition would only mark isBlocked to false. For local input channels although it tries to add the channel if isAvailable = true, but this check would not pass since flushRequest = false.
One case for this issue is https://issues.apache.org/jira/browse/FLINK-22085 which uses LocalInputChannel.