Details
Bug
Status: Resolved
Major
Resolution: Fixed
3.1.1
None
Important
Description
I am currently running Apache Spark 3.1.1 on Kubernetes.
When I try to re-add a file that has already been added, the updated file is not actually loaded into the cluster. Calling the addFile() function produces the following warning:
22/01/18 19:05:50 WARN SparkContext: The path http://15.4.12.12:80/demo_data.csv has been added already. Overwriting of added paths is not supported in the current version.
When I display the DataFrame that was loaded, I see that the old data is loaded. If I log into the worker pods and delete the file, the same result is observed.
My SparkConf has the following configuration:
('spark.master', 'k8s://https://15.4.7.11:6443')
('spark.app.name', 'spark-jupyter-mlib')
('spark.submit.deploy.mode', 'cluster')
('spark.kubernetes.container.image', 'tschneider/apache-spark-k8:v7')
('spark.kubernetes.namespace', 'spark')
('spark.kubernetes.pyspark.pythonVersion', '3')
('spark.kubernetes.authenticate.driver.serviceAccountName', 'spark-sa')
('spark.kubernetes.authenticate.serviceAccountName', 'spark-sa')
('spark.executor.instances', '3')
('spark.executor.cores', '2')
('spark.executor.memory', '4096m')
('spark.executor.memoryOverhead', '1024m')
('spark.driver.memory', '1024m')
('spark.driver.host', '15.4.12.12')
('spark.files.overwrite', 'true')
('spark.files.useFetchCache', 'false')
According to the documentation for 3.1.1, setting the spark.files.overwrite parameter to true should cause the updated files to be loaded. The documentation can be found here: https://spark.apache.org/docs/3.1.1/configuration.html
The only workaround I have found is to use a Python function to manually delete and re-download the file on the workers. Calling addFile still shows the warning in this case. My code for the delete and re-download is as follows:
import os
import socket
import urllib.parse
import urllib.request

# csv_file_name, csv_file_url, spark_session and sc are defined earlier in the notebook.

def os_remove(file_path):
    # Delete the file on the worker if it exists and report what was done.
    hostname = socket.gethostname()
    action = None
    if os.path.exists(file_path):
        action = "delete"
        os.remove(file_path)
    return (hostname, action)

# Plain local path: os.path.exists() does not understand file:// URIs.
worker_file_path = u"/{0}".format(csv_file_name)
worker_count = int(spark_session.conf.get('spark.executor.instances'))
rdd = sc.parallelize(range(worker_count)).map(lambda var: os_remove(worker_file_path))
rdd.collect()

def download_updated_file(file_url):
    # Re-download the file to the root directory of the worker.
    file_name = os.path.basename(urllib.parse.urlparse(file_url).path)
    local_file_path = "/{0}".format(file_name)
    urllib.request.urlretrieve(file_url, local_file_path)

rdd = sc.parallelize(range(worker_count)).map(lambda var: download_updated_file(csv_file_url))
rdd.collect()
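The delete and re-download steps could probably be combined into one function. The following is only a minimal sketch using the Python standard library, not anything provided by Spark: the name refresh_local_copy and its dest_dir parameter are hypothetical. It downloads to a temporary file in the target directory and then swaps it into place with os.replace, so a reader should never see a half-written file.

```python
import os
import tempfile
import urllib.parse
import urllib.request


def refresh_local_copy(file_url, dest_dir="/"):
    """Download file_url into dest_dir, replacing any existing copy.

    Hypothetical helper for illustration; not part of Spark.
    """
    file_name = os.path.basename(urllib.parse.urlparse(file_url).path)
    local_path = os.path.join(dest_dir, file_name)
    # Create the temporary file in the same directory so that os.replace
    # stays on one filesystem and can swap the file in a single step.
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, prefix=file_name + ".")
    os.close(fd)
    try:
        urllib.request.urlretrieve(file_url, tmp_path)
        os.replace(tmp_path, local_path)
    except Exception:
        # Clean up the partial download before re-raising the error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return local_path
```

On the workers it would be invoked with the same pattern as above, for example sc.parallelize(range(worker_count)).map(lambda var: refresh_local_copy(csv_file_url)).collect(). As with the original workaround, this does not guarantee that every executor runs a task.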
I believe this is either a bug or a documentation mistake. Perhaps the configuration parameter has a misleading description?