I agree that the timeout issue does not have a very elegant solution. Here is a new proposal.
1. The Client uses a small pool of memory buffers per dfs-output stream. Say, 10 buffers of size 64K each.
2. A write to the output stream actually copies the user data into one of the buffers, if available. Otherwise the user-write blocks.
3. A separate thread (one per output stream) sends buffers that are full. Each buffer has metadata that contains a sequence number (locally generated on the client), the length of the buffer, and its offset in this block.
4. Another thread (one per output stream) processes incoming responses. Each response carries the sequence number of the buffer that the datanode has processed. The client removes that buffer from its queue.
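The client-side steps above can be sketched roughly as follows. This is only an illustration under some assumptions: the class and method names (PacketBuffer, StreamBufferPool, write, ack) are made up for this comment and are not existing DFS classes, each user write is assumed to fit in one buffer, and acks are assumed to arrive in sequence order. The sender and response threads are left out; only the buffer bookkeeping is shown.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical names throughout; none of these are existing DFS classes.
class PacketBuffer {
    final byte[] data;
    long seqno;          // sequence number generated locally on the client
    int length;          // number of valid bytes in data
    long offsetInBlock;  // offset of this buffer within the current block

    PacketBuffer(int capacity) { this.data = new byte[capacity]; }
}

class StreamBufferPool {
    private final int capacity;
    private final BlockingQueue<PacketBuffer> free;
    private final Deque<PacketBuffer> awaitingAck = new ArrayDeque<>();
    private long nextSeqno = 0;
    private long nextOffset = 0;

    StreamBufferPool(int count, int capacity) {
        this.capacity = capacity;
        this.free = new ArrayBlockingQueue<>(count);
        for (int i = 0; i < count; i++) {
            free.add(new PacketBuffer(capacity));
        }
    }

    /** Copies user data into a free buffer; blocks while no buffer is available. */
    PacketBuffer write(byte[] src, int off, int len) {
        if (len > capacity) {
            throw new IllegalArgumentException("write larger than one buffer");
        }
        PacketBuffer buf;
        try {
            buf = free.take();  // the user write blocks here when the pool is empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a buffer", e);
        }
        System.arraycopy(src, off, buf.data, 0, len);
        synchronized (this) {
            buf.seqno = nextSeqno++;
            buf.length = len;
            buf.offsetInBlock = nextOffset;
            nextOffset += len;
            awaitingAck.addLast(buf);  // stays queued until the datanode acks it
        }
        return buf;
    }

    /** Called by the response thread; frees the oldest buffer if the seqno matches. */
    synchronized boolean ack(long seqno) {
        PacketBuffer head = awaitingAck.peekFirst();
        if (head == null || head.seqno != seqno) {
            return false;  // unexpected or out-of-order ack; nothing is released
        }
        awaitingAck.removeFirst();
        free.add(head);  // wakes up a blocked writer, if any
        return true;
    }

    int freeCount() { return free.size(); }

    synchronized int pendingCount() { return awaitingAck.size(); }
}
```

With 10 buffers of 64K each, as suggested above, a writer could get up to 640K ahead of the slowest datanode before blocking.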
The Primary Datanode
The primary datanode has two threads per stream. The first thread processes incoming packets from the client, writes them to the downstream datanode and writes them to local disk. The second thread processes responses from downstream datanodes and forwards them back to the client.
This means that the client gets back an ack only when the packet is persisted on all datanodes. In the future this can be changed so that the client gets an ack when the data is persisted in dfs.replication.min number of datanodes.
If the primary datanode encounters an exception while writing to the downstream datanode, it declares the block as bad and removes the immediate downstream datanode from the pipeline. It makes an RPC to the namenode to abandon the current blockId and *replace* it with a new one. It then establishes a new pipeline with the remaining datanodes using the new blockId, and copies all the data from the local temporary block file to the downstream datanodes under the new blockId.
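A rough sketch of that recovery sequence is below. Everything here is hypothetical: BlockIdAllocator stands in for the namenode RPC (whose real name and signature are not decided here), Pipeline is just a blockId plus an ordered list of datanode names with the primary at index 0, and the re-copy of the temporary block file is only indicated by a comment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the namenode RPC that abandons a blockId and returns a new one.
interface BlockIdAllocator {
    long replaceBlock(long oldBlockId);
}

// Hypothetical pipeline description: index 0 is the primary datanode.
class Pipeline {
    final long blockId;
    final List<String> datanodes;

    Pipeline(long blockId, List<String> datanodes) {
        this.blockId = blockId;
        this.datanodes = Collections.unmodifiableList(new ArrayList<>(datanodes));
    }
}

class PipelineRecovery {
    /** Run on the primary after a write to its immediate downstream datanode fails. */
    static Pipeline recover(Pipeline old, BlockIdAllocator namenode) {
        if (old.datanodes.size() < 2) {
            throw new IllegalArgumentException("no downstream datanode to remove");
        }
        List<String> remaining = new ArrayList<>(old.datanodes);
        remaining.remove(1);  // drop the immediate downstream datanode
        long newBlockId = namenode.replaceBlock(old.blockId);
        // The primary would now open connections to the remaining datanodes and
        // re-send its local temporary block file under newBlockId.
        return new Pipeline(newBlockId, remaining);
    }
}
```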
The Secondary Datanodes
Each secondary datanode has two threads per stream. The first thread processes incoming packets from the upstream datanode, writes them to the downstream datanode and writes them to local disk. The second thread processes responses from downstream datanodes and forwards them back to the upstream datanode.
Each secondary datanode sends its own response and also forwards the responses of all downstream datanodes.
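One possible shape for the response, sketched under assumptions: each datanode prepends its own status to the ack it received from downstream, so the client sees one status per datanode for a given sequence number. The PipelineAck class and its methods are hypothetical, and the wire format is not specified here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical ack object: one status per datanode, nearest datanode first.
class PipelineAck {
    final long seqno;
    final List<Boolean> statuses;

    private PipelineAck(long seqno, List<Boolean> statuses) {
        this.seqno = seqno;
        this.statuses = Collections.unmodifiableList(statuses);
    }

    /** Ack created by the last datanode in the pipeline, which has nothing downstream. */
    static PipelineAck fromLast(long seqno, boolean localOk) {
        List<Boolean> s = new ArrayList<>();
        s.add(localOk);
        return new PipelineAck(seqno, s);
    }

    /** Ack created by any other datanode: its own status plus all downstream statuses. */
    static PipelineAck prepend(boolean localOk, PipelineAck downstream) {
        List<Boolean> s = new ArrayList<>();
        s.add(localOk);
        s.addAll(downstream.statuses);
        return new PipelineAck(downstream.seqno, s);
    }

    /** True only if every datanode in the pipeline reported success for this packet. */
    boolean allPersisted() {
        for (boolean ok : statuses) {
            if (!ok) {
                return false;
            }
        }
        return true;
    }
}
```

Carrying per-datanode statuses would also make the later dfs.replication.min variant easy to add, since the client could count successes instead of requiring all of them.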