Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-25588

SchemaParseException: Can't redefine: list when reading from Parquet

    XML · Word · Printable · JSON

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.2, 2.4.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:
      None
    • Environment:

      Spark version 2.3.2

      Description

      In ADAM, a library downstream of Spark, we use Avro to define a schema, generate Java classes from the Avro schema using the avro-maven-plugin, and generate Scala Products from the Avro schema using our own code generation library.

      The following unit test exercises two code paths. First, it writes to Parquet and reads back using an RDD of Avro-generated Java classes. Then it writes to Parquet and reads back using a Dataset of Avro-generated Scala Products.

        // Regression reproduction: transmute reads into variants twice, once through the
        // RDD API and once through the Dataset API, and round-trip each result through Parquet.
        sparkTest("transform reads to variant rdd") {
          val reads = sc.loadAlignments(testFile("small.sam"))
      
          // Writes the variants as Parquet to a path obtained from tmpLocation(".adam"),
          // reloads them with sc.loadVariants, and asserts the reloaded RDD has 20 records.
          def checkSave(variants: VariantRDD) {
            val tempPath = tmpLocation(".adam")
            variants.saveAsParquet(tempPath)
      
            assert(sc.loadVariants(tempPath).rdd.count === 20)
          }
      
          // RDD code path: map each AlignmentRecord to a Variant with varFn.
          val variants: VariantRDD = reads.transmute[Variant, VariantProduct, VariantRDD](
            (rdd: RDD[AlignmentRecord]) => {
              rdd.map(AlignmentRecordRDDSuite.varFn)
            })
      
          checkSave(variants)
      
          // NOTE(review): the implicits import presumably supplies the Encoder that ds.map
          // needs for VariantProduct; confirm.
          val sqlContext = SQLContext.getOrCreate(sc)
          import sqlContext.implicits._
      
          // Dataset code path: convert each product to Avro, apply the same varFn, and
          // convert the resulting Variant back into a VariantProduct.
          val variantsDs: VariantRDD = reads.transmuteDataset[Variant, VariantProduct, VariantRDD](
            (ds: Dataset[AlignmentRecordProduct]) => {
              ds.map(r => {
                VariantProduct.fromAvro(
                  AlignmentRecordRDDSuite.varFn(r.toAvro))
              })
            })
      
          // Per the report, this second save/reload (the Dataset path) fails on
          // Spark 2.4.0 (RC2) with SchemaParseException: Can't redefine: list.
          checkSave(variantsDs)
      }
      

      https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala#L1540

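      Note that the Parquet schemas written by the two code paths differ. The RDD code path writes arrays in the two-level form, where the repeated field is named `array` (e.g. `repeated binary array (UTF8)`). The Dataset code path writes the three-level form, where every array contains a repeated group named `list` that wraps an `element` field. This repeated `list` group name, used by several nested arrays, presumably relates to the `Can't redefine: list` error shown below.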

      RDD code path:

      $ parquet-tools schema /var/folders/m6/4yqn_4q129lbth_dq3qzj_8h0000gn/T/TempSuite3400691035694870641.adam/part-r-00000.gz.parquet
      message org.bdgenomics.formats.avro.Variant {
        optional binary contigName (UTF8);
        optional int64 start;
        optional int64 end;
        required group names (LIST) {
          repeated binary array (UTF8);
        }
        optional boolean splitFromMultiAllelic;
        optional binary referenceAllele (UTF8);
        optional binary alternateAllele (UTF8);
        optional double quality;
        optional boolean filtersApplied;
        optional boolean filtersPassed;
        required group filtersFailed (LIST) {
          repeated binary array (UTF8);
        }
        optional group annotation {
          optional binary ancestralAllele (UTF8);
          optional int32 alleleCount;
          optional int32 readDepth;
          optional int32 forwardReadDepth;
          optional int32 reverseReadDepth;
          optional int32 referenceReadDepth;
          optional int32 referenceForwardReadDepth;
          optional int32 referenceReverseReadDepth;
          optional float alleleFrequency;
          optional binary cigar (UTF8);
          optional boolean dbSnp;
          optional boolean hapMap2;
          optional boolean hapMap3;
          optional boolean validated;
          optional boolean thousandGenomes;
          optional boolean somatic;
          required group transcriptEffects (LIST) {
            repeated group array {
              optional binary alternateAllele (UTF8);
              required group effects (LIST) {
                repeated binary array (UTF8);
              }
              optional binary geneName (UTF8);
              optional binary geneId (UTF8);
              optional binary featureType (UTF8);
              optional binary featureId (UTF8);
              optional binary biotype (UTF8);
              optional int32 rank;
              optional int32 total;
              optional binary genomicHgvs (UTF8);
              optional binary transcriptHgvs (UTF8);
              optional binary proteinHgvs (UTF8);
              optional int32 cdnaPosition;
              optional int32 cdnaLength;
              optional int32 cdsPosition;
              optional int32 cdsLength;
              optional int32 proteinPosition;
              optional int32 proteinLength;
              optional int32 distance;
              required group messages (LIST) {
                repeated binary array (ENUM);
              }
            }
          }
          required group attributes (MAP) {
            repeated group map (MAP_KEY_VALUE) {
              required binary key (UTF8);
              required binary value (UTF8);
            }
          }
        }
      }
      

      Dataset code path:

      $ parquet-tools schema /var/folders/m6/4yqn_4q129lbth_dq3qzj_8h0000gn/T/TempSuite2879366708769671307.adam/part-00000-b123eb8b-2648-4648-8096-b3de95343141-c000.snappy.parquet
      message spark_schema {
        optional binary contigName (UTF8);
        optional int64 start;
        optional int64 end;
        optional group names (LIST) {
          repeated group list {
            optional binary element (UTF8);
          }
        }
        optional boolean splitFromMultiAllelic;
        optional binary referenceAllele (UTF8);
        optional binary alternateAllele (UTF8);
        optional double quality;
        optional boolean filtersApplied;
        optional boolean filtersPassed;
        optional group filtersFailed (LIST) {
          repeated group list {
            optional binary element (UTF8);
          }
        }
        optional group annotation {
          optional binary ancestralAllele (UTF8);
          optional int32 alleleCount;
          optional int32 readDepth;
          optional int32 forwardReadDepth;
          optional int32 reverseReadDepth;
          optional int32 referenceReadDepth;
          optional int32 referenceForwardReadDepth;
          optional int32 referenceReverseReadDepth;
          optional float alleleFrequency;
          optional binary cigar (UTF8);
          optional boolean dbSnp;
          optional boolean hapMap2;
          optional boolean hapMap3;
          optional boolean validated;
          optional boolean thousandGenomes;
          optional boolean somatic;
          optional group transcriptEffects (LIST) {
            repeated group list {
              optional group element {
                optional binary alternateAllele (UTF8);
                optional group effects (LIST) {
                  repeated group list {
                    optional binary element (UTF8);
                  }
                }
                optional binary geneName (UTF8);
                optional binary geneId (UTF8);
                optional binary featureType (UTF8);
                optional binary featureId (UTF8);
                optional binary biotype (UTF8);
                optional int32 rank;
                optional int32 total;
                optional binary genomicHgvs (UTF8);
                optional binary transcriptHgvs (UTF8);
                optional binary proteinHgvs (UTF8);
                optional int32 cdnaPosition;
                optional int32 cdnaLength;
                optional int32 cdsPosition;
                optional int32 cdsLength;
                optional int32 proteinPosition;
                optional int32 proteinLength;
                optional int32 distance;
                optional group messages (LIST) {
                  repeated group list {
                    optional binary element (UTF8);
                  }
                }
              }
            }
          }
          optional group attributes (MAP) {
            repeated group key_value {
              required binary key (UTF8);
              optional binary value (UTF8);
            }
          }
        }
      }
      

      With Spark 2.4.0 (RC2) and its Parquet dependency at version 1.10.0, the Dataset code path now fails:

      $ mvn test
      ...
      - transform reads to variant rdd *** FAILED ***
        org.apache.spark.SparkException: Job aborted due to stage failure:
      Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver):
       org.apache.avro.SchemaParseException: Can't redefine: list
      	at org.apache.avro.Schema$Names.put(Schema.java:1128)
      	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
      	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
      	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
      	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
      	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
      	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
      	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
      	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
      	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
      	at org.apache.avro.Schema.toString(Schema.java:324)
      	at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
      	at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
      	at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
      	at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
      	at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
      	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
      	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
      	at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
      	at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
      	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:199)
      	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:196)
      	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:151)
      	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      	at org.apache.spark.scheduler.Task.run(Task.scala:121)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      
      2018-09-29 21:39:47 ERROR TaskSetManager:70 - Task 0 in stage 3.0 failed 1 times; aborting job
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1866)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1866)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2038)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
        at org.bdgenomics.adam.rdd.read.AlignmentRecordRDDSuite$$anonfun$78.checkSave$6(AlignmentRecordRDDSuite.scala:1551)
        at org.bdgenomics.adam.rdd.read.AlignmentRecordRDDSuite$$anonfun$78.apply$mcV$sp(AlignmentRecordRDDSuite.scala:1579)
        at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply$mcV$sp(SparkFunSuite.scala:102)
        at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply(SparkFunSuite.scala:98)
        at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply(SparkFunSuite.scala:98)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
        at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
        at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
        at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
        at org.bdgenomics.adam.util.ADAMFunSuite.org$scalatest$BeforeAndAfter$$super$runTest(ADAMFunSuite.scala:24)
        at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
        at org.bdgenomics.adam.util.ADAMFunSuite.runTest(ADAMFunSuite.scala:24)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
        at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
        at org.scalatest.Suite$class.run(Suite.scala:1424)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
        at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
        at org.bdgenomics.adam.util.ADAMFunSuite.org$scalatest$BeforeAndAfter$$super$run(ADAMFunSuite.scala:24)
        at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
        at org.bdgenomics.adam.util.ADAMFunSuite.run(ADAMFunSuite.scala:24)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
        at org.scalatest.tools.Runner$.main(Runner.scala:860)
        at org.scalatest.tools.Runner.main(Runner.scala)
        Cause: org.apache.avro.SchemaParseException: Can't redefine: list
        at org.apache.avro.Schema$Names.put(Schema.java:1128)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema.toString(Schema.java:324)
        at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
        at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
        at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
        at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
        at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
        at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
        at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
        at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
        at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
        at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
        at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
        at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:199)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:196)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:151)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      

      Regression from Spark version 2.3.1.

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              heuermh Michael Heuer
            • Votes:
              0 Vote for this issue
              Watchers:
              10 Start watching this issue

              Dates

              • Created:
                Updated: