The link() method is missing the `@Override` annotation.
The stream() method has a bug: it parses request.message inconsistently with ContainerStateMachine#startTransaction(), which is called when the stream is closed. This caused the stream-closing issue in our POC.
Here are the details of the issue in the stream() method, found by digging into the Ratis streaming implementation:
When initializing the stream, we need to pass a headerMessage to DataStreamApi#stream().
The primary DataNode uses this message again when DataStreamOutput#close is called, in DataStreamManagement#startTransaction.
There it is wrapped in a Raft FORWARD request, then unwrapped in RaftServerImpl#submitClientRequestAsync. The filter there does the unwrapping.
It then hands the headerMessage to ContainerStateMachine#startTransaction.
ContainerStateMachine#startTransaction expects request.message to be a ContainerCommandRequestMessage.
However, ContainerStateMachine#stream expects the same request.message to be a ContainerCommandRequestProto.
So we have to change the ContainerStateMachine#stream to match ContainerStateMachine#startTransaction.
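
To illustrate why the two methods cannot disagree on the format, here is a minimal, self-contained sketch. Everything in it is a hypothetical stand-in and not the actual Ozone or Ratis classes or wire format: `encodeFramed`/`decodeFramed` play the role of the wrapper message that startTransaction expects, and `decodeRaw` plays the role of parsing the bytes directly as the proto. The 4-byte length prefix is an assumption chosen only to make the two encodings differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderFramingSketch {
  // Hypothetical "wrapper message" encoding: 4-byte length prefix, then the payload.
  static byte[] encodeFramed(byte[] payload) {
    return ByteBuffer.allocate(Integer.BYTES + payload.length)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  // Decoder that expects the framed encoding (stand-in for the startTransaction side).
  static byte[] decodeFramed(byte[] content) {
    ByteBuffer buf = ByteBuffer.wrap(content);
    int length = buf.getInt();
    if (length < 0 || length > buf.remaining()) {
      throw new IllegalArgumentException("Invalid length prefix: " + length);
    }
    byte[] payload = new byte[length];
    buf.get(payload);
    return payload;
  }

  // Decoder that treats all bytes as the payload (stand-in for the old stream() side).
  static byte[] decodeRaw(byte[] content) {
    return content.clone();
  }

  public static void main(String[] args) {
    byte[] request = "StreamInit".getBytes(StandardCharsets.UTF_8);
    // The client sends one header; both server-side methods see the same bytes.
    byte[] header = encodeFramed(request);

    // Matching decoder recovers the request.
    System.out.println(Arrays.equals(request, decodeFramed(header))); // true
    // Mismatched decoder sees the prefix as part of the request.
    System.out.println(Arrays.equals(request, decodeRaw(header)));    // false
  }
}
```

Because the same header bytes reach both methods, whichever format the client picks satisfies only one of them unless both decode it the same way.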