When using the `FilebasedSchemaProvider` to provide the source schema in Avro while ingesting data from `ParquetDFSSource` with the same schema, the DeltaStreamer fails. A new test case is added below to demonstrate the error:
Based on further investigation, the root cause is that when Spark writes Parquet files, all fields are automatically converted to be nullable for compatibility reasons. Even if the source Avro schema has non-null fields, `AvroConversionUtils.createRdd` still uses the `dataType` from the DataFrame to convert each Row to an Avro record. That `dataType` has nullable fields per Spark's logic, even though the field names are identical to those in the source Avro schema. As a result, the Avro records produced by the conversion carry a schema that differs from the source schema file (only in nullability). Before the records are inserted, other operations rely on the source schema file, so serialization/deserialization fails because of this schema mismatch.
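The nullability drift described above can be sketched in plain Python (the record and field names are made up for illustration; this is not Hudi or Spark code): wrapping every field type in a union with `"null"` mimics what the Parquet-derived DataFrame schema looks like, and the resulting schema no longer equals the source schema even though the field names match.

```python
import json

# Hypothetical source Avro schema (illustration only): one non-null field.
source_schema = {
    "type": "record",
    "name": "Trip",
    "fields": [
        {"name": "rider", "type": "string"},  # declared non-null
    ],
}

def to_nullable(schema):
    """Mimic Spark treating every Parquet-derived field as nullable:
    wrap each field type in a union with "null"."""
    out = json.loads(json.dumps(schema))  # deep copy
    for field in out["fields"]:
        t = field["type"]
        if not (isinstance(t, list) and "null" in t):
            field["type"] = ["null", t]
    return out

dataframe_schema = to_nullable(source_schema)

# Field names match, but the schemas differ (only in nullability),
# which is enough to break Avro serde that expects the source schema.
names_match = (
    [f["name"] for f in source_schema["fields"]]
    == [f["name"] for f in dataframe_schema["fields"]]
)
schemas_match = source_schema == dataframe_schema
print(names_match, schemas_match)  # True False
```

The same name-equal-but-schema-unequal situation is what the DeltaStreamer hits: downstream steps deserialize with the original source schema while the records were serialized with the nullable variant.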
The following screenshot shows the modified Avro schema in `AvroConversionUtils.createRdd`. The original source schema file is:
Note that for some Avro schemas, the DeltaStreamer sync may succeed but generate corrupt data. This corrupt-data behavior was originally reported by liujinhui.