Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Cannot Reproduce
Affects Version/s: 2.3.1
Fix Version/s: None
Component/s: None
Description
I have an issue related to the `org.apache.spark.streaming.dstream.FileInputDStream` method `findNewFiles`.
Streaming over a given path is supposed to pick up new files only (based on the timestamp of the previous run). However, the code in Spark first obtains the directories, then finds new files within each directory. Here is the relevant code:
val directoryFilter = new PathFilter {
  override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val newFiles = directories.flatMap(dir =>
  fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
This is not optimal, as it always requires two filesystem accesses. In addition, it seems to be buggy:
I have an S3 bucket "mydata" with objects "a.csv" and "b.csv". I noticed that fs.globStatus("s3a://mydata/", directoryFilter).map(_.getPath) returned 0 directories, so "a.csv" and "b.csv" were not picked up by Spark.
I also tried setting the path to "s3a://mydata/*", but that did not work either.
I experienced the same problematic behavior with the local file system when trying to stream from "/Users/streaming/*".
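For reference, the behavior can be reproduced outside of Spark with the Hadoop FileSystem API directly. Below is a minimal sketch, assuming a local directory /Users/streaming that contains only plain files and no subdirectories (the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

object GlobRepro {
  def main(args: Array[String]): Unit = {
    // Illustrative path: a directory containing only plain files, no subdirectories
    val directoryPath = new Path("file:///Users/streaming/*")
    val fs = directoryPath.getFileSystem(new Configuration())

    // The same filter FileInputDStream.findNewFiles uses
    val directoryFilter = new PathFilter {
      override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
    }

    // The glob keeps only directories; with plain files at the root it matches
    // nothing, so the later per-directory listing never sees those files.
    val directories: Array[FileStatus] =
      Option(fs.globStatus(directoryPath, directoryFilter)).getOrElse(Array.empty)
    println(s"directories matched: ${directories.length}") // 0 when only files exist
  }
}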
I suggest changing the code in Spark so that it performs the first listing without the directoryFilter, which does not seem to be needed at all. The code could be:
val directoriesOrFiles = fs.globStatus(directoryPath).map(_.getPath)
The flow would then be, for each entry in directoriesOrFiles (see the sketch after this list):
- If it is a data object: Spark applies newFileFilter to the returned object
- If it is a directory: the existing code performs an additional listing at the directory level
This way it will pick up both the files at the root of the path and the contents of its directories.
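A minimal sketch of what that combined flow could look like. The names reuse those from the snippets above; newFileFilter here is only a placeholder for Spark's actual mod-time filter, and the bucket path is illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

object SinglePassListing {
  def main(args: Array[String]): Unit = {
    val directoryPath = new Path("s3a://mydata/*") // illustrative path
    val fs = directoryPath.getFileSystem(new Configuration())

    // Placeholder for Spark's real filter (mod time newer than the last batch)
    val newFileFilter = new PathFilter {
      override def accept(path: Path): Boolean = true
    }

    // One glob, no directory filter; null-safe because globStatus returns
    // null for a non-glob path that does not exist
    val directoriesOrFiles: Array[FileStatus] =
      Option(fs.globStatus(directoryPath)).getOrElse(Array.empty)

    val newFiles: Seq[String] = directoriesOrFiles.toSeq.flatMap { status =>
      if (status.isDirectory) {
        // Existing behavior: list the directory's children with the filter
        fs.listStatus(status.getPath, newFileFilter).map(_.getPath.toString).toSeq
      } else if (newFileFilter.accept(status.getPath)) {
        // New behavior: plain files matched by the glob are kept directly
        Seq(status.getPath.toString)
      } else {
        Seq.empty
      }
    }
    newFiles.foreach(println)
  }
}

Since the glob result already distinguishes files from directories via FileStatus, no extra getFileStatus call per entry would be needed.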