Currently ExchangeNode::Close() does not notify the sender fragment that the stream was closed. This works in a some cases:
- The full input was consumed - since the sender fragment is closed anyway.
- The sender and receiver fragments are both being closed as part of query cleanup.
- ExchangeNode::Close() was called as a result of hitting a limit in the coordinator fragment - the query finished successfully and the coordinator will cancel all fragments immediately.
It doesn't work in another case that would be useful: if only part of the input has been consumed somewhere in the middle of the plan, but the consuming node has consumed all the required input (e.g. short-circuiting join optimisations like IMPALA-3987 or if a limit has been hit). In those cases we could close the exchange node and sending fragment to clean up resources.
What happens if you do this is:
- The ExchangeNode is closed, and the DataStreamRecvr is added to closed_recvrs_cache_
- The sending fragment keeps sending rows, which are discarded when the receiver is found in closed_recvrs_cache_
- Once the receiver ages out of closed_recvrs_cache_, an error will be propagated back to the sender, which will fail the query.
This is bad in that it does a lot of unnecessary processing, holds onto resources, and can cause queries to fail.
We should really notify the sender that the receiver closed down, so that it can close the fragment once all receivers are closed.