Parquet / PARQUET-577

mandatory status of avro columns ignored


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: parquet-avro
    • Labels: None

    Description

      The Avro spec's schema [resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say:
      "if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, an error is signalled."

      I can't find an implementation of this rule in parquet.avro, and indeed I observe it being seemingly ignored. I am using 1.6.0 because that's what we can get off Maven.

      My writer's schema:

      {
        "type" : "record",
        "name" : "SampleSchema_v1",
        "namespace" : "com.xxxx.spark",
        "fields" : [ {
          "name" : "stringField",
          "type" : "string",
          "doc"  : "Sample string field"
        },{
          "name" : "longField",
          "type" : "long",
          "doc"  : "Sample long field"
        } ],
        "doc:" : "A sample/test schema"
      }
      

      My reader's schema:

      {
        "type" : "record",
        "name" : "SampleSchema_newDefaultlessCol",
        "namespace" : "com.xxxx.spark",
        "fields" : [ {
          "name" : "stringField",
          "type" : "string",
          "doc"  : "Sample string field"
        },{
          "name" : "longField",
          "type" : "long",
          "doc"  : "Sample long field"
        },{
          "name" : "mandatoryIntField",
          "type" : "int",
          "doc"  : "Sample mandatory! int field"
        }],
        "doc:" : "v1 + one extra column that has no default"
      }
      

      This is my test case:

          "accept new column w/o a default [schema-evolution, undesired]" in new MockAvroParquetGrid {
      //TODO: the behaviour this test case exercises is UNDESIRED, i.e.: a new column with no default value should
      //TODO: constitute an incompatible schema break; instead, this thing uses 0 for the default.
      //TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
            val inputSampleRecordsV1  = Seq(new SampleSchema_v1(s"string", 1))
            dao.writeParquet[SampleSchema_v1](
              SparkBase.sc.parallelize(inputSampleRecordsV1),
              SampleSchema_v1.SCHEMA$,
              parquetFolder
            )
      
            dao
              .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
              .collect().toSeq.head
              .getMandatoryIntField must equalTo(0) //TODO: zero is an unwelcome guess
          }
      

      This is the implementation of writeParquet and readParquet:

        def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
                           (implicit ctag: ClassTag[C]): Unit = {
          val hadoopJob = Job.getInstance()
          ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
          ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
          AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)
      
          new PairRDDFunctions[Void,C](
            source.map(sourceRecord => (null, sourceRecord))
          ).saveAsNewAPIHadoopFile(
            bucketDAO.uri(dstPath),
            classOf[Void],                            //K
            ctag.runtimeClass.asInstanceOf[Class[C]], //V
            classOf[AvroParquetOutputFormat],
            hadoopJob.getConfiguration
          )
        }
      
        def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)(implicit ctag: ClassTag[C]): RDD[C] = {
          val hadoopJob = Job.getInstance()
          ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
          AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)
          sc.newAPIHadoopFile(
            bucketDAO.uri(srcPath),
            classOf[ParquetInputFormat[C]],
            classOf[Void],                            //K
            ctag.runtimeClass.asInstanceOf[Class[C]], //V
            hadoopJob.getConfiguration
          ).map { _._2 }
        }
      

      We use avro-tools to generate Java classes from our Avro schemas:
      java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>

      The test case passes: it harvests zeroes as the values of mandatoryIntField.
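      For contrast, resolving the same writer/reader pair with plain Avro signals the error the spec demands. A minimal sketch (the .avsc file paths and the generic-record plumbing here are illustrative):

        import java.io.ByteArrayOutputStream
        import org.apache.avro.Schema
        import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter}
        import org.apache.avro.io.{DecoderFactory, EncoderFactory}

        val writerSchema = new Schema.Parser().parse(new java.io.File("SampleSchema_v1.avsc"))
        val readerSchema = new Schema.Parser().parse(new java.io.File("SampleSchema_newDefaultlessCol.avsc"))

        // Serialise one record with the writer's schema...
        val record = new GenericData.Record(writerSchema)
        record.put("stringField", "string")
        record.put("longField", 1L)
        val out = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        new GenericDatumWriter[GenericData.Record](writerSchema).write(record, encoder)
        encoder.flush()

        // ...and read it back resolving against the reader's schema: Avro throws
        // AvroTypeException (missing required field mandatoryIntField) rather than
        // conjuring up a zero.
        val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
        new GenericDatumReader[GenericData.Record](writerSchema, readerSchema).read(null, decoder)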

      Naively, I see a problem in the [indexed record converter](https://git-wip-us.apache.org/repos/asf?p=parquet-mr.git;a=blob;f=parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java;h=06c66d692571da08298ae1da4f9967446c4864ee;hb=HEAD#l105) in that it cheerfully accepts a condition that is doomed to fail, the condition being: the reader schema has a column with no default value that is absent from the writer schema.
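      Put differently, the check I would expect never happens. Sketched here in Scala against the Avro Schema API (this is not the converter's actual Java, and the helper name is mine):

        import scala.collection.JavaConverters._
        import org.apache.avro.Schema

        // Reader-only fields must carry a default value; the converter never asserts this.
        def fieldsDoomedToFail(reader: Schema, writer: Schema): Seq[Schema.Field] =
          reader.getFields.asScala.filter { f =>
            writer.getField(f.name) == null && f.defaultValue == null
          }.toSeq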

      I am writing predominantly to confirm my diagnosis and to understand why it is implemented the way it is. Is it fixable (or do others depend on it as a feature)? Can people think of a workaround?
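      The only workaround I can come up with is to guard on our side before reading, using Avro's own SchemaCompatibility helper. A sketch, assuming Avro >= 1.7.2 on the classpath (obtaining the writer's schema, e.g. from the Parquet footer, is left out):

        import org.apache.avro.{Schema, SchemaCompatibility}
        import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

        // Fail fast instead of letting parquet-avro invent zero defaults.
        def assertResolvable(reader: Schema, writer: Schema): Unit = {
          val compat = SchemaCompatibility.checkReaderWriterCompatibility(reader, writer)
          if (compat.getType != SchemaCompatibilityType.COMPATIBLE)
            sys.error(s"reader schema cannot resolve writer schema: ${compat.getDescription}")
        }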



      People

        Assignee: Unassigned
        Reporter: Remek Zajac (remek.zajac@gmail.com)
