SPARK-5775: GenericRow cannot be cast to SpecificMutableRow when querying nested data in a partitioned table



      Description

      Using the "LOAD" SQL command in a Hive context to load Parquet files into a partitioned table causes a ClassCastException at query time. The bug requires the table to be partitioned and to have a column of type array of struct.

      The example below shows how to reproduce the bug; note that the same query works fine when the table is not partitioned.

      scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
      scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
      scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
      scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
      scala> schemaRDD.printSchema
      root
       |-- data_array: array (nullable = true)
       |    |-- element: struct (containsNull = false)
       |    |    |-- field1: integer (nullable = true)
       |    |    |-- field2: integer (nullable = true)
      
      scala> hiveContext.sql("create external table if not exists partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) Partitioned by (date STRING) STORED AS PARQUET Location 'hdfs://****/partitioned_table'")
      scala> hiveContext.sql("create external table if not exists none_partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) STORED AS PARQUET Location 'hdfs://****/none_partitioned_table'")
      
      scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_1")
      scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_2")
      
      scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1' INTO TABLE partitioned_table PARTITION(date='2015-02-12')")
      scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2' INTO TABLE none_partitioned_table")
      
      scala> hiveContext.sql("select data.field1 from none_partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
      res23: Array[org.apache.spark.sql.Row] = Array([1], [3])
      
      scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
      
      15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data
      15/02/12 16:21:03 INFO ParseDriver: Parse Completed
      15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with curMem=0, maxMem=280248975
      15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 254.6 KB, free 267.0 MB)
      15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with curMem=260661, maxMem=280248975
      15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 27.9 KB, free 267.0 MB)
      15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:51990 (size: 27.9 KB, free: 267.2 MB)
      15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
      15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD at ParquetTableOperations.scala:119
      15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
      15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
      15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split Strategy
      15/02/12 16:21:03 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
      15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
      15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at SparkPlan.scala:84)
      15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
      15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
      15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84), which has no missing parents
      15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with curMem=289276, maxMem=280248975
      15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 7.5 KB, free 267.0 MB)
      15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(4230) called with curMem=296908, maxMem=280248975
      15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 4.1 KB, free 267.0 MB)
      15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:51990 (size: 4.1 KB, free: 267.2 MB)
      15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_19_piece0
      15/02/12 16:21:03 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:838
      15/02/12 16:21:03 INFO DAGScheduler: Submitting 3 missing tasks from Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84)
      15/02/12 16:21:03 INFO TaskSchedulerImpl: Adding task set 13.0 with 3 tasks
      15/02/12 16:21:03 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 48, *****, NODE_LOCAL, 1640 bytes)
      15/02/12 16:21:03 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 49, *****, NODE_LOCAL, 1641 bytes)
      15/02/12 16:21:03 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 50, *****, NODE_LOCAL, 1640 bytes)
      15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:39729 (size: 4.1 KB, free: 133.6 MB)
      15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:48213 (size: 4.1 KB, free: 133.6 MB)
      15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:45394 (size: 4.1 KB, free: 133.6 MB)
      15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:39729 (size: 27.9 KB, free: 133.6 MB)
      15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:48213 (size: 27.9 KB, free: 133.6 MB)
      15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:45394 (size: 27.9 KB, free: 133.6 MB)
      15/02/12 16:21:04 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 48, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
      
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.1 in stage 13.0 (TID 51, *****, NODE_LOCAL, 1640 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.0 in stage 13.0 (TID 49) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 1]
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.1 in stage 13.0 (TID 52, *****, NODE_LOCAL, 1641 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.1 in stage 13.0 (TID 51) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 2]
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.2 in stage 13.0 (TID 53, *****, NODE_LOCAL, 1640 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 50) in 405 ms on ***** (1/3)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.1 in stage 13.0 (TID 52) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 3]
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.2 in stage 13.0 (TID 54, *****, NODE_LOCAL, 1641 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.2 in stage 13.0 (TID 53) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 4]
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.3 in stage 13.0 (TID 55, *****, NODE_LOCAL, 1640 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.2 in stage 13.0 (TID 54) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 5]
      15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.3 in stage 13.0 (TID 56, *****, NODE_LOCAL, 1641 bytes)
      15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.3 in stage 13.0 (TID 55) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 6]
      15/02/12 16:21:04 ERROR TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
      15/02/12 16:21:04 INFO TaskSchedulerImpl: Cancelling stage 13
      15/02/12 16:21:04 INFO TaskSchedulerImpl: Stage 13 was cancelled
      15/02/12 16:21:04 INFO DAGScheduler: Job 12 failed: collect at SparkPlan.scala:84, took 0.556942 s
      15/02/12 16:21:04 WARN TaskSetManager: Lost task 1.3 in stage 13.0 (TID 56, *****): TaskKilled (killed intentionally)
      15/02/12 16:21:04 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool 
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 55, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
      
      Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
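
      Based on the stack trace, the cast happens in ParquetTableScan (ParquetTableOperations.scala:147), where partition-column values are appended to each scanned row. That code path appears to assume a SpecificMutableRow, while the Parquet row converter seems to produce a GenericRow when the schema contains nested types such as array of struct.

      Until this is fixed, a possible workaround is to bypass the partitioned Hive table and read a partition's Parquet files directly. The sketch below is untested and assumes Spark 1.2.x APIs and the same masked HDFS layout as above; the partition path and temp table name are illustrative:

      scala> // Read the partition directory as a plain Parquet scan, skipping partition-value merging
      scala> val partitionRDD = hiveContext.parquetFile("hdfs://****/partitioned_table/date=2015-02-12")
      scala> partitionRDD.registerTempTable("partition_20150212")
      scala> // The same LATERAL VIEW query succeeds here, as it does for none_partitioned_table
      scala> hiveContext.sql("select data.field1 from partition_20150212 LATERAL VIEW explode(data_array) nestedStuff AS data").collect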
      
      
      


              People

              • Assignee: Cheng Lian
              • Reporter: Ayoub Benali
              • Shepherd: Cheng Lian
              • Votes: 0
              • Watchers: 5
