SPARK-16664

Spark 1.6.2 - Persist call on Data frames with more than 200 columns is wiping out the data.


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.6.2
    • Fix Version/s: 1.6.3, 2.0.1, 2.1.0
    • Component/s: Spark Core
    • Labels: None
    • Target Version/s:
    • Flags: Important

      Description

      Calling persist on a data frame with more than 200 columns removes the data from the data frame. This is an issue in Spark 1.6.2; the same code works without any issues in Spark 1.6.1.

      The following test case demonstrates the problem. Please let me know if you need any additional information. Thanks.

      import org.apache.spark._
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{Row, SQLContext}
      import org.scalatest.FunSuite
      
      class TestSpark162_1 extends FunSuite {
      
        test("test data frame with 200 columns") {
          val sparkConfig = new SparkConf()
          val parallelism = 5
          sparkConfig.set("spark.default.parallelism", s"$parallelism")
          sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
      
          val sc = new SparkContext("local[3]", "TestNestedJson", sparkConfig)
          val sqlContext = new SQLContext(sc)
      
          // create dataframe with 200 columns and fake 200 values
          val size = 200
          val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
          val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
      
          val schemas = List.range(0, size).map(a => StructField("name"+ a, LongType, true))
          val testSchema = StructType(schemas)
          val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
      
          // test value
          assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
          sc.stop()
        }
      
        test("test data frame with 201 columns") {
          val sparkConfig = new SparkConf()
          val parallelism = 5
          sparkConfig.set("spark.default.parallelism", s"$parallelism")
          sparkConfig.set("spark.sql.shuffle.partitions", s"$parallelism")
      
          val sc = new SparkContext("local[3]", "TestNestedJson", sparkConfig)
          val sqlContext = new SQLContext(sc)
      
          // create dataframe with 201 columns and fake 201 values
          val size = 201
          val rdd: RDD[Seq[Long]] = sc.parallelize(Seq(Seq.range(0, size)))
          val rowRdd: RDD[Row] = rdd.map(d => Row.fromSeq(d))
      
          val schemas = List.range(0, size).map(a => StructField("name"+ a, LongType, true))
          val testSchema = StructType(schemas)
          val testDf = sqlContext.createDataFrame(rowRdd, testSchema)
      
      
          // test value
          assert(testDf.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
          sc.stop()
        }
      }
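
      The same behaviour can also be checked interactively in a Spark 1.6.2 spark-shell; the snippet below is a minimal sketch derived from the test above and assumes the shell's predefined `sc` and `sqlContext`.

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types.{LongType, StructField, StructType}

      // Single-row DataFrame with 201 Long columns named name0 .. name200.
      val size = 201
      val schema = StructType(List.range(0, size).map(i => StructField("name" + i, LongType, true)))
      val row = Row.fromSeq(Seq.range(0L, size.toLong))
      val df = sqlContext.createDataFrame(sc.parallelize(Seq(row)), schema)

      // Before persisting, column 100 holds the expected value.
      df.take(1)(0).getLong(100)          // 100

      // After persisting, the same lookup reportedly returns wiped-out data on 1.6.2.
      df.persist()
      df.take(1)(0).getLong(100)          // expected 100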
      


    People

    • Assignee: Wesley Tang (darkcaller)
    • Reporter: Satish Kolli (skolli)
    • Votes: 1
    • Watchers: 16
