Details
- Bug
- Status: Resolved
- Blocker
- Resolution: Fixed
- 1.6.2
- None
- Important
Description
Calling persist on a data frame with more than 200 columns removes the data from the data frame. This is an issue in Spark 1.6.2; the same code works without any issues in Spark 1.6.1.
The following test case demonstrates the problem: the test with 200 columns passes, while the otherwise identical test with 201 columns fails. Please let me know if you need any additional information. Thanks.
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.scalatest.FunSuite

class TestSpark162_1 extends FunSuite {

  test("test data frame with 200 columns") {
    val sparkConfig = new SparkConf()
    val parallelism = 5
    sparkConfig.set("spark.default.parallelism", s"$parallelism")
    sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
    val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
    val sqlContext = new SQLContext(sc)

    // create dataframe with 200 columns and fake 200 values
    val size = 200
    val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
    val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
    val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
    val testSchema = StructType(schemas)
    val testDf = sqlContext.createDataFrame(rowRdd, testSchema)

    // test value
    assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
    sc.stop()
  }

  test("test data frame with 201 columns") {
    val sparkConfig = new SparkConf()
    val parallelism = 5
    sparkConfig.set("spark.default.parallelism", s"$parallelism")
    sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
    val sc = new SparkContext(s"local[3]", "TestNestedJson", sparkConfig)
    val sqlContext = new SQLContext(sc)

    // create dataframe with 201 columns and fake 201 values
    val size = 201
    val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
    val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
    val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
    val testSchema = StructType(schemas)
    val testDf = sqlContext.createDataFrame(rowRdd, testSchema)

    // test value
    assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
    sc.stop()
  }
}
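The test case above checks a single value (column index 100). To check every column of an existing data frame, a small helper along the following lines might be useful. This is only a sketch: `PersistCheck` and `persistPreservesData` are hypothetical names, not part of Spark, and the comparison assumes that `collect()` returns rows in the same order before and after `persist()`, which should hold for a simple plan without a shuffle but is not guaranteed in general.

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper (not part of Spark): compares the rows a data frame
// returns before persist() with the rows it returns once persisted.
object PersistCheck {
  def persistPreservesData(df: DataFrame): Boolean = {
    // Rows computed directly from the source, without the cache.
    val expected: Array[Row] = df.collect()

    // persist() only marks the data frame for caching; the next action
    // materializes the cache and reads the rows back from it.
    val cached = df.persist()
    try {
      val actual: Array[Row] = cached.collect()
      // Assumes row order is stable between the two collect() calls.
      expected.toSeq == actual.toSeq
    } finally {
      cached.unpersist()
    }
  }
}
```

With `testDf` built as in the test case, `assert(PersistCheck.persistPreservesData(testDf))` would be expected to pass for 200 columns and fail for 201 columns on an affected version.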
Issue Links
- is duplicated by
  - SPARK-17061 Incorrect results returned following a join of two datasets and a map step where total number of columns >100 (Resolved)
  - SPARK-17043 Cannot call zipWithIndex on RDD with more than 200 columns (get wrong result) (Resolved)
  - SPARK-17218 Caching a DataFrame with >200 columns ~nulls the contents (Resolved)
  - SPARK-17294 Caching invalidates data on mildly wide dataframes (Resolved)
  - SPARK-21851 Spark 2.0 data corruption with cache and 200 columns (Resolved)
  - SPARK-16716 calling cache on joined dataframe can lead to data being blanked (Closed)
- links to