Description
I'm running on EMR Pyspark 3.0.0. with project structure below, process.py is what controls the flow of the application and calls code inside the file_processor package. The command hangs when the .foreachPartition code that is located inside s3_repo.py is called by process.py. When the same .foreachPartition code is moved from s3_repo.py and placed inside the process.py it runs just fine.
process.py file_processor config spark.py repository s3_repo.py structure table_creator.py
process.py
from file_processor.structure import table_creator from file_processor.repository import s3_repo def process(): table_creator.create_table() s3_repo.save_to_s3() if __name__ == '__main__': process()
spark.py
from pyspark.sql import SparkSession spark_session = SparkSession.builder.appName("Test").getOrCreate()
s3_repo.py
from file_processor.config.spark import spark_session def save_to_s3(): spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3) def _save_to_s3(iterator): for record in iterator: print(record)
table_creator.py
from file_processor.config.spark import spark_session from pyspark.sql import Row def create_table(): file_contents = [ {'line_num': 1, 'contents': 'line 1'}, {'line_num': 2, 'contents': 'line 2'}, {'line_num': 3, 'contents': 'line 3'} ] spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")