Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
Description
When a transformer is used in a DeltaStreamer sync, a SparkAvroPostProcessor is attached to the SchemaProvider by default (see code).
In SparkAvroPostProcessor, the Avro schema is converted to a Spark StructType schema and then immediately converted back (see code).
During this round trip, any 'enum' field in the original Avro schema is lost: in the forward conversion the 'enum' is mapped to the 'string' type (StructType has no enum equivalent), and the reverse conversion leaves it as plain 'string' instead of restoring the 'enum'.
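The lossy round trip can be illustrated with a small self-contained sketch. This is a toy model in plain Python, not Spark's actual SchemaConverters: it only mimics the one rule that matters here (enum collapses to string on the way to StructType, so the reverse conversion cannot recover the enum name or symbols).

```python
import json

def avro_type_to_sql(avro_type):
    """Forward step (Avro -> StructType-like): collapse complex types.

    Spark's StructType has no enum equivalent, so an Avro enum
    definition is flattened to "string" -- name and symbols are dropped.
    """
    if isinstance(avro_type, dict) and avro_type["type"] == "enum":
        return "string"   # <- the lossy step
    return avro_type      # primitives pass through unchanged

def round_trip(avro_schema):
    """Model of Avro -> StructType -> Avro: rebuild the record from
    the collapsed field types, as the post processor effectively does."""
    return {
        "type": "record",
        "name": avro_schema["name"],
        "fields": [
            {"name": f["name"], "type": avro_type_to_sql(f["type"])}
            for f in avro_schema["fields"]
        ],
    }

# The sample schema from the reproduction steps below
original = json.loads("""
{ "name": "accountDataRecord", "namespace": "sample.test", "type": "record",
  "fields": [
    { "name": "action",
      "type": { "name": "testEnum", "type": "enum",
                "symbols": [ "INSERT", "UPDATE", "DELETE" ] } },
    { "name": "ts", "type": "int" } ] }
""")

converted = round_trip(original)
print(converted["fields"][0])  # {'name': 'action', 'type': 'string'} -- enum lost
```

The resulting reader schema declares "action" as a string, while the data files were written with the enum type, which is exactly the mismatch the AvroTypeException below complains about.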
Steps to reproduce:
- Prepare an Avro schema that contains an enum field, sample below:

  {
    "name": "accountDataRecord",
    "namespace": "sample.test",
    "type": "record",
    "fields": [
      {
        "name": "action",
        "type": {
          "name": "testEnum",
          "type": "enum",
          "symbols": [ "INSERT", "UPDATE", "DELETE" ]
        }
      },
      { "name": "ts", "type": "int" }
    ]
  }
- Run DeltaStreamer with a transformer configured
- The sync fails with the exception below:
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
    at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
    at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
    at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
    at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
    at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
    at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
Some logs I added to expose the issue:

// original schema
22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end:
{
  "type" : "record",
  "name" : "accountDataRecord",
  "namespace" : "sample.test",
  "fields" : [ {
    "name" : "action",
    "type" : {
      "type" : "enum",
      "name" : "testEnum",
      "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
    }
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
}

/* Around this LOC (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673) the schema converted back has already lost the enum field */
22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider:
{
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
}
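As a possible workaround until the conversion preserves enums, the post processor can be disabled so the transformer path keeps the untouched source Avro schema. Note this config key is quoted from memory of the Hudi utilities code and should be verified against the Hudi version in use:

```properties
# Assumed key (SparkAvroPostProcessor.SPARK_AVRO_POST_PROCESSOR_PROP) -- verify before relying on it
hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false
```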