Apache Hudi / HUDI-1330

handle prefix filtering at directory level


    Details

      Description

      The current DFSPathSelector ignores the prefixes "_" and "." only at the file level: files under intermediate directories whose names start with those prefixes are still picked up.
      For example, when reading the output of a Spark Structured Streaming job, which very often contains a .checkpoint directory, all of the checkpoint metadata files should be ignored. Currently they are not. For example:

      /path/.file <-- skipped
      /path/.path/file <-- still being read
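      A minimal sketch of what directory-level filtering could look like, assuming the same two ignored prefixes: the prefix check is applied to every directory while descending, so a hidden directory such as .checkpoint is pruned together with everything beneath it. The class and method names (PrefixFilteringLister, listEligibleFiles, isIgnored) are hypothetical, and the sketch uses java.nio.file instead of the Hadoop FileSystem API that Hudi works with; it is not the actual DFSPathSelector code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not Hudi's DFSPathSelector: list input files while
// pruning every file OR directory whose name starts with an ignored prefix.
class PrefixFilteringLister {

    // Prefixes named in this issue: "_" and "."
    private static final String[] IGNORE_PREFIXES = {"_", "."};

    // True if the last path component starts with an ignored prefix.
    static boolean isIgnored(Path p) {
        String name = p.getFileName().toString();
        for (String prefix : IGNORE_PREFIXES) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Recursively collects regular files under root. An ignored directory is
    // never descended into, so ".path/file" is skipped just like ".file".
    static List<Path> listEligibleFiles(Path root) throws IOException {
        List<Path> children;
        try (Stream<Path> s = Files.list(root)) {
            children = s.sorted().collect(Collectors.toList());
        }
        List<Path> result = new ArrayList<>();
        for (Path child : children) {
            if (isIgnored(child)) {
                continue;
            }
            if (Files.isDirectory(child)) {
                result.addAll(listEligibleFiles(child));
            } else {
                result.add(child);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the layout from this report in a temporary directory.
        Path root = Files.createTempDirectory("input");
        Files.createFile(root.resolve(".file"));
        Files.createDirectories(root.resolve(".checkpoint/commits"));
        Files.createFile(root.resolve(".checkpoint/commits/0"));
        Files.createFile(root.resolve("part-00000.parquet"));
        // Only part-00000.parquet survives the filter.
        System.out.println(listEligibleFiles(root));
    }
}
```

      Pruning at the directory level, rather than filtering each file path after a full recursive listing, also means large hidden trees such as a checkpoint directory are never walked at all.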
       
      $SPARK_HOME/bin/spark-submit \
        --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
        ~/.m2/repository/org/apache/hudi/hudi-utilities-bundle_2.12/0.6.1-SNAPSHOT/hudi-utilities-bundle_2.12-0.6.1-SNAPSHOT.jar \
        --target-base-path 'file:///tmp/hoodie/output/cow' \
        --target-table hoodie_cow \
        --table-type COPY_ON_WRITE \
        --props 'dfs-source.properties' \
        --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
        --source-ordering-field ts \
        --op UPSERT \
        --continuous \
        --min-sync-interval-seconds 30

      Configs:

      hoodie.upsert.shuffle.parallelism=2
      hoodie.insert.shuffle.parallelism=2
      hoodie.delete.shuffle.parallelism=2
      hoodie.bulkinsert.shuffle.parallelism=2
      hoodie.datasource.write.recordkey.field=id
      hoodie.datasource.write.partitionpath.field=dt
      # DFS Source
      hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie/input 

      Stacktrace: 

      Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:635)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:194)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:193)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:664)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:663)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.fromFiles(ParquetDFSSource.java:55)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.lambda$fetchNextBatch$0(ParquetDFSSource.java:50)
        at org.apache.hudi.common.util.Option.map(Option.java:104)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:50)
        at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75)
        at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:335)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:236)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:579)
        ... 4 more
      Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:613)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:605)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        ... 3 more
      Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/tmp/hoodie/input/.checkpoint/commits/0; isDirectory=false; length=29; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
        at org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.RuntimeException: file:/tmp/hoodie/input/.checkpoint/commits/0 is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [34, 58, 48, 125]
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
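
      The last "Caused by" line can be decoded by hand. A (non-encrypted) Parquet file ends with the 4-byte magic "PAR1", which is what [80, 65, 82, 49] spells in ASCII; the bytes actually found, [34, 58, 48, 125], spell ":0}, which looks like the tail of a small JSON record rather than a Parquet footer. The helper below is a hypothetical sketch (ParquetMagicCheck, ascii and hasParquetTailMagic are illustrative names, not a Parquet or Hudi API) that only performs that decoding and the tail check.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for reading the error message above; not a Parquet or Hudi API.
class ParquetMagicCheck {

    // A non-encrypted Parquet file ends with the 4-byte magic "PAR1".
    static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Turns the decimal byte values printed in the exception into ASCII text.
    static String ascii(int... codes) {
        StringBuilder sb = new StringBuilder();
        for (int c : codes) {
            sb.append((char) c);
        }
        return sb.toString();
    }

    // True if the last 4 bytes of the given file contents equal "PAR1".
    static boolean hasParquetTailMagic(byte[] contents) {
        if (contents.length < PARQUET_MAGIC.length) {
            return false;
        }
        byte[] tail = Arrays.copyOfRange(
                contents, contents.length - PARQUET_MAGIC.length, contents.length);
        return Arrays.equals(tail, PARQUET_MAGIC);
    }

    public static void main(String[] args) {
        System.out.println(ascii(80, 65, 82, 49));  // expected tail: PAR1
        System.out.println(ascii(34, 58, 48, 125)); // found tail: ":0}
    }
}
```

      Applied to the 29-byte .checkpoint/commits/0 file, such a tail check fails, which is why ParquetFileReader rejects it; with prefix filtering at the directory level, that file would never be handed to the Parquet reader in the first place.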


              People

              • Assignee: Unassigned
              • Reporter: Vu Ho (vho)
              • Votes: 0
              • Watchers: 2
