Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Invalid
-
None
-
None
-
None
Description
I was trying out Spark streaming ingestion to Hudi and ran into a clustering issue with the 0.12.3 bundle.
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord import java.time.LocalDateTime import org.apache.spark.sql.streaming.Trigger
val sourceBasePath = SOURCE_PATH val targetPath = TARGET_PATH
val df = spark.readStream.format("hudi").load(sourceBasePath)val query = df.writeStream.foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, _: Long) => { println(LocalDateTime.now() + " Start writing cow table") batchDF.drop("_hoodie_commit_time").drop("_hoodie_commit_seqno").drop("_hoodie_record_key").drop("_hoodie_partition_path").drop("_hoodie_file_name"). filter("id%2 ==1"). withColumn("date_col",substring(col("created_at"),0,10)). write.format("hudi"). option(TABLE_TYPE.key, "COPY_ON_WRITE"). option(PRECOMBINE_FIELD.key, "created_at"). option(RECORDKEY_FIELD.key, "id"). option(PARTITIONPATH_FIELD.key, "date_col"). option("hoodie.datasource.write.operation","insert"). option("hoodie.insert.shuffle.parallelism","10"). option("hoodie.parquet.small.file.limit","0"). option("hoodie.clustering.inline", "true"). option("hoodie.clustering.inline.max.commits", "10"). option("hoodie.clustering.plan.strategy.target.file.max.bytes", "104857600"). option("hoodie.clustering.plan.strategy.small.file.limit", "52428800"). option("hoodie.table.name","hudi_tbl"). option("hoodie.write.concurrency.mode","optimistic_concurrency_control"). option("hoodie.cleaner.policy.failed.writes","LAZY"). option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"). option("hoodie.write.lock.zookeeper.url","ABC"). option("hoodie.write.lock.zookeeper.port","2181"). option("hoodie.write.lock.zookeeper.lock_key","tbl1"). option("hoodie.write.lock.zookeeper.base_path","/tmp/locks"). mode(Append). save(targetPath) println(LocalDateTime.now() + " finish") } }.option("checkpointLocation", TARGET_CHECKPOINT). trigger(Trigger.ProcessingTime("10 minutes")).start()query.awaitTermination()
Stack trace:
scala> query.awaitTermination() 2023-08-15T03:00:05.583925 Start writing cow table 23/08/15 03:00:05 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf 23/08/15 03:00:05 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file # WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf # WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.] 23/08/15 03:01:14 WARN BaseRollbackActionExecutor: Rollback finished without deleting inflight instant file. Instant=[==>20230814185344428__replacecommit__INFLIGHT] 23/08/15 03:01:16 ERROR MicroBatchExecution: Query [id = fd43b951-f1fd-479e-bd87-1aae2e9e396d, runId = 70797db4-fbcd-40f0-b09d-540013da6717] terminated with error java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: Failed to find data source: hoodie-parquet. Please find packages at https://spark.apache.org/third-party-projects.html at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[?:?] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?] at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?] at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?] 
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?] Caused by: java.lang.ClassNotFoundException: Failed to find data source: hoodie-parquet. Please find packages at https://spark.apache.org/third-party-projects.html at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:573) ~[spark-catalyst_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at 
org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?] ... 6 more Caused by: java.lang.ClassNotFoundException: hoodie-parquet.DefaultSource at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.14.jar:?] at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at scala.util.Failure.orElse(Try.scala:224) ~[scala-library-2.12.14.jar:?] 
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345) ~[spark-sql_2.12-3.3.0.jar:3.3.0] at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248) ~[hudi-spark3.3-bundle_2.12-0.12.3.jar:0.12.3] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?] ... 6 more org.apache.spark.sql.streaming.StreamingQueryException: Query [id = fd43b951-f1fd-479e-bd87-1aae2e9e396d, runId = 70797db4-fbcd-40f0-b09d-540013da6717] terminated with exception: java.lang.ClassNotFoundException: Failed to find data source: hoodie-parquet. 
Please find packages at https://spark.apache.org/third-party-projects.html at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:330) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208) Caused by: java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: Failed to find data source: hoodie-parquet. Please find packages at https://spark.apache.org/third-party-projects.html at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: java.lang.ClassNotFoundException: Failed to find data source: hoodie-parquet. 
Please find packages at https://spark.apache.org/third-party-projects.html at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:573) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675) at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:100) at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:99) at org.apache.spark.sql.execution.datasources.DataSource.providingInstance(DataSource.scala:113) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345) at org.apache.hudi.BaseFileOnlyRelation.toHadoopFsRelation(BaseFileOnlyRelation.scala:203) at org.apache.hudi.DefaultSource$.resolveBaseFileOnlyRelation(DefaultSource.scala:268) at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:232) at org.apache.spark.sql.adapter.BaseSpark3Adapter.createRelation(BaseSpark3Adapter.scala:101) at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.readRecordsForGroupAsRow(MultipleSparkJobExecutionStrategy.java:405) at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:248) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 
6 more Caused by: java.lang.ClassNotFoundException: hoodie-parquet.DefaultSource at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661) at scala.util.Try$.apply(Try.scala:213) at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661) at scala.util.Failure.orElse(Try.scala:224) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661) ... 17 morescala>