Apache Hudi / HUDI-6696

Clustering fails with Spark 3.3 bundle 0.12.3


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: clustering
    • Labels: None

    Description

I was trying out our Spark streaming ingestion to Hudi and ran into a clustering issue with the 0.12.3 bundle.

      import org.apache.hudi.QuickstartUtils._
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieWriteConfig._
      import org.apache.hudi.common.model.HoodieRecord
      import java.time.LocalDateTime
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
      val sourceBasePath = SOURCE_PATH
      val targetPath = TARGET_PATH
val df = spark.readStream.format("hudi").load(sourceBasePath)

val query = df.writeStream.foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, _: Long) => {
      println(LocalDateTime.now() + " Start writing cow table")
      batchDF.drop("_hoodie_commit_time").drop("_hoodie_commit_seqno").drop("_hoodie_record_key").drop("_hoodie_partition_path").drop("_hoodie_file_name").
        filter("id % 2 == 1").
        withColumn("date_col", substring(col("created_at"), 0, 10)).
        write.format("hudi").
        option(TABLE_TYPE.key, "COPY_ON_WRITE").
        option(PRECOMBINE_FIELD.key, "created_at").
        option(RECORDKEY_FIELD.key, "id").
        option(PARTITIONPATH_FIELD.key, "date_col").
        option("hoodie.datasource.write.operation", "insert").
        option("hoodie.insert.shuffle.parallelism", "10").
        option("hoodie.parquet.small.file.limit", "0").
        option("hoodie.clustering.inline", "true").
        option("hoodie.clustering.inline.max.commits", "10").
        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "104857600").
        option("hoodie.clustering.plan.strategy.small.file.limit", "52428800").
        option("hoodie.table.name", "hudi_tbl").
        option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
        option("hoodie.cleaner.policy.failed.writes", "LAZY").
        option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
        option("hoodie.write.lock.zookeeper.url", "ABC").
        option("hoodie.write.lock.zookeeper.port", "2181").
        option("hoodie.write.lock.zookeeper.lock_key", "tbl1").
        option("hoodie.write.lock.zookeeper.base_path", "/tmp/locks").
        mode(Append).
        save(targetPath)
      println(LocalDateTime.now() + " finish")
    }
  }.option("checkpointLocation", TARGET_CHECKPOINT).
  trigger(Trigger.ProcessingTime("10 minutes")).
  start()

query.awaitTermination()
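
For context, a minimal sketch of the session setup this repro assumes (the original report does not show how the shell was launched). These are the standard Hudi 0.12.x quickstart settings for Spark 3.x, expressed programmatically; the app name is hypothetical:

      import org.apache.spark.sql.SparkSession

      // Assumed launch configuration, not taken from the report: the bundle jar
      // must also be on the driver and executor classpath, e.g. via
      // spark-shell --jars hudi-spark3.3-bundle_2.12-0.12.3.jar
      val spark = SparkSession.builder().
        appName("hudi-clustering-repro"). // hypothetical app name
        config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
        config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
        config("spark.sql.catalog.spark_catalog", "org.apache.hudi.catalog.HoodieCatalog").
        getOrCreate()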
       

Stacktrace:

      scala> query.awaitTermination()
      2023-08-15T03:00:05.583925 Start writing cow table                              
      23/08/15 03:00:05 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
      23/08/15 03:00:05 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
      # WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
      # WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
      23/08/15 03:01:14 WARN BaseRollbackActionExecutor: Rollback finished without deleting inflight instant file. Instant=[==>20230814185344428__replacecommit__INFLIGHT]
      23/08/15 03:01:16 ERROR MicroBatchExecution: Query [id = fd43b951-f1fd-479e-bd87-1aae2e9e396d, runId = 70797db4-fbcd-40f0-b09d-540013da6717] terminated with error
      java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: 
      Failed to find data source: hoodie-parquet. Please find packages at
      https://spark.apache.org/third-party-projects.html
             
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
              at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?]
              at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[?:?]
              at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
              at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
              at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
              at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
              at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
      Caused by: java.lang.ClassNotFoundException: 
      Failed to find data source: hoodie-parquet. Please find packages at
      https://spark.apache.org/third-party-projects.html
             
              at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:573) ~[spark-catalyst_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
              ... 6 more
      Caused by: java.lang.ClassNotFoundException: hoodie-parquet.DefaultSource
              at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
              at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
              at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
              at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.14.jar:?]
              at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at scala.util.Failure.orElse(Try.scala:224) ~[scala-library-2.12.14.jar:?]
              at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345) ~[spark-sql_2.12-3.3.0.jar:3.3.0]
              at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3]
              at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
              ... 6 more
      org.apache.spark.sql.streaming.StreamingQueryException: Query [id = fd43b951-f1fd-479e-bd87-1aae2e9e396d, runId = 70797db4-fbcd-40f0-b09d-540013da6717] terminated with exception: java.lang.ClassNotFoundException:
      Failed to find data source: hoodie-parquet. Please find packages at
      https://spark.apache.org/third-party-projects.html  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:330)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
      Caused by: java.util.concurrent.CompletionException: java.lang.ClassNotFoundException:
      Failed to find data source: hoodie-parquet. Please find packages at
      https://spark.apache.org/third-party-projects.html  at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
      Caused by: java.lang.ClassNotFoundException:
      Failed to find data source: hoodie-parquet. Please find packages at
      https://spark.apache.org/third-party-projects.html  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:573)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99)
        at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345)
        at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203)
        at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268)
        at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232)
        at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101)
        at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405)
        at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 6 more
      Caused by: java.lang.ClassNotFoundException: hoodie-parquet.DefaultSource
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
  ... 17 more
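
Both traces fail at the same spot: MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow, i.e. the row-writer clustering path, where Spark is asked to resolve a data source literally named "hoodie-parquet". A hedged workaround sketch (an assumption, not a fix confirmed on this ticket): disabling the Spark row writer should route clustering through the non-row path and avoid that lookup. Reusing batchDF and targetPath from the repro above:

      // Hypothetical mitigation, not verified here: force clustering off the
      // row-writer path (runClusteringForGroupAsyncAsRow) by disabling the
      // Spark row writer.
      batchDF.write.format("hudi").
        option("hoodie.table.name", "hudi_tbl").
        option("hoodie.clustering.inline", "true").
        option("hoodie.datasource.write.row.writer.enable", "false"). // the one changed option
        mode(Append).
        save(targetPath)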
People

    Assignee: Jonathan Vexler (jonvex)
    Reporter: sivabalan narayanan (shivnarayan)
    Votes: 0
    Watchers: 1
