The AsyncWaitOperator can deadlock in a special case: when closing two chained AsyncWaitOperators while there is still one element in flight between them.
The deadlock scenario is the following: Given two chained AsyncWaitOperators a1 and a2. a1 has its last element completed. This notifies a1's Emitter, e1, to remove the element from the queue and output it to a2. This poll and output operation happens under the checkpoint lock. Since a1 and a2 are chained, the e1 thread will directly call a2's processElement function. In this function, we try to add the new element to the StreamElementQueue. Now assume that this queue is full. Then the operation will release the checkpoint lock and wait until it is notified again.
In the meantime, a1.close() is called by the StreamTask, because we have consumed all input. The close operation also happens under the checkpoint lock. First, the close method waits until all elements from the StreamElementQueue have been processed (i.e. the queue is empty). This happens by waiting on the checkpoint lock. Since e1 has already removed the last element from a1's queue, the queue is empty and close proceeds. Next, e1 is interrupted and we join on e1. At the time of the interrupt, e1 is still waiting on the checkpoint lock inside a2's processElement. Since the closing operation does not release the checkpoint lock while joining, e1 cannot reacquire the lock to react to the interrupt, and we have a deadlock.
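The scenario relies on the fact that waiting on a lock releases it, which is what allows close() to run while e1 is still in the middle of its output call. Below is a minimal stand-alone sketch of that mechanism in plain Java. The class, method, and variable names are hypothetical and this is not Flink code; the "checkpoint lock" is just an Object monitor and the "full queue" is a boolean flag.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-alone demo; none of these names exist in Flink. */
class WaitReleasesLockDemo {

    /**
     * Returns true if a second thread was able to enter the synchronized
     * block while the "emitter" thread was parked in wait() on the same lock.
     */
    static boolean otherThreadEntersWhileWaiting() {
        final Object checkpointLock = new Object();  // stand-in for the checkpoint lock
        final boolean[] queueHasSpace = {false};     // stand-in for "downstream queue is full"
        final boolean[] closerEntered = {false};
        final CountDownLatch emitterHoldsLock = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            synchronized (checkpointLock) {
                emitterHoldsLock.countDown();
                while (!queueHasSpace[0]) {
                    try {
                        // wait() gives up the monitor until notified
                        checkpointLock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "emitter");
        emitter.start();

        try {
            emitterHoldsLock.await();
            // The emitter owns the monitor at this point, so this block can
            // only be entered once the emitter is inside wait().
            synchronized (checkpointLock) {
                closerEntered[0] = true;
                queueHasSpace[0] = true;
                checkpointLock.notifyAll();
            }
            emitter.join(5000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return closerEntered[0] && !emitter.isAlive();
    }
}
```

In this sketch the second thread plays the role of the StreamTask: it gets the lock even though the emitter has not finished its output, which is exactly why the output operation is not atomic in the chained case.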
Two problems cause this deadlock:
1. We assume that the AsyncWaitOperator has processed all its elements if the queue is empty. This is usually the case if the output operation is atomic. However, in the chained case it can happen that the emitter thread has to wait to insert the element into the queue of the next AsyncWaitOperator. Under these circumstances, we release the checkpoint lock and, thus, the output operation is no longer atomic. We can solve this problem by polling the last queue element after we have emitted it instead of before, so that an empty queue really means that all elements have been output.
2. We interrupt the emitter thread while holding the checkpoint lock and never release it afterwards. Under these circumstances, the interrupt signal is meaningless because the emitter thread also needs the checkpoint lock before it can react to the interrupt. We should solve the problem by waiting on the checkpoint lock (which releases it) and periodically checking whether the thread has already stopped or not.
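The two proposed fixes can be sketched together in a simplified, self-contained model. This is only an illustration under stated assumptions and not the actual AsyncWaitOperator code: the class EmitterCloseSketch, its fields, the 10 ms wait interval, and the helper thread that frees the downstream queue are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified model of one operator and its emitter; not Flink code. */
class EmitterCloseSketch {
    private final Object checkpointLock = new Object();
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final List<Integer> emitted = new ArrayList<>();
    private boolean downstreamHasSpace = false; // models a full downstream queue
    private boolean running = true;
    private final Thread emitter = new Thread(this::emitLoop, "emitter");

    private void emitLoop() {
        synchronized (checkpointLock) {
            try {
                while (running) {
                    while (queue.isEmpty()) {
                        checkpointLock.wait();
                    }
                    Integer head = queue.peek(); // fix 1: inspect the head, do not remove it yet
                    output(head);                // may release the lock while waiting
                    queue.poll();                // remove only after the output has completed
                    checkpointLock.notifyAll();  // wake close(), which waits for an empty queue
                }
            } catch (InterruptedException e) {
                // interrupted by close(): leave the loop and let the thread terminate
            }
        }
    }

    // Must be called while holding checkpointLock.
    private void output(Integer element) throws InterruptedException {
        while (!downstreamHasSpace) {
            checkpointLock.wait(); // the lock is released here, so output is not atomic
        }
        emitted.add(element);
    }

    void add(int element) {
        synchronized (checkpointLock) {
            queue.add(element);
            checkpointLock.notifyAll();
        }
    }

    void freeDownstream() {
        synchronized (checkpointLock) {
            downstreamHasSpace = true;
            checkpointLock.notifyAll();
        }
    }

    void close() throws InterruptedException {
        synchronized (checkpointLock) {
            while (!queue.isEmpty()) {
                checkpointLock.wait();
            }
            running = false;
            emitter.interrupt();
            // fix 2: do not join while holding the lock; wait on it with a
            // timeout and re-check whether the emitter has stopped
            while (emitter.isAlive()) {
                checkpointLock.wait(10L);
            }
        }
    }

    /** Runs the scenario once and returns the emitted elements. */
    static List<Integer> demo() {
        EmitterCloseSketch op = new EmitterCloseSketch();
        op.emitter.start();
        op.add(42);
        Thread helper = new Thread(() -> {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            op.freeDownstream(); // downstream queue gets space after a short delay
        }, "helper");
        helper.start();
        try {
            op.close();
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return op.emitted;
    }
}
```

In this model close() cannot observe an empty queue before the element has been emitted, because the element stays in the queue during output. The timed wait in close() hands the lock back to the emitter so that it can see the interrupt and terminate.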