HUDI-1343: Add standard schema postprocessor which would rewrite the schema using spark-avro conversion


Details

    Description

When we use a Transformer, the final schema used to convert Avro records to bytes is auto-generated by Spark. It can differ from the target schema used for writing (which may come from a Schema Registry), because of the way Avro treats union type ordering.
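For reference, the auto-generated schema comes from spark-avro's StructType-to-Avro conversion. A minimal sketch, assuming the spark-avro module's SchemaConverters, with two field names borrowed from the schemas below:

import org.apache.avro.Schema;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SparkSchemaGenSketch {
  public static void main(String[] args) {
    // The Transformer output is a Dataset<Row>; spark-avro converts its
    // StructType to an Avro schema. Nullable fields become [type, "null"]
    // unions, i.e. null is placed second and no default is attached.
    StructType rowSchema = new StructType()
        .add("_ts_ms", DataTypes.LongType, true)   // nullable -> ["long","null"]
        .add("_op", DataTypes.StringType, false);  // required -> "string"
    Schema avroSchema = SchemaConverters.toAvroType(
        rowSchema, false, "hoodie_source", "hoodie.source");
    System.out.println(avroSchema.toString(true));
  }
}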

       

For example:

Schema generated by spark-avro when converting a Row to Avro:

{
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [
    { "name" : "_ts_ms", "type" : [ "long", "null" ] },
    { "name" : "_op", "type" : "string" },
    { "name" : "inc_id", "type" : "int" },
    { "name" : "year", "type" : [ "int", "null" ] },
    { "name" : "violation_desc", "type" : [ "string", "null" ] },
    { "name" : "violation_code", "type" : [ "string", "null" ] },
    { "name" : "case_individual_id", "type" : [ "int", "null" ] },
    { "name" : "flag", "type" : [ "string", "null" ] },
    { "name" : "last_modified_ts", "type" : "long" }
  ]
}

       

is not compatible with the target Avro schema:

{
  "type" : "record",
  "name" : "formatted_debezium_payload",
  "fields" : [
    { "name" : "_ts_ms", "type" : [ "null", "long" ], "default" : null },
    { "name" : "_op", "type" : "string", "default" : null },
    { "name" : "inc_id", "type" : "int", "default" : null },
    { "name" : "year", "type" : [ "null", "int" ], "default" : null },
    { "name" : "violation_desc", "type" : [ "null", "string" ], "default" : null },
    { "name" : "violation_code", "type" : [ "null", "string" ], "default" : null },
    { "name" : "case_individual_id", "type" : [ "null", "int" ], "default" : null },
    { "name" : "flag", "type" : [ "null", "string" ], "default" : null },
    { "name" : "last_modified_ts", "type" : "long", "default" : null }
  ]
}

       

Note that the union type order differs for individual fields:

"type" : [ "null", "string" ] vs. "type" : [ "string", "null" ]

Unexpectedly, Avro decoding fails when bytes written with the first schema are read using the second schema.
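This can be reproduced outside Hudi. A minimal sketch with plain Avro (the schemas are cut down to one union field; as in deltastreamer, only the target schema is available at decode time, so it is passed as the sole schema to the reader):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class UnionOrderMismatchDemo {
  public static void main(String[] args) throws Exception {
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
            + "[{\"name\":\"f\",\"type\":[\"string\",\"null\"]}]}");
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
            + "[{\"name\":\"f\",\"type\":[\"null\",\"string\"]}]}");

    GenericRecord rec = new GenericData.Record(writer);
    rec.put("f", null); // encoded as union branch index 1 under ["string","null"]

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
    enc.flush();

    // Avro encodes unions positionally: only the branch index is written.
    // Under ["null","string"] index 1 means "string", so the decoder tries to
    // read a string length past the end of the buffer and throws here
    // (java.io.EOFException); with more fields it misreads values instead.
    BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    new GenericDatumReader<GenericRecord>(reader).read(null, dec);
  }
}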

       

One way to fix this is to use the configured target schema when generating record bytes, but that is hard to do without breaking the record payload constructor API used by deltastreamer.

The other option is to apply a post-processor to the target schema so that it becomes consistent with the schema of Transformer-generated records.

       

This ticket is to use the latter approach: create a standard schema post-processor and add it by default when a Transformer is used.
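A rough sketch of what such a post-processor could look like (class and method names are illustrative, not a final API; the fixed record name/namespace mirror the spark-generated schema above):

import org.apache.avro.Schema;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.DataType;

public class SparkAvroSchemaPostProcessor {
  // Round-trip the target schema through spark-avro's converters so it comes
  // back exactly as spark-avro would have generated it (same union ordering),
  // and therefore matches the bytes produced after a Transformer.
  public Schema processSchema(Schema targetSchema) {
    DataType structType = SchemaConverters.toSqlType(targetSchema).dataType();
    return SchemaConverters.toAvroType(structType, false, "hoodie_source", "hoodie.source");
  }
}

With this applied by default whenever a Transformer is configured, the Schema Registry schema and the spark-generated schema converge on the same union ordering.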

People

  Assignee: liujinhui
  Reporter: Balaji Varadarajan