FLINK-5432: ContinuousFileMonitoringFunction is not monitoring nested files

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: filesystem-connector
    • Labels:
      None

      Description

      The ContinuousFileMonitoringFunction does not monitor nested files, even if the input format has NestedFileEnumeration set to true. This can be fixed by scanning the directories recursively in the listEligibleFiles method.
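The fix described above amounts to a recursive directory walk. What follows is a minimal sketch, assuming `java.nio.file` in place of Flink's own `FileSystem`/`FileStatus` abstraction. The method name mirrors `listEligibleFiles`, but its parameters (`nestedFileEnumeration`, `globalModificationTime`) and the `Map<Path, Long>` return type are illustrative assumptions, not Flink's actual code.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

class NestedListingSketch {

    /**
     * Hypothetical stand-in for listEligibleFiles: collects regular files under dir
     * whose last-modified time is newer than globalModificationTime. When
     * nestedFileEnumeration is true, subdirectories are scanned recursively;
     * otherwise they are skipped, which mirrors the behaviour reported in this issue.
     */
    static Map<Path, Long> listEligibleFiles(Path dir, boolean nestedFileEnumeration,
                                             long globalModificationTime) throws IOException {
        Map<Path, Long> files = new HashMap<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    if (nestedFileEnumeration) {
                        // Recurse with a Path (not a String) and merge the child results.
                        files.putAll(listEligibleFiles(entry, true, globalModificationTime));
                    }
                } else {
                    long modTime = Files.getLastModifiedTime(entry).toMillis();
                    // Only files modified after the last checkpointed time are eligible.
                    if (modTime > globalModificationTime) {
                        files.put(entry, modTime);
                    }
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // One file at the top level, one file two directories down.
        Path root = Files.createTempDirectory("monitored");
        Path nested = Files.createDirectories(root.resolve("2017/01"));
        Files.write(root.resolve("top.txt"), "a".getBytes());
        Files.write(nested.resolve("deep.txt"), "b".getBytes());

        System.out.println(listEligibleFiles(root, false, Long.MIN_VALUE).size()); // 1
        System.out.println(listEligibleFiles(root, true, Long.MIN_VALUE).size());  // 2
    }
}
```

The recursive call merges each subdirectory's results into the parent's map. An iterative version with an explicit stack would behave the same and avoid deep call stacks on very deep trees.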

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user ymarzougui opened a pull request:

          https://github.com/apache/flink/pull/3090

          FLINK-5432 fix nested files enumeration in ContinuousFileMonitoringFunction

          This PR fixes reading nested files when the InputFormat has NestedFileEnumeration set to true. Nested files were not read because the code in listEligibleFiles did not recursively enumerate the input paths.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ymarzougui/flink FLINK-5432

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3090.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3090


          commit c74996b278187e348af7043ddc0aa9a500373502
          Author: Yassine Marzougui <y.marzougui@mindlytix.com>
          Date: 2017-01-11T00:43:19Z

          FLINK-5432 recursively scan nested files in ContinuousFileMonitoringFunction


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3090

          The changes look very good! I think it would be good to add a test for nested reading in `ContinuousFileProcessingTest`.
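A scenario of the kind such a test could cover might look like the following. This is a hypothetical, JUnit-free sketch, not the test that was actually added to `ContinuousFileProcessingTest`. It builds a temporary tree with one top-level file and one file two directories down, then checks that a flat listing (`Files.list`) misses the nested file while a recursive one (`Files.walk`) finds it. The file and directory names are made up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NestedReadingScenario {

    /** Lists regular files directly under dir, or under the whole tree if nested is true. */
    static List<Path> enumerate(Path dir, boolean nested) throws IOException {
        // Files.list visits only the immediate children; Files.walk descends recursively.
        try (Stream<Path> s = nested ? Files.walk(dir) : Files.list(dir)) {
            return s.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Fixture: one file at the top level and one file two directories down.
        Path root = Files.createTempDirectory("nested-test");
        Path top = Files.write(root.resolve("file0.txt"), "line0\n".getBytes());
        Path deep = Files.write(
                Files.createDirectories(root.resolve("first/second")).resolve("file1.txt"),
                "line1\n".getBytes());

        List<Path> flat = enumerate(root, false);
        List<Path> recursive = enumerate(root, true);

        // The nested file must be missed by the flat listing and found by the recursive one.
        if (!flat.contains(top) || flat.contains(deep)) throw new AssertionError(flat);
        if (!recursive.contains(top) || !recursive.contains(deep)) throw new AssertionError(recursive);
        System.out.println("nested enumeration OK");
    }
}
```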

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3090#discussion_r95782207

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
          @@ -282,7 +282,7 @@ private void monitorDirAndForwardSplits(FileSystem fs,
               * Returns the paths of the files not yet processed.
               * @param fileSystem The filesystem where the monitored directory resides.
               */
          -    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
          +    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, String path) throws IOException {
          --- End diff --

          I would suggest passing a `Path` here. It is always safer to rely on the `Path` class than on strings.
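The review point can be illustrated with the standard library. In this sketch, `java.nio.file.Path` stands in for Flink's `org.apache.flink.core.fs.Path`, and both `childOf` helpers are hypothetical. Building a child path by string concatenation keeps a duplicated separator, whereas `Path.resolve` produces a clean path.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class PathVsString {

    // Hypothetical Path-based helper, in the spirit of the suggested signature change.
    static Path childOf(Path parent, String name) {
        return parent.resolve(name);
    }

    // The String-based alternative has to re-assemble separators by hand.
    static String childOf(String parent, String name) {
        return parent + "/" + name;
    }

    public static void main(String[] args) {
        String base = "/data/input/"; // trailing slash, as a user might configure it
        System.out.println(childOf(base, "2017"));             // /data/input//2017
        System.out.println(childOf(Paths.get(base), "2017"));  // /data/input/2017 (on Unix-like systems)
    }
}
```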

          githubbot ASF GitHub Bot added a comment -

          Github user ymarzougui commented on the issue:

          https://github.com/apache/flink/pull/3090

          @aljoscha Added a test and changed the parameter type to `Path` according to @zentol's suggestion.

          githubbot ASF GitHub Bot added a comment -

          Github user ymarzougui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3090#discussion_r95976740

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
          @@ -282,7 +282,7 @@ private void monitorDirAndForwardSplits(FileSystem fs,
               * Returns the paths of the files not yet processed.
               * @param fileSystem The filesystem where the monitored directory resides.
               */
          -    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
          +    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, String path) throws IOException {
          --- End diff --

          Thanks, changed in the last commit.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3090

          @zentol do you want to recheck and merge if good?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3090

          Looks good to me, adding it to my next batch.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3090

          Zentol Chesnay Schepler added a comment -

          master: 9945904e2251e7c0e218e2766bf07778d1307277
          1.2: 28c18e22127a85f773e7504a0e9d188bad9334e2


            People

            • Assignee:
              ymarzougui Yassine Marzougui
              Reporter:
              ymarzougui Yassine Marzougui
            • Votes: 0
              Watchers: 3
