Waiting on the future returned by FanOutOneBlockAsyncDFSOutput.flush can get stuck forever



      Consider three DataNodes, dn1, dn2 and dn3. We write some data to FanOutOneBlockAsyncDFSOutput and then flush it, so there is one Callback in FanOutOneBlockAsyncDFSOutput.waitingAckQueue. If the ack from dn1 arrives first, Netty invokes FanOutOneBlockAsyncDFSOutput.completed with dn1's channel, and in FanOutOneBlockAsyncDFSOutput.completed dn1's channel is removed from Callback.unfinishedReplicas.
      But dn2 and dn3 respond slowly. Before dn2 and dn3 send their acks, dn1 is shut down or hits an exception, so Netty triggers FanOutOneBlockAsyncDFSOutput.failed with dn1's channel. Because Callback.unfinishedReplicas no longer contains dn1's channel, the Callback is skipped in the FanOutOneBlockAsyncDFSOutput.failed method (line 250 below), even though FanOutOneBlockAsyncDFSOutput.state has already been set to State.BROKEN at line 245.

      233  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
      234    if (state == State.BROKEN || state == State.CLOSED) {
      235      return;
      236    }
      244    // disable further write, and fail all pending ack.
      245    state = State.BROKEN;
      246    Throwable error = errorSupplier.get();
      247    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      248      Callback c = iter.next();
      249      // find the first sync request which we have not acked yet and fail all the request after it.
      250      if (!c.unfinishedReplicas.contains(channel.id())) {
      251        continue;
      252      }
      253      for (;;) {
      254        c.future.completeExceptionally(error);
      255        if (!iter.hasNext()) {
      256          break;
      257        }
      258        c = iter.next();
      259      }
      260      break;
      261    }
      262    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
      263  }

      At the end of the above method, at line 262, the channels to dn1, dn2 and dn3 are all closed, so FanOutOneBlockAsyncDFSOutput.failed is triggered again for dn2 and dn3. But at line 234, because FanOutOneBlockAsyncDFSOutput.state is already State.BROKEN, the whole FanOutOneBlockAsyncDFSOutput.failed method is skipped. The Callback is therefore never failed, and waiting on the future returned by FanOutOneBlockAsyncDFSOutput.flush gets stuck forever.
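
      The sequence above can be replayed with a minimal, single-threaded model. This is only a sketch under stated assumptions: the class BrokenStateHangSketch, its flush/completed signatures and the use of plain strings in place of Netty channels are all hypothetical simplifications, not the real FanOutOneBlockAsyncDFSOutput code. Only the control flow of failed mirrors the excerpt above.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model: channels are plain strings, no Netty involved.
class BrokenStateHangSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;
    Callback(List<String> replicas) {
      this.unfinishedReplicas = new HashSet<>(replicas);
    }
  }

  State state = State.STREAMING;
  final Deque<Callback> waitingAckQueue = new ArrayDeque<>();

  // Queue one callback that waits for an ack from every replica.
  synchronized CompletableFuture<Long> flush(List<String> replicas) {
    Callback c = new Callback(replicas);
    waitingAckQueue.add(c);
    return c.future;
  }

  // Ack from one datanode: drop it from the head callback, complete when all acked.
  synchronized void completed(String channel) {
    Callback c = waitingAckQueue.peekFirst();
    if (c == null) {
      return;
    }
    if (c.unfinishedReplicas.remove(channel) && c.unfinishedReplicas.isEmpty()) {
      waitingAckQueue.removeFirst();
      c.future.complete(0L);
    }
  }

  // Same control flow as the excerpt: an early return once the state is BROKEN.
  synchronized void failed(String channel, Throwable error) {
    if (state == State.BROKEN || state == State.CLOSED) {
      return;
    }
    state = State.BROKEN;
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      if (!c.unfinishedReplicas.contains(channel)) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
    // The real method closes every channel here; the model's caller simulates
    // the resulting failure notifications for dn2 and dn3 explicitly.
  }

  static CompletableFuture<Long> runScenario() {
    BrokenStateHangSketch out = new BrokenStateHangSketch();
    CompletableFuture<Long> future = out.flush(List.of("dn1", "dn2", "dn3"));
    out.completed("dn1");                              // dn1 acks first
    out.failed("dn1", new IOException("dn1 closed"));  // callback skipped, state -> BROKEN
    out.failed("dn2", new IOException("dn2 closed"));  // ignored: already BROKEN
    out.failed("dn3", new IOException("dn3 closed"));  // ignored: already BROKEN
    return future;
  }

  public static void main(String[] args) {
    System.out.println("future done: " + runScenario().isDone()); // prints "future done: false"
  }
}
```

      In this model the future is neither completed nor failed after all three channels have reported a failure, which is the hang described above.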

      When we roll the WAL, we create a new FanOutOneBlockAsyncDFSOutput and a new AsyncProtobufLogWriter. In AsyncProtobufLogWriter.init we write the WAL header to the FanOutOneBlockAsyncDFSOutput and wait for it to complete. If we run into this situation, the roll gets stuck forever.

      I have simulated this case in the PR. My fix is that even though FanOutOneBlockAsyncDFSOutput.state is already State.BROKEN, we still try to complete Callback.future exceptionally for the pending Callbacks.
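
      One possible shape of such a fix is sketched below. It is an illustration only and not necessarily what the PR does: the class BrokenStateFixSketch, the helper failWaitingAcks and the string-based channels are hypothetical. The idea is that failed returns early only for State.CLOSED, and in State.BROKEN it still scans waitingAckQueue for callbacks that are waiting on the failing channel.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the proposed behaviour; not the actual patch.
class BrokenStateFixSketch {
  enum State { STREAMING, BROKEN, CLOSED }

  static final class Callback {
    final CompletableFuture<Long> future = new CompletableFuture<>();
    final Set<String> unfinishedReplicas;
    Callback(List<String> replicas) {
      this.unfinishedReplicas = new HashSet<>(replicas);
    }
  }

  State state = State.STREAMING;
  final Deque<Callback> waitingAckQueue = new ArrayDeque<>();

  synchronized CompletableFuture<Long> flush(List<String> replicas) {
    Callback c = new Callback(replicas);
    waitingAckQueue.add(c);
    return c.future;
  }

  // Ack from one datanode: only records that this replica has answered.
  synchronized void completed(String channel) {
    Callback c = waitingAckQueue.peekFirst();
    if (c != null) {
      c.unfinishedReplicas.remove(channel);
    }
  }

  synchronized void failed(String channel, Throwable error) {
    if (state == State.CLOSED) {
      return;
    }
    // Assumption: BROKEN no longer short-circuits; pending callbacks are still failed.
    state = State.BROKEN;
    failWaitingAcks(channel, error);
  }

  // Fail the first callback still waiting on this channel and every callback after it.
  private void failWaitingAcks(String channel, Throwable error) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      if (!c.unfinishedReplicas.contains(channel)) {
        continue;
      }
      c.future.completeExceptionally(error);
      while (iter.hasNext()) {
        iter.next().future.completeExceptionally(error);
      }
      return;
    }
  }

  static CompletableFuture<Long> runScenario() {
    BrokenStateFixSketch out = new BrokenStateFixSketch();
    CompletableFuture<Long> future = out.flush(List.of("dn1", "dn2", "dn3"));
    out.completed("dn1");                              // dn1 acks first
    out.failed("dn1", new IOException("dn1 closed"));  // no callback waits on dn1
    out.failed("dn2", new IOException("dn2 closed"));  // callback waits on dn2: fail it
    out.failed("dn3", new IOException("dn3 closed"));  // future already failed, no effect
    return future;
  }

  public static void main(String[] args) {
    // prints "future failed: true"
    System.out.println("future failed: " + runScenario().isCompletedExceptionally());
  }
}
```

      With this change the waiter sees an exception instead of blocking, so a caller such as the WAL roll can react to the failure rather than waiting indefinitely.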


