  1. Spark
  2. SPARK-25155

Streaming from storage doesn't work when no directories exist



    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.3.1
    • None
    • None


      I have an issue related to the `findNewFiles` method of `org.apache.spark.streaming.dstream.FileInputDStream`.

      Streaming from a given path is supposed to pick up only new files (based on the timestamp of the previous run). However, the code in Spark first obtains the directories and then looks for new files in each directory. Here is the relevant code:

      val directoryFilter = new PathFilter {
        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
      }
      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
      val newFiles = directories.flatMap(dir =>
        fs.listStatus(dir, newFileFilter).map(_.getPath.toString))


      This is not optimal, as it always requires two accesses to the storage (one to glob the directories, one to list each of them). In addition, it seems to be buggy.

      I have an S3 bucket "mydata" with the objects "a.csv" and "b.csv". I noticed that `fs.globStatus("s3a://mydata/", directoryFilter).map(_.getPath)` returned 0 directories, so "a.csv" and "b.csv" were not picked up by Spark.

      I also tried setting the path to "s3a://mydata/*", and that did not work either.

      I experienced the same problematic behavior with the local file system when I tried to stream from "/Users/streaming/*".
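The glob case can be checked outside Spark with the Hadoop `FileSystem` API alone. The following is a minimal sketch, not Spark's code: the object name `GlobDirectoryFilterRepro`, the helper `globDirectories`, and the temporary directory are assumptions made for illustration, and it requires `hadoop-common` on the classpath. It builds a directory that contains only files and applies the same directory-only filter to a `*` pattern; since every match is a file, the filter is expected to reject all of them and leave no directory to list.

```scala
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

// Hypothetical reproduction helper; names are illustrative, not part of Spark.
object GlobDirectoryFilterRepro {

  // Same shape as the lookup quoted in this report: glob, keeping only directories.
  def globDirectories(fs: FileSystem, pattern: Path): Seq[Path] = {
    val directoryFilter = new PathFilter {
      override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
    }
    // globStatus may return null for a non-glob path that does not exist.
    Option(fs.globStatus(pattern, directoryFilter))
      .map(_.toSeq)
      .getOrElse(Seq.empty)
      .map(_.getPath)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())

    // A directory holding only files, mirroring the "a.csv" / "b.csv" layout.
    val root = Files.createTempDirectory("streaming-repro")
    Files.createFile(root.resolve("a.csv"))
    Files.createFile(root.resolve("b.csv"))

    val pattern = new Path(new Path(root.toUri), "*")
    val dirs = globDirectories(fs, pattern)
    println(s"directories matched by $pattern: ${dirs.size}")
  }
}
```

Passing the directory itself (without the trailing `*`) to the same helper matches the directory, so the files under it would be listed in that case on the local file system.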

      I suggest changing the code in Spark so that it performs the first listing without directoryFilter, which does not seem to be needed at all. The code could be:

      val directoriesOrfiles = fs.globStatus(directoryPath).map(_.getPath)

      The flow, for each entry in directoriesOrfiles, would be:

      • If the entry is a data object (file): Spark applies newFileFilter to it directly
      • If the entry is a directory: the existing code performs the additional listing at the directory level

      This way, it will pick up files both from the root of the path and from the contents of its directories.
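The proposed flow can be sketched as a standalone function. This is an illustration under stated assumptions, not a patch against Spark: the object name `FindNewFilesSketch` and the explicit `fs`, `directoryPath`, and `newFileFilter` parameters are introduced here so the snippet compiles on its own with `hadoop-common` on the classpath, whereas in `FileInputDStream` these are members of the class.

```scala
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

// Hypothetical standalone version of the proposed lookup; not Spark's actual code.
object FindNewFilesSketch {

  def findNewFiles(
      fs: FileSystem,
      directoryPath: Path,
      newFileFilter: PathFilter): Seq[String] = {
    // Glob without directoryFilter, so files at the root of the path are kept.
    // globStatus may return null for a non-glob path that does not exist.
    val directoriesOrFiles =
      Option(fs.globStatus(directoryPath)).map(_.toSeq).getOrElse(Seq.empty)

    directoriesOrFiles.flatMap { status =>
      if (status.isDirectory) {
        // Directory: list its children, filtered by newFileFilter as before.
        fs.listStatus(status.getPath, newFileFilter).toSeq.map(_.getPath.toString)
      } else if (newFileFilter.accept(status.getPath)) {
        // Data object: apply newFileFilter to the entry itself.
        Seq(status.getPath.toString)
      } else {
        Seq.empty
      }
    }
  }
}
```

One design point to note: the `FileStatus` returned by `globStatus` already says whether an entry is a directory, so this variant does not need the extra `getFileStatus` call per path that directoryFilter performs.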




            Assignee: Unassigned
            Reporter: Gil Vernik (gvernik)

