Apache Hudi / HUDI-4588

Ingestion failing if source column is dropped

Details

    Description

      Ingestion using DeltaStreamer fails if columns are dropped from the source. I reproduced this with the docker-demo setup; the steps to reproduce are below.

      1. Created the data file `stage_1.json` (attached), published it to Kafka, and ingested it from Kafka into a Hudi table with a DeltaStreamer job (using `FilebasedSchemaProvider` with `schema_stage1.avsc`).
      2. The next step simulates a column being dropped from the source.
      3. Repeated step 1 with the stage 2 files (`stage_2.json` and `schema_stage2.avsc`), which do not have the `day` column. The ingestion job failed; the detailed stack trace is below.
        Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
            at scala.Option.foreach(Option.scala:257)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
            at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
            at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
            at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
            at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
            at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
            at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
            at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
            at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:607)
            at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201)
            at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199)
            at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
            at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
            at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
            at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
            at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
            at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
            at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
            at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
            at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
            at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
            at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
            at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
            at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
            at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
            at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:123)
            at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
            at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
            at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
            at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
            at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
            ... 30 more
        Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
            at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
            at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
            ... 33 more
        Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
            at java.util.concurrent.FutureTask.report(FutureTask.java:122)
            at java.util.concurrent.FutureTask.get(FutureTask.java:192)
            at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
            ... 34 more
        Caused by: org.apache.hudi.exception.HoodieException: operation has failed
            at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
            at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
            at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
            at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
            at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
            at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            ... 3 more
        Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
            at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
            at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
            at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            ... 4 more
        Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'day' not found
            at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
            at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
            at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
            at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
            at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
            at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
            at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
            at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
            at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
            ... 8 more 
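The innermost cause above (`Avro field 'day' not found`) suggests that the existing Parquet base file still carries the `day` column, while the Avro schema used to read it during the merge no longer does. A minimal sketch of how the dropped field could be detected before running stage 2 follows. The two schemas are hypothetical stand-ins for the attached `.avsc` files: only the `day` field comes from this report, and the record name and other field names are assumptions.

```python
import json

# Hypothetical stand-ins for schema_stage1.avsc and schema_stage2.avsc.
# Only the `day` field is taken from the report; all other names are assumed.
SCHEMA_STAGE1 = json.loads("""
{"type": "record", "name": "example_record", "fields": [
  {"name": "id", "type": "string"},
  {"name": "ts", "type": "long"},
  {"name": "day", "type": "string"}
]}
""")

SCHEMA_STAGE2 = json.loads("""
{"type": "record", "name": "example_record", "fields": [
  {"name": "id", "type": "string"},
  {"name": "ts", "type": "long"}
]}
""")


def field_names(schema):
    """Return the top-level field names of an Avro record schema, in order."""
    return [field["name"] for field in schema["fields"]]


def dropped_fields(old_schema, new_schema):
    """Return top-level fields present in old_schema but missing from new_schema."""
    remaining = set(field_names(new_schema))
    return [name for name in field_names(old_schema) if name not in remaining]


if __name__ == "__main__":
    # Fields that the stage 2 schema no longer declares.
    print(dropped_fields(SCHEMA_STAGE1, SCHEMA_STAGE2))
```

The check only compares top-level field names; nested records and type changes are out of scope for this sketch.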

         

       

      Attachments

        1. schema_stage1.avsc
          0.3 kB
          Vamshi Gudavarthi
        2. schema_stage2.avsc
          0.3 kB
          Vamshi Gudavarthi
        3. stage_1.json
          0.2 kB
          Vamshi Gudavarthi
        4. stage_2.json
          0.2 kB
          Vamshi Gudavarthi

            People

              alexey.kudinkin Alexey Kudinkin
              vgudavarthi Vamshi Gudavarthi
              Ethan Guo, sivabalan narayanan, Tao Meng
              Votes: 0
              Watchers: 2