Here is the execution flow I suspect causes the problem:
1. In the mailbox thread, we enter SourceReaderBase.getNextFetch. After splitFetcherManager.checkErrors() has executed but before elementsQueue.poll() runs, the SplitFetcher gets a chance to run.
2. The SplitFetcher terminates due to an exception from SplitReader.fetch. SplitFetcher.shutdownHook removes this failed fetcher from SplitFetcherManager.
3. Back in the mailbox thread, elementsQueue.poll() executes. If there are no elements in the queue, elementsQueue is reset to unavailable.
4. Having gotten no element from SourceReaderBase.getNextFetch, we enter SourceReaderBase.finishedOrAvailableLater. If the failed fetcher was the last alive fetcher, SourceReaderBase.finishedOrAvailableLater may evaluate to InputStatus.END_OF_INPUT, even though that fetcher failed.
5. StreamTask then terminates itself because of InputStatus.END_OF_INPUT.
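The interleaving above can be reproduced deterministically in a toy model. This sketch is not Flink code: Reader, FetcherManager, onFetcherFailure, runRacyInterleaving and the afterCheckErrors hook are hypothetical stand-ins that only mirror the names SourceReaderBase, SplitFetcherManager, checkErrors, elementsQueue and finishedOrAvailableLater. Instead of relying on timing, it forces the fetcher thread to fail and remove itself between checkErrors() and poll().

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified model of the race; not Flink's implementation.
public class RaceModel {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Hypothetical stand-in for SplitFetcherManager.
    static final class FetcherManager {
        final Set<Integer> aliveFetchers = ConcurrentHashMap.newKeySet();
        final AtomicReference<Throwable> fetcherError = new AtomicReference<>();

        // Rethrows a recorded fetcher error, if any.
        void checkErrors() {
            Throwable t = fetcherError.get();
            if (t != null) {
                throw new RuntimeException("A fetcher has failed", t);
            }
        }

        // Runs on the fetcher thread (step 2): record the exception from
        // fetch(), then the shutdown hook removes the fetcher.
        void onFetcherFailure(int fetcherId, Throwable t) {
            fetcherError.compareAndSet(null, t);
            aliveFetchers.remove(fetcherId);
        }
    }

    // Hypothetical stand-in for SourceReaderBase.
    static final class Reader {
        final FetcherManager manager = new FetcherManager();
        final Queue<String> elementsQueue = new ConcurrentLinkedQueue<>();
        boolean noMoreSplitsAssignment = true; // as after a NoMoreSplitsEvent

        InputStatus pollNext(Runnable afterCheckErrors) {
            manager.checkErrors();                 // step 1: no error recorded yet
            afterCheckErrors.run();                // the fetcher gets to run here
            String element = elementsQueue.poll(); // step 3: the queue is empty
            if (element != null) {
                return InputStatus.MORE_AVAILABLE;
            }
            return finishedOrAvailableLater();     // step 4
        }

        InputStatus finishedOrAvailableLater() {
            // The recorded fetcher error is not re-checked on this path.
            if (noMoreSplitsAssignment && manager.aliveFetchers.isEmpty()) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    static InputStatus runRacyInterleaving() {
        Reader reader = new Reader();
        reader.manager.aliveFetchers.add(0); // the last alive fetcher
        Thread fetcher = new Thread(() ->
                reader.manager.onFetcherFailure(0, new RuntimeException("fetch failed")));
        // Force the bad interleaving: the fetcher fails and is removed after
        // checkErrors() but before elementsQueue.poll(). join() also makes the
        // fetcher's writes visible to this thread.
        return reader.pollNext(() -> {
            fetcher.start();
            try {
                fetcher.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(runRacyInterleaving()); // prints END_OF_INPUT
    }
}
```

In this model the reader reports END_OF_INPUT even though a fetcher error has been recorded, because nothing calls checkErrors() again between the empty poll() and finishedOrAvailableLater(). The availability reset of the real elementsQueue (step 3) is deliberately left out.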
Here is a revised SourceReaderBaseTest.testExceptionInSplitReader that fails roughly half the time.
It differs from the existing test in three places:
1. A call to reader.handleSourceEvents(new NoMoreSplitsEvent()) is added, which sets SourceReaderBase.noMoreSplitsAssignment to true.
2. An assertion is added that reader.pollNext does not return InputStatus.END_OF_INPUT.
3. Thread.sleep(1) is changed to Thread.sleep(0), which raises the failure rate from about 1/200 to about 1/2.
See FLINK-19448 for initial discussion.