Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Fix Version: 1.4.1
Description
I tried loading a raster dataset composed of 20,000+ GeoTIFF images in a local Spark session using the following code:
df_binary = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.tif") \
    .option("recursiveFileLookup", "true") \
    .load(DATA_ROOT_PATH + '/raster/EuroSAT_MS')

df_geotiff = df_binary \
    .withColumn("rast", expr("RS_FromGeoTiff(content)")) \
    .withColumn("name", expr("reverse(split(path, '/'))[0]")) \
    .select("name", "length", "rast")

df_geotiff.where("name LIKE 'Forest_%.tif'") \
    .selectExpr("name", "RS_BandAsArray(rast, 3) as band") \
    .orderBy("name") \
    .show()
The Spark job failed with the following error message:
Py4JJavaError: An error occurred while calling o70.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 782 in stage 5.0 failed 1 times, most recent failure: Lost task 782.0 in stage 5.0 (TID 786) (kontinuation executor driver): java.io.FileNotFoundException: /home/kontinuation/documents/wherobots/notebooks/data/raster/EuroSAT_MS/Forest/Forest_2298.tif (Too many open files)
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:664)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1539)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
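As a side note (not from the original run), the driver's file-descriptor usage can also be checked from inside the JVM itself. The following is a minimal sketch using the com.sun.management extension available on HotSpot JVMs on Unix-like systems; the class name is made up for illustration:

import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdUsageSketch {
    public static void main(String[] args) {
        // HotSpot on Unix-like systems exposes descriptor counters for the current process.
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        // The job starts failing with "Too many open files" once the open count approaches the maximum.
        System.out.println("open fds: " + os.getOpenFileDescriptorCount()
                + " / max fds: " + os.getMaxFileDescriptorCount());
    }
}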
The error indicates that the Spark job has opened too many files. Running lsof to inspect the open files shows that most of them are temporary files prefixed with imageio:
java 3843951 kontinuation 1006u REG 252,1 107244 1204728 /tmp/imageio3709666550975207536.tmp
java 3843951 kontinuation 1007u REG 252,1 107244 1204729 /tmp/imageio7503001112441146978.tmp
java 3843951 kontinuation 1008u REG 252,1 107244 1204730 /tmp/imageio1035759556272836613.tmp
java 3843951 kontinuation 1009u REG 252,1 107244 1204731 /tmp/imageio451679980601844202.tmp
java 3843951 kontinuation 1010u REG 252,1 107244 1204732 /tmp/imageio2111699718021158223.tmp
java 3843951 kontinuation 1011u REG 252,1 107244 1204733 /tmp/imageio8919853818666809481.tmp
java 3843951 kontinuation 1012u REG 252,1 107244 1204734 /tmp/imageio6956257348066899899.tmp
java 3843951 kontinuation 1013u REG 252,1 107244 1204735 /tmp/imageio3045964803135174263.tmp
java 3843951 kontinuation 1014u REG 252,1 107244 1204736 /tmp/imageio8138794596381465904.tmp
java 3843951 kontinuation 1015u REG 252,1 107244 1204737 /tmp/imageio6991404647914889791.tmp
java 3843951 kontinuation 1016u REG 252,1 107244 1204738 /tmp/imageio3098287432603901322.tmp
java 3843951 kontinuation 1017u REG 252,1 107244 1204739 /tmp/imageio599912999779858439.tmp
java 3843951 kontinuation 1018u REG 252,1 107244 1204740 /tmp/imageio8841430021636925470.tmp
java 3843951 kontinuation 1019u REG 252,1 107244 1204741 /tmp/imageio8981079233288315985.tmp
java 3843951 kontinuation 1020u REG 252,1 107244 1204742 /tmp/imageio3673591736487787612.tmp
java 3843951 kontinuation 1021u REG 252,1 107244 1204743 /tmp/imageio8805168727392534534.tmp
java 3843951 kontinuation 1022u REG 252,1 107244 1204744 /tmp/imageio441228595459753924.tmp
java 3843951 kontinuation 1023u REG 252,1 107244 1204753 /tmp/imageio6548224310964783498.tmp
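For context, the imageioXXXX.tmp names come from Java ImageIO's file-backed stream cache. The snippet below is an illustration, not Sedona or GeoTools code: creating an ImageInputStream from a plain InputStream while ImageIO's useCache flag is at its default of true yields a FileCacheImageInputStream, which keeps a temporary /tmp/imageio*.tmp file open until the stream is explicitly closed.

import java.io.ByteArrayInputStream;
import javax.imageio.ImageIO;
import javax.imageio.stream.ImageInputStream;

public class ImageioCacheSketch {
    public static void main(String[] args) throws Exception {
        byte[] bytes = new byte[1024]; // placeholder bytes, not a real GeoTIFF
        // With the default useCache=true, wrapping a plain InputStream returns a
        // FileCacheImageInputStream backed by a temporary /tmp/imageio*.tmp file.
        ImageInputStream iis =
                ImageIO.createImageInputStream(new ByteArrayInputStream(bytes));
        System.out.println(iis.getClass().getName());
        // Until close() is called, both the temp file and its descriptor stay open.
        iis.close();
    }
}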
My first attempt to fix the problem was to dispose of the GridCoverage2D object after it is used in RS_BandAsArray. However, that alone did not fix the problem. Further investigation revealed another issue in the GeoTiffReader provided by GeoTools: when reading a GeoTIFF from an input stream it initializes a file-backed cache, and it does not close that cache when the grid coverage object is disposed. The temporary files named imageioXXXX.tmp were created by this file-backed cache. If the number of rasters in the dataset exceeds the maximum number of open files, the job fails and the Spark session no longer responds properly to subsequent queries.
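Below is a minimal sketch of the read path described above, assuming a GeoTools GeoTiffReader constructed from an in-memory stream; the class and method names are made up for illustration, and the mitigations in the comments are possibilities rather than necessarily the fix that was shipped.

import java.io.ByteArrayInputStream;
import org.geotools.coverage.grid.GridCoverage2D;
import org.geotools.gce.geotiff.GeoTiffReader;

public class GeoTiffLeakSketch {
    public static void readAndRelease(byte[] geoTiffBytes) throws Exception {
        // Reading from an InputStream makes GeoTiffReader wrap the source in an
        // ImageIO input stream; with ImageIO's default useCache=true that wrapper is
        // file-backed, which is where the /tmp/imageio*.tmp files come from.
        GeoTiffReader reader = new GeoTiffReader(new ByteArrayInputStream(geoTiffBytes));
        GridCoverage2D coverage = reader.read(null);

        // ... use the coverage, e.g. coverage.getRenderedImage() ...

        // Disposing only the coverage, as in the first attempted fix, leaves the
        // reader's cached input stream (and its temp-file descriptor) open.
        coverage.dispose(true);

        // Possible mitigations (assumptions, not necessarily the shipped fix):
        reader.dispose(); // dispose the reader as well, so its input source is released
        // javax.imageio.ImageIO.setUseCache(false); // or disable the file-backed ImageIO cache globally
    }
}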
Attachments
Issue Links