Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 0.11.1, 0.13.0
- Environment: Hudi 0.11 and 0.13, Spark 3.4
Description
Abstract
When I use the insert_overwrite feature, in both Spark SQL and the API, it cleans the whole table when the table is not partitioned, and then throws an exception.
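For reference, a minimal sketch of the Spark SQL path mentioned above, assuming a SparkSession configured with the Hudi extensions as in the API example below; the table name insert_overwrite_repro and the single-row insert are illustrative only and not taken from the original report.

// Hypothetical Spark SQL reproduction sketch for the non-partitioned case;
// table name and inserted values are illustrative only.
spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS insert_overwrite_repro (
    |  uuid STRING,
    |  age STRING,
    |  ts STRING,
    |  par STRING
    |) USING hudi
    |TBLPROPERTIES (
    |  type = 'cow',
    |  primaryKey = 'uuid',
    |  preCombineField = 'ts'
    |)
  """.stripMargin)

// INSERT OVERWRITE on a non-partitioned table; per the report this is the
// statement that eventually leads to the whole table being cleaned.
spark.sql(
  """
    |INSERT OVERWRITE TABLE insert_overwrite_repro
    |VALUES ('uuid_01', '27', '2022-09-23', 'par_01')
  """.stripMargin)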
Version
- Hudi 0.11 and 0.13
- Spark 3.4
Bug Position
org.apache.hudi.table.action.clean.CleanActionExecutor#deleteFileAndGetResult
How to reproduce
The job needs to be run 4 times; the fourth run triggers the clean action.
- 0.11: reproducible with both SQL and the API
- 0.13: reproducible with the API only
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

object InsertOverwriteTest {
  def main(array: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestInsertOverwrite")
      .master("local[4]")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .getOrCreate()

    spark.conf.set("hoodie.index.type", "BUCKET")
    spark.conf.set("hoodie.storage.layout.type", "BUCKET")
    spark.conf.set("HADOOP_USER_NAME", "parallels")
    System.setProperty("HADOOP_USER_NAME", "parallels")

    // Sample records; the "par" column exists but is NOT used as a partition path,
    // so the table is written as a non-partitioned table.
    val seq = List(
      Row("uuid_01", "27", "2022-09-23", "par_01"),
      Row("uuid_02", "21", "2022-09-23", "par_02"),
      Row("uuid_03", "23", "2022-09-23", "par_04"),
      Row("uuid_04", "24", "2022-09-23", "par_02"),
      Row("uuid_05", "26", "2022-09-23", "par_01"),
      Row("uuid_06", "20", "2022-09-23", "par_03")
    )
    val rdd = spark.sparkContext.parallelize(seq)
    val structType: StructType = StructType(Array(
      StructField("uuid", DataTypes.StringType, nullable = true),
      StructField("age", DataTypes.StringType, nullable = true),
      StructField("ts", DataTypes.StringType, nullable = true),
      StructField("par", DataTypes.StringType, nullable = true)
    ))
    spark.createDataFrame(rdd, structType)
      .createOrReplaceTempView("compact_test_num")

    val df: DataFrame = spark.sql("select uuid, age, ts, par from compact_test_num limit 10")

    // Insert-overwrite into a non-partitioned COW table with a bucket index.
    df.write.format("org.apache.hudi")
      .option(RECORDKEY_FIELD.key, "uuid")
      .option(PRECOMBINE_FIELD.key, "ts")
      // .option(PARTITIONPATH_FIELD.key(), "par")
      .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      // .option(KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.ComplexKeyGenerator")
      .option(OPERATION.key, INSERT_OVERWRITE_OPERATION_OPT_VAL)
      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.hash.field", "uuid")
      .option("hoodie.bucket.index.num.buckets", "2")
      .option("hoodie.storage.layout.type", "BUCKET")
      .option("hoodie.storage.layout.partitioner.class", "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner")
      .option("hoodie.table.name", "cow_20230801_012")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.delete.shuffle.parallelism", "2")
      .option("hoodie.clean.max.commits", "2")
      .option("hoodie.cleaner.commits.retained", "2")
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      .mode(SaveMode.Append)
      .save("hdfs://bigdata01:9000/hudi_test/cow_20230801_012")
  }
}
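As noted above, the clean action only fires on the fourth write, so the writer has to be invoked repeatedly. Below is a rough driver sketch, assuming the SparkSession and base path from the example; writeOnce() is a hypothetical wrapper around the df.write ... .save(...) call above, and the file listing is only there to observe which files survive each run.

import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

val basePath = "hdfs://bigdata01:9000/hudi_test/cow_20230801_012"

(1 to 4).foreach { i =>
  // Hypothetical wrapper around the df.write ... .save(basePath) call shown above.
  writeOnce()

  // List what is left under the table base path after each run; per the report,
  // after the fourth run the cleaner has removed the data files of the
  // non-partitioned table.
  val fs = FileSystem.get(new URI(basePath), spark.sparkContext.hadoopConfiguration)
  val remaining = fs.listStatus(new Path(basePath)).map(_.getPath.getName)
  println(s"run $i -> ${remaining.mkString(", ")}")
}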
Attachments
Issue Links