Suppose there are three datanodes, dn1, dn2 and dn3, and we write some data to FanOutOneBlockAsyncDFSOutput and then flush it, so there is one Callback in FanOutOneBlockAsyncDFSOutput.waitingAckQueue. If the ack from dn1 arrives first and triggers Netty to invoke FanOutOneBlockAsyncDFSOutput.completed with dn1's channel, then in FanOutOneBlockAsyncDFSOutput.completed, dn1's channel is removed from Callback.unfinishedReplicas.
But dn2 and dn3 respond slowly. Before they send their acks, dn1 is shut down or hits an exception, so Netty triggers FanOutOneBlockAsyncDFSOutput.failed with dn1's channel. Because Callback.unfinishedReplicas no longer contains dn1's channel, the Callback is skipped in the FanOutOneBlockAsyncDFSOutput.failed method (see line 250 below), and at line 245 FanOutOneBlockAsyncDFSOutput.state is set to State.BROKEN.
At the end of that method, at line 262, the channels to dn1, dn2 and dn3 are all closed, so FanOutOneBlockAsyncDFSOutput.failed is triggered again for dn2 and dn3. But at line 234 above, because FanOutOneBlockAsyncDFSOutput.state is already State.BROKEN, the whole FanOutOneBlockAsyncDFSOutput.failed method is skipped. As a result, waiting on the future returned by FanOutOneBlockAsyncDFSOutput.flush would be stuck forever.
When we roll the WAL, we create a new FanOutOneBlockAsyncDFSOutput and a new AsyncProtobufLogWriter; in AsyncProtobufLogWriter.init we write the WAL header to the FanOutOneBlockAsyncDFSOutput and wait for it to complete. If we run into this situation, the roll would be stuck forever.
I have simulated this case in the PR. My fix is: even though FanOutOneBlockAsyncDFSOutput.state is already State.BROKEN, we should still try to complete Callback.future.
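The race above can be illustrated with a minimal, self-contained model. This is a hypothetical sketch, not the real HBase code: the class `AckRaceDemo` and its nested `Output`/`Callback` types, the `applyFix` flag, and the `futureCompleted` helper are all invented for illustration; only the names `completed`, `failed`, `state`, `waitingAckQueue` and `unfinishedReplicas` mirror the fields and methods discussed above.

```java
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the ack/failure race described above.
public class AckRaceDemo {
  enum State { STREAMING, BROKEN }

  static class Callback {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;

    Callback(Collection<String> replicas) {
      unfinishedReplicas = new HashSet<>(replicas);
    }
  }

  static class Output {
    State state = State.STREAMING;
    final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
    final List<String> channels;
    final boolean applyFix;

    Output(List<String> channels, boolean applyFix) {
      this.channels = new ArrayList<>(channels);
      this.applyFix = applyFix;
    }

    // A datanode acked: drop its channel from the head callback's pending set.
    void completed(String channel) {
      Callback cb = waitingAckQueue.peek();
      cb.unfinishedReplicas.remove(channel);
      if (cb.unfinishedReplicas.isEmpty()) {
        waitingAckQueue.poll();
        cb.future.complete(null);
      }
    }

    // A datanode's channel failed or was closed.
    void failed(String channel) {
      if (state == State.BROKEN) {
        return; // later failures from the remaining datanodes are skipped here
      }
      state = State.BROKEN;
      for (Callback cb : waitingAckQueue) {
        // Buggy behavior: a callback that already saw this channel's ack is
        // skipped, so its future is never completed. The fix completes it anyway.
        if (cb.unfinishedReplicas.contains(channel) || applyFix) {
          cb.future.completeExceptionally(new IOException(channel + " broken"));
        }
      }
      // Closing every channel re-triggers failed() for the other datanodes,
      // but state is already BROKEN, so those calls return immediately.
      for (String ch : channels) {
        if (!ch.equals(channel)) {
          failed(ch);
        }
      }
    }
  }

  // Replays the scenario: dn1 acks first, then dn1 fails before dn2/dn3 ack.
  static boolean futureCompleted(boolean applyFix) {
    Output out = new Output(List.of("dn1", "dn2", "dn3"), applyFix);
    Callback cb = new Callback(List.of("dn1", "dn2", "dn3"));
    out.waitingAckQueue.add(cb);
    out.completed("dn1"); // dn1's ack arrives first...
    out.failed("dn1");    // ...then dn1 is shut down
    return cb.future.isDone();
  }

  public static void main(String[] args) {
    System.out.println("without fix, future completed: " + futureCompleted(false));
    System.out.println("with fix, future completed: " + futureCompleted(true));
  }
}
```

Without the fix the callback's future is never completed and any caller waiting on it hangs; with the fix it completes exceptionally, so the flush (or WAL roll) fails fast instead of getting stuck.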