Description
Using the "LOAD" sql command in Hive context to load parquet files into a partitioned table causes exceptions during query time.
The bug requires the table to have a column of type Array of struct and to be partitioned.
The example bellow shows how to reproduce the bug and you can see that if the table is not partitioned the query works fine.
scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
scala> schemaRDD.printSchema
root
 |-- data_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field1: integer (nullable = true)
 |    |    |-- field2: integer (nullable = true)

scala> hiveContext.sql("create external table if not exists partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) Partitioned by (date STRING) STORED AS PARQUET Location 'hdfs://****/partitioned_table'")
scala> hiveContext.sql("create external table if not exists none_partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) STORED AS PARQUET Location 'hdfs://****/none_partitioned_table'")
scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_1")
scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_2")
scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1' INTO TABLE partitioned_table PARTITION(date='2015-02-12')")
scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2' INTO TABLE none_partitioned_table")

scala> hiveContext.sql("select data.field1 from none_partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
res23: Array[org.apache.spark.sql.Row] = Array([1], [3])

scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data
15/02/12 16:21:03 INFO ParseDriver: Parse Completed
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with curMem=0, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 254.6 KB, free 267.0 MB)
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with curMem=260661, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 27.9 KB, free 267.0 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:51990 (size: 27.9 KB, free: 267.2 MB)
15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD at ParquetTableOperations.scala:119
15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split Strategy
15/02/12 16:21:03 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at SparkPlan.scala:84)
15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84), which has no missing parents
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with curMem=289276, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 7.5 KB, free 267.0 MB)
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(4230) called with curMem=296908, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 4.1 KB, free 267.0 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:51990 (size: 4.1 KB, free: 267.2 MB)
15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_19_piece0
15/02/12 16:21:03 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:838
15/02/12 16:21:03 INFO DAGScheduler: Submitting 3 missing tasks from Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84)
15/02/12 16:21:03 INFO TaskSchedulerImpl: Adding task set 13.0 with 3 tasks
15/02/12 16:21:03 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 48, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:03 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 49, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:03 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 50, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:39729 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:48213 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:45394 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:39729 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:48213 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:45394 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 48, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.1 in stage 13.0 (TID 51, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.0 in stage 13.0 (TID 49) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 1]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.1 in stage 13.0 (TID 52, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.1 in stage 13.0 (TID 51) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 2]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.2 in stage 13.0 (TID 53, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 50) in 405 ms on ***** (1/3)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.1 in stage 13.0 (TID 52) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 3]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.2 in stage 13.0 (TID 54, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.2 in stage 13.0 (TID 53) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 4]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.3 in stage 13.0 (TID 55, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.2 in stage 13.0 (TID 54) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 5]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.3 in stage 13.0 (TID 56, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.3 in stage 13.0 (TID 55) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 6]
15/02/12 16:21:04 ERROR TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
15/02/12 16:21:04 INFO TaskSchedulerImpl: Cancelling stage 13
15/02/12 16:21:04 INFO TaskSchedulerImpl: Stage 13 was cancelled
15/02/12 16:21:04 INFO DAGScheduler: Job 12 failed: collect at SparkPlan.scala:84, took 0.556942 s
15/02/12 16:21:04 WARN TaskSetManager: Lost task 1.3 in stage 13.0 (TID 56, *****): TaskKilled (killed intentionally)
15/02/12 16:21:04 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 55, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
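
A possible interim cross-check is to read the loaded Parquet files directly and query them as a temporary table, which avoids the partitioned Hive table scan where the cast fails. The sketch below is untested against this exact setup; it assumes the same Spark 1.2-era SchemaRDD API used above (hiveContext.parquetFile, registerTempTable), it assumes LOAD DATA moved the files into the partition directory under the table location, and the parquet_direct table name is made up for illustration:

// Read the partition's Parquet files directly, bypassing the Hive
// metastore's partitioned-table read path.
// Assumption: LOAD DATA INPATH moved the files into this directory.
val direct = hiveContext.parquetFile("hdfs://****/partitioned_table/date=2015-02-12")
direct.registerTempTable("parquet_direct")

// Same lateral-view query as above, now against the direct read; this
// follows the code path that already works for none_partitioned_table.
hiveContext.sql("select data.field1 from parquet_direct LATERAL VIEW explode(data_array) nestedStuff AS data").collect()

If this returns Array([1], [3]) as in the none_partitioned_table case, the files themselves are intact and the failure is specific to the partitioned ParquetTableScan path.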
Issue Links
- is duplicated by: SPARK-6276 "beeline client class cast exception on partitioned table Spark SQL" (Resolved)