FLINK-30334

SourceCoordinator splitRequest check causes HybridSource data loss and hang



    Description

      If we use a hybrid source, for example filesystem source A reading a.csv followed by filesystem source B reading b.csv, the job hangs on the second source. It is a very simple case; a minimal sketch of such a job and the resulting log are shown below.
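      For reference, a minimal sketch of such a job (the paths, the use of the DataStream API with the text-line format, and the print sink are illustrative assumptions, not details copied from the failing job):

      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.connector.base.source.hybrid.HybridSource;
      import org.apache.flink.connector.file.src.FileSource;
      import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class HybridSourceRepro {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // first bounded source: reads a.csv
              FileSource<String> sourceA =
                      FileSource.forRecordStreamFormat(
                                      new TextLineInputFormat(), new Path("file:///tmp/a.csv"))
                              .build();
              // second bounded source: reads b.csv
              FileSource<String> sourceB =
                      FileSource.forRecordStreamFormat(
                                      new TextLineInputFormat(), new Path("file:///tmp/b.csv"))
                              .build();

              // chain the sources: B should start after A signals that it has no more splits
              HybridSource<String> hybridSource =
                      HybridSource.builder(sourceA).addSource(sourceB).build();

              env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid_source")
                      .print();

              env.execute();
          }
      }

      Running a job like this reads the first source completely and then hangs with the following log: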

      10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)
      10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 0 (on host '') is requesting a file source split
      10803 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning split to non-localized request: Optional[FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
      10808 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Assigned split to subtask 0 : FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null
      10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding splits subtask=0 sourceIndex=0 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971 [HybridSourceSplit{sourceIndex=0, splitId=0000000001}]
      10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
      10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
      10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000001]
      +I[hello_a, flink, 1]
      +I[hello_a, hadoop, 2]
      +I[hello_a, world, 3]
      10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000001]
      10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
      10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
      10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
      10869 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)
      10870 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 0 (on host '') is requesting a file source split
      10872 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 0
      10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
      10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of input subtask=0 sourceIndex=0 org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
      StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
      10874 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] - Starting enumerator for sourceIndex=1
      10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch source event: subtask=0 sourceIndex=1 source=org.apache.flink.connector.file.src.FileSource@12ef574f
      10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
      10882 [SourceCoordinator-Source: hybrid_source[1]] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)

      The job does not read the next source's data and hangs after 'received split request from parallel task'.

       

      The reason is that in 1.16 and master the latest code adds a `context.hasNoMoreSplits` check before calling `enumerator.handleSplitRequest`. We understand the comment about reducing unnecessary split requests to the enumerator, but it does not consider the HybridSource situation: when a subtask has no more splits from the current source, HybridSource switches it to the next source. The check was added without accounting for this. Once the first source is fully read, the context leaves the subtask in the noMoreSplits state, so the later check prevents splits from the following sources from ever being assigned. Flink 1.15 behaves correctly.

       

      SourceCoordinator (1.16 / master)

       

      private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
          LOG.info(
                  "Source {} received split request from parallel task {} (#{})",
                  operatorName,
                  subtask,
                  attemptNumber);
          // request splits from the enumerator only if the enumerator has un-assigned splits
          // this helps to reduce unnecessary split requests to the enumerator
          if (!context.hasNoMoreSplits(subtask)) {
              enumerator.handleSplitRequest(subtask, event.hostName());
          }
      } 

      The `context.hasNoMoreSplits` check in SourceCoordinator prevents the subtask from reading the remaining child sources of the hybrid source.

       

      SourceCoordinatorContext

       

      boolean hasNoMoreSplits(int subtaskIndex) {
          return subtaskHasNoMoreSplits[subtaskIndex];
      }
      
      
      @Override
      public void signalNoMoreSplits(int subtask) {
          checkSubtaskIndex(subtask);
      
          // Ensure the split assignment is done by the coordinator executor.
          callInCoordinatorThread(
                  () -> {
                      subtaskHasNoMoreSplits[subtask] = true;
                      signalNoMoreSplitsToAttempts(subtask);
                      return null; // void return value
                  },
                  "Failed to send 'NoMoreSplits' to reader " + subtask);
      }
      
      

      The context sets the subtask's noMoreSplits flag to true once a source is done, without considering the hybrid case: the flag is never cleared when the hybrid enumerator switches to the next source.
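      A hypothetical sketch of one possible direction (not necessarily the fix that was merged): let the coordinator clear the flag when the hybrid enumerator switches a subtask to the next underlying source, so that later RequestSplitEvents are forwarded to the new enumerator again. The method name below is an assumption for illustration.

      // Hypothetical addition to SourceCoordinatorContext, for illustration only;
      // the method name and its call site are assumptions, not the actual fix.
      void resetHasNoMoreSplits(int subtask) {
          checkSubtaskIndex(subtask);

          // Ensure the flag update is done by the coordinator executor.
          callInCoordinatorThread(
                  () -> {
                      // allow the subtask to receive splits from the next source in the chain
                      subtaskHasNoMoreSplits[subtask] = false;
                      return null; // void return value
                  },
                  "Failed to reset 'NoMoreSplits' for reader " + subtask);
      }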

       

       

      Flink 1.15 SourceCoordinator (for comparison)

       

      public void handleEventFromOperator(int subtask, OperatorEvent event) {
          runInEventLoop(
                  () -> {
                      if (event instanceof RequestSplitEvent) {
                          LOG.info(
                                  "Source {} received split request from parallel task {}",
                                  operatorName,
                                  subtask);
                          enumerator.handleSplitRequest(
                                  subtask, ((RequestSplitEvent) event).hostName());
                      }  

       

          People

            Assignee: Ran Tao
            Reporter: Ran Tao