Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Won't Fix
Affects Versions: 2.1.1, 2.2.0
Fix Version/s: None
Component/s: None
Environment: Linux Ubuntu 17.04. Python 3.5.
Description
Spark partitionBy is causing data corruption. I am doing three super simple writes. Below is the code to reproduce the problem.
Program Output
17/08/10 16:05:03 WARN [SparkUtils]: [Database exists] test
/usr/local/spark/python/pyspark/sql/session.py:331: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
  warnings.warn("inferring schema from dict is deprecated,"
+---+----+-----+
| id|name|count|
+---+----+-----+
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
+---+----+-----+

17/08/10 16:05:07 WARN log: Updating partition stats fast for: data
17/08/10 16:05:07 WARN log: Updated size to 545
17/08/10 16:05:07 WARN log: Updating partition stats fast for: data
17/08/10 16:05:07 WARN log: Updated size to 545
17/08/10 16:05:07 WARN log: Updating partition stats fast for: data
17/08/10 16:05:07 WARN log: Updated size to 545
+---+----+-----+
| id|name|count|
+---+----+-----+
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+

+---+----+-----+
| id|name|count|
+---+----+-----+
|  9|   4| null|
| 10|   6| null|
|  7|   1| null|
|  8|   2| null|
|  1|   1|    1|
|  2|   2|    2|
|  3|   3|    3|
|  4|   4|    4|
|  5|   5|    5|
|  6|   6|    6|
+---+----+-----+
In the last show(), the count column is null for every newly inserted row, and the id and name values do not match what was written.
spark init
self.spark = SparkSession \
.builder \
.master("spark://localhost:7077") \
.enableHiveSupport() \
.getOrCreate()
Code for the test case
def test_clean_insert_table(self):
    table_name = "data"
    data0 = [
        {"id": 1, "name": "1", "count": 1},
        {"id": 2, "name": "2", "count": 2},
        {"id": 3, "name": "3", "count": 3},
    ]
    df_data0 = self.spark.createDataFrame(data0)
    df_data0.write.partitionBy("count").mode("overwrite").saveAsTable(table_name)
    df_return = self.spark.read.table(table_name)
    df_return.show()

    data1 = [
        {"id": 4, "name": "4", "count": 4},
        {"id": 5, "name": "5", "count": 5},
        {"id": 6, "name": "6", "count": 6},
    ]
    df_data1 = self.spark.createDataFrame(data1)
    df_data1.write.insertInto(table_name)
    df_return = self.spark.read.table(table_name)
    df_return.show()

    data3 = [
        {"id": 1, "name": "one", "count": 7},
        {"id": 2, "name": "two", "count": 8},
        {"id": 4, "name": "three", "count": 9},
        {"id": 6, "name": "six", "count": 10},
    ]
    data3 = self.spark.createDataFrame(data3)
    data3.write.insertInto(table_name)
    df_return = self.spark.read.table(table_name)
    df_return.show()
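The nulls are consistent with insertInto matching columns by position rather than by name. createDataFrame over dict rows infers the schema with the keys sorted alphabetically (count, id, name) — hence the deprecation warning in the log — while the partitioned table stores the partition column last (id, name, count). Each value therefore lands one column over, and the string name value ends up in the integer count column, where the failed cast becomes null. A minimal plain-Python sketch of that positional shift (column orders taken from the output above; this is my reading of the behavior, not confirmed Spark internals):

```python
# Dict rows are inferred with alphabetically sorted keys, which is why the
# log shows the "inferring schema from dict is deprecated" warning.
row = {"id": 1, "name": "one", "count": 7}
inferred_order = sorted(row)            # ['count', 'id', 'name']
table_order = ["id", "name", "count"]   # partition column is stored last

# insertInto maps values by POSITION, so each value shifts one column over:
landed = dict(zip(table_order, (row[k] for k in inferred_order)))
print(landed)  # {'id': 7, 'name': 1, 'count': 'one'} -> 'one' casts to null
```

This matches the row "| 7| 1| null|" in the last show(). If this is the cause, a possible workaround would be to reorder the columns explicitly before inserting, e.g. df.select("id", "name", "count").write.insertInto(table_name).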