Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
1.11.2
Description
Here is the execution flow I have in mind:
1. In the mailbox thread, we enter SourceReaderBase.getNextFetch. After splitFetcherManager.checkErrors() executes but before elementsQueue.poll(), the SplitFetcher gets its chance to run.
2. The SplitFetcher terminates due to an exception from SplitReader.fetch. SplitFetcher.shutdownHook then removes this failed fetcher from SplitFetcherManager.
3. In the mailbox thread, elementsQueue.poll() executes. If there are no elements in the queue, elementsQueue is reset to unavailable.
4. After getting no elements from SourceReaderBase.getNextFetch, we enter SourceReaderBase.finishedOrAvailableLater. If the failed fetcher was the last alive fetcher, SourceReaderBase.finishedOrAvailableLater may evaluate to InputStatus.END_OF_INPUT.
5. StreamTask then terminates itself due to InputStatus.END_OF_INPUT instead of failing with the fetcher's exception.
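The interleaving above can be replayed deterministically in a small single-threaded model. This is only a sketch: RaceSketch, Model, Status, and every field and method in it are hypothetical stand-ins, not Flink classes, and the model assumes that the fetcher records its error and removes itself in one step and that end of input is reported when no more splits are expected and no fetcher is alive.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the race; none of these types are Flink classes.
final class RaceSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    static final class Model {
        final Queue<int[]> elementsQueue = new ArrayDeque<>();
        Throwable fetcherError;       // stand-in for the error tracked by the fetcher manager
        int aliveFetchers = 1;        // the failing fetcher is the last alive one
        boolean noMoreSplits = true;  // as after a NoMoreSplitsEvent

        // Stand-in for splitFetcherManager.checkErrors().
        void checkErrors() {
            if (fetcherError != null) {
                throw new RuntimeException(
                        "One or more fetchers have encountered exception", fetcherError);
            }
        }

        // Step 2: the fetcher fails and its shutdown hook removes it.
        void fetcherFails() {
            fetcherError = new RuntimeException("Testing Exception");
            aliveFetchers--;
        }

        // One poll; `betweenCheckAndPoll` runs at the point where the fetcher thread may be scheduled.
        Status pollNext(Runnable betweenCheckAndPoll) {
            checkErrors();                       // step 1: no error recorded yet
            betweenCheckAndPoll.run();           // step 2: the fetcher may fail here
            int[] batch = elementsQueue.poll();  // step 3: queue is empty
            if (batch != null) {
                return Status.MORE_AVAILABLE;
            }
            // Step 4: the error is not re-checked before deciding on end of input.
            if (noMoreSplits && aliveFetchers == 0) {
                return Status.END_OF_INPUT;
            }
            return Status.NOTHING_AVAILABLE;
        }
    }

    static Status simulate(boolean fetcherFailsBetweenCheckAndPoll) {
        Model model = new Model();
        Runnable interleaving = fetcherFailsBetweenCheckAndPoll ? model::fetcherFails : () -> {};
        return model.pollNext(interleaving);
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));   // failure lands between check and poll
        System.out.println(simulate(false));  // fetcher has not failed yet
    }
}
```

In this model, a failure that lands between the error check and the poll is reported as END_OF_INPUT, while the same poll without the failure returns NOTHING_AVAILABLE and a later poll would hit checkErrors().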
Here is a revised SourceReaderBaseTest.testExceptionInSplitReader, which fails about half the time.
@Test
public void testExceptionInSplitReader() throws Exception {
    expectedException.expect(RuntimeException.class);
    expectedException.expectMessage("One or more fetchers have encountered exception");
    final String errMsg = "Testing Exception";

    FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
            new FutureCompletingBlockingQueue<>();
    // We have to handle split changes first, otherwise fetch will not be called.
    try (MockSourceReader reader = new MockSourceReader(
            elementsQueue,
            () -> new SplitReader<int[], MockSourceSplit>() {
                @Override
                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException(errMsg);
                }

                @Override
                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}

                @Override
                public void wakeUp() {
                }
            },
            getConfig(),
            null)) {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        reader.addSplits(Collections.singletonList(
                getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
        reader.handleSourceEvents(new NoMoreSplitsEvent());
        // This is not a real infinite loop, it is supposed to throw exception after some polls.
        while (true) {
            InputStatus inputStatus = reader.pollNext(output);
            assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
            // Add a sleep to avoid tight loop.
            Thread.sleep(0);
        }
    }
}
This revised SourceReaderBaseTest.testExceptionInSplitReader differs from the existing one in three places:
1. It calls reader.handleSourceEvents(new NoMoreSplitsEvent()), which sets SourceReaderBase.noMoreSplitsAssignment to true.
2. It asserts that reader.pollNext never returns InputStatus.END_OF_INPUT.
3. It changes Thread.sleep(1) to Thread.sleep(0), which increases the failure rate from about 1/200 to about 1/2.
See FLINK-19448 for initial discussion.