  1. Flink
  2. FLINK-19717

SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws


Details

    Description

      Here is a possible execution flow that triggers the problem:
      1. In the mailbox thread, we enter SourceReaderBase.getNextFetch. After splitFetcherManager.checkErrors() executes but before elementsQueue.poll() runs, the SplitFetcher thread gets a chance to run.
      2. The SplitFetcher terminates due to an exception from SplitReader.fetch. SplitFetcher.shutdownHook removes this failed fetcher from SplitFetcherManager.
      3. In the mailbox thread, elementsQueue.poll() executes. If there are no elements in the queue, elementsQueue is reset to unavailable.
      4. Having gotten no elements from SourceReaderBase.getNextFetch, we enter SourceReaderBase.finishedOrAvailableLater. If the failed fetcher was the last alive fetcher, SourceReaderBase.finishedOrAvailableLater may evaluate to InputStatus.END_OF_INPUT, even though the fetcher failed.
      5. StreamTask terminates itself due to InputStatus.END_OF_INPUT.
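
      The flow above is a check-then-act race, and it can be reproduced deterministically in a small model. The sketch below is not Flink code: PollNextRaceModel, fetcherFails, and the betweenCheckAndPoll hook are hypothetical names that stand in for SourceReaderBase, the SplitFetcher shutdown path, and the thread interleaving from steps 1 and 2. The recheckErrors flag shows one possible mitigation (checking for errors again before reporting END_OF_INPUT); it is an assumption for illustration and not necessarily the fix adopted in Flink.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified single-threaded model of the race; all names here are hypothetical. */
public class PollNextRaceModel {
    public enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<String> elementsQueue = new ArrayDeque<>();
    private final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
    private int aliveFetchers = 1;                      // a single fetcher, as in the test
    private final boolean noMoreSplitsAssignment = true; // NoMoreSplitsEvent already handled

    /** Models a fetcher dying: the error is recorded, then the fetcher is removed. */
    public void fetcherFails(Throwable t) {
        fetcherError.compareAndSet(null, t);
        aliveFetchers--;
    }

    /** Models the error check; the message matches the one expected by the test. */
    private void checkErrors() {
        Throwable t = fetcherError.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    /**
     * Models one poll. The hook runs between the error check and the queue poll,
     * which is where the fetcher thread is assumed to be scheduled in step 1.
     */
    public Status pollNext(Runnable betweenCheckAndPoll, boolean recheckErrors) {
        checkErrors();                       // step 1: no error recorded yet
        betweenCheckAndPoll.run();           // step 2: fetcher fails and is removed
        String next = elementsQueue.poll();  // step 3: queue is empty
        if (next != null) {
            return Status.MORE_AVAILABLE;
        }
        if (noMoreSplitsAssignment && aliveFetchers == 0) {
            if (recheckErrors) {
                checkErrors();               // possible mitigation: surface the late error
            }
            return Status.END_OF_INPUT;      // step 4: the error is silently lost
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        PollNextRaceModel racy = new PollNextRaceModel();
        Status status = racy.pollNext(
            () -> racy.fetcherFails(new RuntimeException("Testing Exception")), false);
        System.out.println("without re-check: " + status);

        PollNextRaceModel guarded = new PollNextRaceModel();
        try {
            guarded.pollNext(
                () -> guarded.fetcherFails(new RuntimeException("Testing Exception")), true);
        } catch (RuntimeException e) {
            System.out.println("with re-check: " + e.getMessage());
        }
    }
}
```

      Without the second check, the model returns END_OF_INPUT and the fetcher's exception is never seen by the caller. With it, the same interleaving ends in a RuntimeException instead.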

      Here is a revised SourceReaderBaseTest.testExceptionInSplitReader, which fails about half the time.

      	@Test
      	public void testExceptionInSplitReader() throws Exception {
      		expectedException.expect(RuntimeException.class);
      		expectedException.expectMessage("One or more fetchers have encountered exception");
      		final String errMsg = "Testing Exception";
      
      		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
      			new FutureCompletingBlockingQueue<>();
      		// We have to handle split changes first, otherwise fetch will not be called.
      		try (MockSourceReader reader = new MockSourceReader(
      			elementsQueue,
      			() -> new SplitReader<int[], MockSourceSplit>() {
      				@Override
      				public RecordsWithSplitIds<int[]> fetch() {
      					throw new RuntimeException(errMsg);
      				}
      
      				@Override
      				public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
      
      				@Override
      				public void wakeUp() {
      				}
      			},
      			getConfig(),
      			null)) {
      			ValidatingSourceOutput output = new ValidatingSourceOutput();
      			reader.addSplits(Collections.singletonList(getSplit(0,
      				NUM_RECORDS_PER_SPLIT,
      				Boundedness.CONTINUOUS_UNBOUNDED)));
      			reader.handleSourceEvents(new NoMoreSplitsEvent());
      			// This is not a real infinite loop, it is supposed to throw exception after some polls.
      			while (true) {
      				InputStatus inputStatus = reader.pollNext(output);
      				assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
      				// Add a sleep to avoid tight loop.
      				Thread.sleep(0);
      			}
      		}
      	}
      

      This revised SourceReaderBaseTest.testExceptionInSplitReader differs from the existing one in three places:
      1. It calls reader.handleSourceEvents(new NoMoreSplitsEvent()), which sets SourceReaderBase.noMoreSplitsAssignment to true.
      2. It asserts that reader.pollNext does not return InputStatus.END_OF_INPUT.
      3. It changes Thread.sleep(1) to Thread.sleep(0), which increases the failure rate from 1/200 to 1/2.

      See FLINK-19448 for initial discussion.

            People

              kezhuw Kezhu Wang