Apache Hudi / HUDI-5034

Enum info lost during schema conversion


    Description

      When a transformer is used in a Deltastreamer sync, a SparkAvroPostProcessor is attached to the SchemaProvider by default (see code).

       

      SparkAvroPostProcessor converts the Avro schema to a Spark StructType schema and then immediately converts it back (see code).

       

      During that round trip, any 'enum' field in the original Avro schema loses its type information: the Avro-to-StructType conversion maps 'enum' to the 'string' type, and when the schema is converted back to Avro, the 'string' type is not converted back to 'enum', because StructType has no notion of an enum.
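
      To illustrate the loss, here is a minimal sketch in plain Python that simulates the lossy round trip on the schema JSON (it mimics the behavior described above; it is not Hudi's or Spark's actual converter code):

```python
import json

# Avro schema with an enum field (same shape as the reproduction schema below).
avro_schema = {
    "type": "record",
    "name": "accountDataRecord",
    "namespace": "sample.test",
    "fields": [
        {"name": "action",
         "type": {"type": "enum", "name": "testEnum",
                  "symbols": ["INSERT", "UPDATE", "DELETE"]}},
        {"name": "ts", "type": "int"},
    ],
}

def avro_type_to_struct_type(avro_type):
    # The Avro -> StructType step maps enum to a plain string type;
    # the enum name and symbol list are dropped at this point.
    if isinstance(avro_type, dict) and avro_type.get("type") == "enum":
        return "string"
    return avro_type

def round_trip(schema):
    # Avro -> StructType -> Avro: the reverse step only sees "string",
    # so it can never re-emit the original enum.
    return {**schema,
            "fields": [{**f, "type": avro_type_to_struct_type(f["type"])}
                       for f in schema["fields"]]}

converted = round_trip(avro_schema)
print(json.dumps(converted["fields"][0]))
# -> {"name": "action", "type": "string"}
# The "action" field comes back as a plain "string"; reading data written
# with the enum schema then fails with
# "Found sample.test.testEnum, expecting string".
```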

       

      Steps to reproduce:

      1. Prepare an Avro schema that contains an enum field, for example:

         {
             "name": "accountDataRecord",
             "namespace": "sample.test",
             "type": "record",
             "fields": [
                 {
                     "name": "action",
                     "type": {
                         "name": "testEnum",
                         "type": "enum",
                         "symbols": [
                             "INSERT",
                             "UPDATE",
                             "DELETE"
                         ]
                     }
                 },
                 {"name": "ts", "type": "int"}
             ]
         }

      2. Run Deltastreamer with a transformer.
      3. The job fails with the following exception:

         Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
            at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
            at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
            at scala.Option.foreach(Option.scala:407)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
            at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
            at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
            at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
            at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
            at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
            at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
            at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
            at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
            at org.apache.hudi.common.util.Option.map(Option.java:108)
            at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
            at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
            at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
            at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
            at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
            at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
            at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
            at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
            at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
            at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
            at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
            at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
            at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
            at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
            at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
            at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
            at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
            at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
            at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
            at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
            at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
            at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
            at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
            at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
            at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) 

      Some logs I added to expose the issue:

      // original schema
      22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end: {
        "type" : "record",
        "name" : "accountDataRecord",
        "namespace" : "sample.test",
        "fields" : [ {
          "name" : "action",
          "type" : {
            "type" : "enum",
            "name" : "testEnum",
            "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
          }
        }, {
          "name" : "ts",
          "type" : "int"
        } ]
      }
      
      /* Around this LOC (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673) The schema converted back lost enum field already */
      22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider: {
        "type" : "record",
        "name" : "hoodie_source",
        "namespace" : "hoodie.source",
        "fields" : [ {
          "name" : "action",
          "type" : "string"
        }, {
          "name" : "ts",
          "type" : "int"
        } ]
      } 

       

Reporter: Shawn Chang (yc2523)
Assignee: Unassigned