FLINK-9587

ContinuousFileMonitoringFunction crashes on short-lived files

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 1.5.0
    • None
    • None
    • Environment: Flink 1.5 running as a standalone cluster.

    Description

      Hi,

       

      We use Flink to monitor a directory for new files. The filesystem is a MapR FUSE mount that looks like a local FS.

      The files are copied into the directory by another process that uses the rsync command. While a file is still being written, rsync stores it under a temporary name such as ".file.txt.uM6MfZ", where the last extension is a random string.

      When the copy is complete, the file is renamed to its final name, "file.txt".

       

      The bug is that Flink does not handle this behavior correctly: it does not take into account that files in the monitored directory might be deleted or renamed while the directory is being scanned.

       

      We are getting stack traces like this one:

      java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
      at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
      at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
      at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
      at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
      at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
      at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
      at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
      at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
      at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
      

      In LocalFileSystem.listStatus(final Path f), Flink first reads the list of files in a directory and then creates a LocalFileStatus object for each of them. A file can be removed or renamed in the interval between these two operations, and getFileStatus then throws the FileNotFoundException shown above.
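
      To make the race concrete, here is a minimal sketch that reproduces the list-then-stat pattern outside Flink and shows one way a listing could tolerate it. It uses only java.nio.file and is not Flink's LocalFileSystem code. The class TolerantListing and its methods listNames and keepExisting are hypothetical names chosen for illustration, and skipping vanished entries is an assumption about the desired behavior, not something Flink 1.5 does.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink) illustrating a race-tolerant directory listing. */
final class TolerantListing {

    private TolerantListing() {}

    /** Step 1: read the names of all entries currently in the directory. */
    static List<Path> listNames(Path dir) throws IOException {
        List<Path> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                names.add(entry);
            }
        }
        return names;
    }

    /**
     * Step 2: read the attributes of each listed entry. Entries that were
     * deleted or renamed since step 1 are skipped instead of failing the scan.
     */
    static List<Path> keepExisting(List<Path> listed) throws IOException {
        List<Path> existing = new ArrayList<>();
        for (Path entry : listed) {
            try {
                Files.readAttributes(entry, BasicFileAttributes.class);
                existing.add(entry);
            } catch (NoSuchFileException e) {
                // The entry vanished between listing and stat, e.g. an rsync
                // temporary file that was renamed to its final name. Ignore it.
            }
        }
        return existing;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("landingarea");
        Files.createFile(dir.resolve("file.txt"));
        Path temp = Files.createFile(dir.resolve(".file.txt.uM6MfZ"));

        List<Path> listed = listNames(dir);
        Files.delete(temp); // simulate rsync removing its temporary file mid-scan
        System.out.println(keepExisting(listed));
    }
}
```

      In this sketch the temporary file is listed in step 1 but is gone by step 2, which is the same window in which the trace above is raised. Other I/O errors still propagate, so only the "file disappeared" case is swallowed.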

      I do not see any option to handle this exception in our code.

       

          People

            Assignee: Unassigned
            Reporter: Andrei Shumanski (shumanski)