SPARK-13709

Spark unable to decode Avro when partitioned


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels: None
    • Flags: Important

      Description

      There is a problem decoding Avro data with SparkSQL when the table is partitioned. The schema and encoded data are valid; I'm able to decode the data with the avro-tools CLI utility, with non-partitioned SparkSQL tables, with Hive, and with other tools as well. Only partitioned SparkSQL tables fail.
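
      For example, avro-tools reads the same file cleanly (the jar version and file name below are illustrative):

      # Dump each record of the Avro container file as JSON
      java -jar avro-tools-1.8.1.jar tojson /path/to/data/dir/part-00000.avro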

      For a simple example, I took the example schema and data found in the Oracle documentation:

      Schema

      {
        "type": "record",
        "name": "MemberInfo",
        "namespace": "avro",
        "fields": [
          {"name": "name", "type": {
            "type": "record",
            "name": "FullName",
            "fields": [
              {"name": "first", "type": "string"},
              {"name": "last", "type": "string"}
            ]
          }},
          {"name": "age", "type": "int"},
          {"name": "address", "type": {
            "type": "record",
            "name": "Address",
            "fields": [
              {"name": "street", "type": "string"},
              {"name": "city", "type": "string"},
              {"name": "state", "type": "string"},
              {"name": "zip", "type": "int"}
            ]
          }}
        ]
      } 
      

      Data

      {
        "name": {
          "first": "Percival",
          "last":  "Lowell"
        },
        "age": 156,
        "address": {
          "street": "Mars Hill Rd",
          "city": "Flagstaff",
          "state": "AZ",
          "zip": 86001
        }
      }
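
      For reference, this record can be written into an Avro container file with avro-tools (file names below are illustrative):

      # Encode the JSON record above using the schema above
      java -jar avro-tools-1.8.1.jar fromjson --schema-file schema.avsc member.json > /path/to/data/dir/member.avro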
      

      Create (no partitions – works)
      If I create with no partitions, I'm able to query the data just fine.

      CREATE EXTERNAL TABLE IF NOT EXISTS foo
      ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
      STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      LOCATION '/path/to/data/dir'
      TBLPROPERTIES ('avro.schema.url'='/path/to/schema.avsc');
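
      Even nested fields resolve as expected, e.g. (the result follows from the sample record above):

      spark-sql> SELECT name.first, address.city FROM foo;
      Percival    Flagstaff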
      

      Create (partitions – does NOT work)
      If I create the table with a partition column and then manually add a partition, all of my queries return an error. (I need to add partitions manually because I cannot control the structure of the data directories, so dynamic partitioning is not an option.)

      CREATE EXTERNAL TABLE IF NOT EXISTS foo
      PARTITIONED BY (ds STRING)
      ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
      STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      TBLPROPERTIES ('avro.schema.url'='/path/to/schema.avsc');
      
      ALTER TABLE foo ADD PARTITION (ds='1') LOCATION '/path/to/data/dir';
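
      The partition itself registers fine; it is only reads against it that fail. As a sanity check (assuming the ALTER TABLE above succeeded):

      spark-sql> SHOW PARTITIONS foo;
      ds=1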
      

      The error:

      spark-sql> SELECT * FROM foo WHERE ds = '1';
      
      Driver stacktrace:
          at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
          at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
          at scala.Option.foreach(Option.scala:236)
          at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
          at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
          at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
          at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
          at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
          at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
          at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:635)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:308)
          at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:483)
          at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
          at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
          at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
          at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
          at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
          at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
          at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
          at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
          at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
          at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
          at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
          at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
          at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
          at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
          at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
          at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
          at scala.collection.AbstractIterator.to(Iterator.scala:1157)
          at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
          at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
          at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
          at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
          at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
          at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
          at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
          at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
          at org.apache.spark.scheduler.Task.run(Task.scala:89)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
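
      Note that the exception originates in Hive's AvroDeserializer (its SchemaReEncoder re-resolving each record) while Spark's HadoopTableReader is filling rows, which suggests the partition's SerDe is being initialized without the table-level 'avro.schema.url' and is therefore resolving records against the wrong reader schema.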
      

      Additional Info
      For what it's worth, I found an issue reported in Apache Drill (DRILL-957) and a related fix that look very similar to this one. I'll link that issue to this one.

      I originally posted this on StackOverflow as a question, but I feel strongly that this is indeed a bug, so I created this issue.


              People

              • Assignee: Cheng Lian
              • Reporter: Chris Miller
              • Votes: 3
              • Watchers: 14
