Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
Description
Doing a join followed by a cache can corrupt data, as shown here:
import pyspark.sql.functions as F

num_rows = 200
for num_cols in range(198, 205):
    # create data frame with id and some dummy cols
    df1 = spark.range(num_rows, numPartitions=100)
    for i in range(num_cols - 1):
        df1 = df1.withColumn("a" + str(i), F.lit("a"))
    # create data frame with id to join
    df2 = spark.range(num_rows, numPartitions=100)
    # write and read to start "fresh"
    df1.write.parquet("delme_1.parquet", mode="overwrite")
    df2.write.parquet("delme_2.parquet", mode="overwrite")
    df1 = spark.read.parquet("delme_1.parquet")
    df2 = spark.read.parquet("delme_2.parquet")
    df3 = df1.join(df2, "id", how="left").cache()  # this cache seems to make a difference
    df4 = df3.filter("id<10")
    print(len(df4.columns), df4.count(), df4.cache().count())  # second cache gives different result
Output:
198 10 10
199 10 10
200 10 10
201 12 12
202 12 12
203 16 16
204 10 12
On other runs the middle number is 10 (the expected result) more often. The last column may show different values, but 12 and 16 are common. You may need to try slightly higher num_rows to reproduce this behaviour.
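As an illustration only (not part of the original report), a minimal sketch for scanning nearby num_rows values with a fixed column count, assuming it runs in the same pyspark session as the repro above:

# Sketch: vary num_rows for one num_cols value that showed the mismatch above,
# and look for runs where count() and cache().count() disagree with the expected 10.
import pyspark.sql.functions as F

num_cols = 204  # a column count that showed the corruption above (assumption)
for num_rows in range(200, 221, 5):
    df1 = spark.range(num_rows, numPartitions=100)
    for i in range(num_cols - 1):
        df1 = df1.withColumn("a" + str(i), F.lit("a"))
    df2 = spark.range(num_rows, numPartitions=100)
    df1.write.parquet("delme_1.parquet", mode="overwrite")
    df2.write.parquet("delme_2.parquet", mode="overwrite")
    df1 = spark.read.parquet("delme_1.parquet")
    df2 = spark.read.parquet("delme_2.parquet")
    df3 = df1.join(df2, "id", how="left").cache()
    df4 = df3.filter("id<10")
    print(num_rows, df4.count(), df4.cache().count())  # a mismatch indicates the corruption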
Spark version is 2.0.0.2.5.0.0-1245 on a Red Hat system on a multi-node YARN cluster.
I am happy to provide more information if you let me know what would be useful.
It's not strictly `cache` that is the problem, since `toPandas` and `collect` show the same behavior, so I can hardly get at the data at all.
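For completeness, a sketch (assuming the parquet-backed df1/df2 from the repro above are still in the session, and pandas is available for toPandas) of how the same discrepancy can be observed without cache:

# Sketch: materialise the joined result via collect()/toPandas() instead of cache().
df3 = df1.join(df2, "id", how="left")  # no cache() here
df4 = df3.filter("id<10")
rows = df4.collect()
pdf = df4.toPandas()
print(df4.count(), len(rows), len(pdf))  # expected 10 10 10; corrupted runs differ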
Attachments
Issue Links
- duplicates: SPARK-16664 "Spark 1.6.2 - Persist call on Data frames with more than 200 columns is wiping out the data." (Resolved)