Spark / SPARK-10762

GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      I have a Hive table in parquet format that was generated using

      create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
      

      I verified that the table was populated; here is a sample row:

      [1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
      

      I wish to put this into a Spark RDD of the form

      ((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
      

      Now, using spark-shell (spark-submit shows the same problem), I created a test RDD with these values:

      scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
      tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
      

      By iterating over the ArrayBuffer, I can convert it into a HashSet in the following new RDD:

      scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
      tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87
      
      scala> tempRDD2.collect.foreach(println)
      ((1,abcdef),((2,ghijkl),Set((1,hello))))
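      For reference, the same reshaping can be written without the mutable `var` and the element-by-element `foreach` by calling `toSet` on the buffer. This is a minimal sketch on a local collection, without Spark; `ReshapeDemo`, `Key`, and `reshape` are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object ReshapeDemo {
  type Key = (Int, String)

  // Hypothetical helper: turns the ArrayBuffer of tuples into an immutable Set,
  // which has the same effect as accumulating a HashSet one element at a time.
  def reshape(rec: (Key, (Key, ArrayBuffer[(Int, String)]))): (Key, (Key, Set[(Int, String)])) =
    (rec._1, (rec._2._1, rec._2._2.toSet))

  def main(args: Array[String]): Unit = {
    val rec = ((1, "abcdef"), ((2, "ghijkl"), ArrayBuffer((1, "hello"))))
    println(reshape(rec)) // prints ((1,abcdef),((2,ghijkl),Set((1,hello))))
  }
}
```

      In the spark-shell session above, the same helper could be passed directly to `tempRDD.map`.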
      

      But when I attempt to do the EXACT SAME THING with a DataFrame obtained through a HiveContext / SQLContext, I get the following error:

      scala> val hc = new HiveContext(sc)
      scala> import hc._
      scala> import hc.implicits._
      
      scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
      
      scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
      
      scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
      tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91
      
      scala> tempRDD3.collect.foreach(println)
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
             at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
             at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
             at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
             at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
             at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
             at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
             at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
             at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
             at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
             at scala.collection.AbstractIterator.to(Iterator.scala:1157)
             at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
             at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
             at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
             at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
             at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
             at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
             at org.apache.spark.scheduler.Task.run(Task.scala:64)
             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
             at java.lang.Thread.run(Thread.java:724)
      
      Driver stacktrace:
             at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
             at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
             at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
             at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
             at scala.Option.foreach(Option.scala:236)
             at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
             at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      

      Note that I get the same error, "GenericRowWithSchema cannot be cast to scala.Tuple2", when I run this as a compiled program with spark-submit. The program crashes at RUN TIME when it reaches the conversion step; there were no compiler errors.

      It seems very strange to me that my artificially generated RDD "tempRDD" works with the conversion, whereas the RDD derived from the Hive query DataFrame does not. I checked, and both RDDs have the same type:

      scala> tempRDDfromHive
      org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
      
      scala> tempRDD
      org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
      

      The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3, but the Hive-derived version still failed with the same error message every time.
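      A likely explanation for the identical types is JVM type erasure: `asInstanceOf[ArrayBuffer[(Int,String)]]` only checks the outer collection class at run time, so it succeeds even when the elements are Spark `Row` objects, and the `ClassCastException` appears only when an element is used as a `Tuple2` (as in the `foreach` lambda in the stack trace). The sketch below reproduces that behavior in plain Scala; `FakeRow` is a hypothetical stand-in for `GenericRowWithSchema`, not a Spark class.

```scala
import scala.collection.mutable.ArrayBuffer

object ErasureDemo {
  // Hypothetical stand-in for a Spark Row; it is NOT a Tuple2.
  final case class FakeRow(a: Int, b: String)

  def readFirstAsTuple(): String = {
    val raw: ArrayBuffer[Any] = ArrayBuffer(FakeRow(1, "hello"))
    // Unchecked cast: only the ArrayBuffer class is verified, so this line does not throw.
    val cast = raw.asInstanceOf[ArrayBuffer[(Int, String)]]
    try {
      // The compiler inserts a cast to Tuple2 here, which fails at run time.
      val first = cast.head._1
      s"read $first"
    } catch {
      case _: ClassCastException => "ClassCastException on element access"
    }
  }

  def main(args: Array[String]): Unit =
    println(readFirstAsTuple()) // prints ClassCastException on element access
}
```

      Because the static type printed by the REPL comes from the cast, not from the data, both RDDs report the same element type even though their elements differ at run time.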

      I also read through related Stack Overflow questions and Apache Spark JIRA issues; based on those, I tried casting the ArrayBuffer to an Iterator instead, but that also failed on the second step with the same error.

      Since the error seems to occur only for the Hive table version, I'm tempted to think that this is an issue with the Spark/Hive integration in Spark SQL.
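      Assuming the elements of `var5` arrive as Spark `Row` objects (as the `GenericRowWithSchema` in the stack trace suggests), one way to avoid the exception is to build each tuple field by field instead of casting, along the lines of `a.getSeq[Row](4).map(r => (r.getInt(0), r.getString(1))).toSet` with `org.apache.spark.sql.Row`. The plain-Scala sketch below mirrors that pattern so it runs without Spark; `RowLike` and `toPairs` are hypothetical names standing in for `Row` and the mapping step.

```scala
object ConvertDemo {
  // Hypothetical minimal stand-in for org.apache.spark.sql.Row: positional, untyped fields.
  final class RowLike(values: Any*) {
    def getInt(i: Int): Int = values(i).asInstanceOf[Int]
    def getString(i: Int): String = values(i).asInstanceOf[String]
  }

  // Read each struct field explicitly, so no element is ever cast to Tuple2.
  def toPairs(structs: Seq[RowLike]): Set[(Int, String)] =
    structs.map(r => (r.getInt(0), r.getString(1))).toSet

  def main(args: Array[String]): Unit = {
    val var5 = Seq(new RowLike(1, "hello"))
    println(toPairs(var5)) // prints Set((1,hello))
  }
}
```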

      Possibly related Apache Spark JIRA issues:
      https://issues.apache.org/jira/browse/SPARK-1040
      https://issues.apache.org/jira/browse/SPARK-2737
      https://issues.apache.org/jira/browse/SPARK-4489 (still open)

    People

      Assignee: Unassigned
      Reporter: Glenn Strycker (glenn.strycker@gmail.com)
      Votes: 0
      Watchers: 1
