Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version: 1.6.0
Description
The Avro spec's [schema resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say:
"if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, an error is signalled."
I can't find an implementation of this rule in parquet.avro, and in practice the rule appears to be ignored. I am using 1.6.0 because that is the version we can get from Maven.
My writer's schema:

    {
      "type" : "record",
      "name" : "SampleSchema_v1",
      "namespace" : "com.xxxx.spark",
      "fields" : [
        { "name" : "stringField", "type" : "string", "doc" : "Sample string field" },
        { "name" : "longField", "type" : "long", "doc" : "Sample long field" }
      ],
      "doc:" : "A sample/test schema"
    }

My reader schema:

    {
      "type" : "record",
      "name" : "SampleSchema_newDefaultlessCol",
      "namespace" : "com.xxxx.spark",
      "fields" : [
        { "name" : "stringField", "type" : "string", "doc" : "Sample string field" },
        { "name" : "longField", "type" : "long", "doc" : "Sample long field" },
        { "name" : "mandatoryIntField", "type" : "int", "doc" : "Sample mandatory! int field" }
      ],
      "doc:" : "v1 + one extra column that has no default"
    }

This is my test case:

    "accept new column w/o a default [schema-evolution, undesired]" in new MockAvroParquetGrid {
      //TODO: the behaviour this test case exercises is UNDESIRED, i.e.: a new column with no default value should
      //constitute an incompatible schema break; instead, this thing uses 0 for the default
      //TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
      val inputSampleRecordsV1 = Seq(new SampleSchema_v1(s"string", 1))

      dao.writeParquet[SampleSchema_v1](
        SparkBase.sc.parallelize(inputSampleRecordsV1),
        SampleSchema_v1.SCHEMA$,
        parquetFolder
      )

      dao
        .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
        .collect().toSeq.head
        .getMandatoryIntField must equalTo(0) //TODO: zero is an unwelcome guess
    }

This is the implementation of writeParquet and readParquet:

    def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
                       (implicit ctag: ClassTag[C]): Unit = {
      val hadoopJob = Job.getInstance()
      ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
      ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
      AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)

      new PairRDDFunctions[Void,C](
        source.map(sourceRecord => (null, sourceRecord))
      ).saveAsNewAPIHadoopFile(
        bucketDAO.uri(dstPath),
        classOf[Void], //K
        ctag.runtimeClass.asInstanceOf[Class[C]], //V
        classOf[AvroParquetOutputFormat],
        hadoopJob.getConfiguration
      )
    }

    def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)(implicit ctag: ClassTag[C]): RDD[C] = {
      val hadoopJob = Job.getInstance()
      ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
      AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)

      sc.newAPIHadoopFile(
        bucketDAO.uri(srcPath),
        classOf[ParquetInputFormat[C]],
        classOf[Void], //K
        ctag.runtimeClass.asInstanceOf[Class[C]], //V
        hadoopJob.getConfiguration
      ).map { _._2 }
    }

We use avro-tools to generate Java classes from our Avro schemas:

    java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>

The test case reads back zeroes as the values of mandatoryIntField.
Naively, I see a problem in the [indexed record converter](https://git-wip-us.apache.org/repos/asf?p=parquet-mr.git;a=blob;f=parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java;h=06c66d692571da08298ae1da4f9967446c4864ee;hb=HEAD#l105): it cheerfully accepts a condition that should fail, namely a reader schema containing a column with no default value that is absent from the writer schema.
I am writing mainly to confirm my diagnosis and to learn why it is implemented the way it is. Is it fixable, or do others depend on this behaviour as a feature? Can anyone think of a workaround?
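One possible stopgap, sketched here under assumptions rather than offered as a fix to parquet-avro, is to validate the reader/writer schema pair before handing the reader schema to `AvroReadSupport`. Avro itself ships `SchemaCompatibility.checkReaderWriterCompatibility`, which, as far as I can tell, treats a reader field that has no default and is missing from the writer as incompatible. The object name `ReaderSchemaGuard`, the method `check`, and the demo schemas are hypothetical. How the writer schema is obtained (for example from the code that wrote the files) is left out. One caveat: this check also compares record names, so schemas that are renamed per version, like the ones in this report, would be flagged for that reason alone unless the reader schema declares the writer's name as an alias.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

// Hypothetical helper, not part of parquet-avro.
object ReaderSchemaGuard {

  /** Left(description) if Avro judges that `reader` cannot read data written with `writer`. */
  def check(reader: Schema, writer: Schema): Either[String, Unit] = {
    val result = SchemaCompatibility.checkReaderWriterCompatibility(reader, writer)
    if (result.getType == SchemaCompatibilityType.COMPATIBLE) Right(())
    else Left(result.getDescription)
  }

  // Demo schemas (assumed for illustration). Each schema gets a fresh parser,
  // because a single parser refuses to define the same record name twice.
  private def record(extraFields: String): Schema =
    new Schema.Parser().parse(
      s"""{"type":"record","name":"Sample","namespace":"com.example","fields":[
         |  {"name":"stringField","type":"string"},
         |  {"name":"longField","type":"long"}$extraFields
         |]}""".stripMargin)

  val writer: Schema = record("")
  val readerNoDefault: Schema =
    record(""",{"name":"mandatoryIntField","type":"int"}""")
  val readerWithDefault: Schema =
    record(""",{"name":"mandatoryIntField","type":"int","default":0}""")

  def main(args: Array[String]): Unit = {
    // New field without a default: should be rejected.
    println(check(readerNoDefault, writer).isLeft)
    // Same field with a default: should be accepted.
    println(check(readerWithDefault, writer).isRight)
  }
}
```

In `readParquet`, such a check could run just before `AvroReadSupport.setAvroReadSchema`, failing the job on a `Left` instead of silently reading zeroes.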