  1. Apache Hudi
  2. HUDI-6675

Clean action will delete the whole table

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11.1, 0.13.0
    • Fix Version/s: 0.14.0
    • Component/s: cleaning
    • Environment: Hudi 0.11 and 0.13, Spark 3.4

    Description

      Abstract

      When I use the insert_overwrite operation, through either Spark SQL or the DataFrame API, the clean action deletes the whole table if the table is not partitioned.

      The job then throws this exception

      Version

      1. Hudi 0.11 and 0.13
      2. Spark 3.4

      Bug Position

      org.apache.hudi.table.action.clean.CleanActionExecutor#deleteFileAndGetResult
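
      The report does not show the code at this position, so the following is only a minimal sketch of the kind of guard that would stop a cleaner from removing a table root. It assumes (this is not confirmed above) that for a non-partitioned table the partition path is empty, so the delete target resolves to the table base path. `CleanGuardSketch`, `resolveDeleteTarget`, `isSafeToDelete` and `deleteIfSafe` are hypothetical names and are not part of Hudi; the sketch works on the local file system rather than HDFS.

```scala
import java.io.File
import java.nio.file.{Files, Path}

// Hypothetical illustration only; none of these names exist in Hudi.
object CleanGuardSketch {

  // Resolve the path a cleaner would delete for a given partition.
  // Path.resolve("") returns the base path itself, which is the
  // situation assumed here for a non-partitioned table.
  def resolveDeleteTarget(basePath: Path, partitionPath: String): Path =
    basePath.resolve(partitionPath).normalize()

  // Guard: only paths strictly below the table base path may be deleted.
  def isSafeToDelete(basePath: Path, target: Path): Boolean = {
    val base = basePath.normalize()
    val normalized = target.normalize()
    normalized.startsWith(base) && normalized != base
  }

  // Delete a file or a directory tree, children first.
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for regular files, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Returns false and deletes nothing when the guard rejects the target
  // or the target does not exist.
  def deleteIfSafe(basePath: Path, target: Path): Boolean =
    if (isSafeToDelete(basePath, target) && Files.exists(target)) {
      deleteRecursively(target.toFile)
      !Files.exists(target)
    } else {
      false
    }
}
```

      With such a guard, a call like `CleanGuardSketch.deleteIfSafe(base, CleanGuardSketch.resolveDeleteTarget(base, ""))` is rejected, while a partition directory under `base` can still be removed.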

      How to reproduce

      Run the job below 4 times; the fourth run triggers the clean action.

      0.11: reproduces with both SQL and the API

      0.13: reproduces with the API only

       

      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
      import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
      
      object InsertOverwriteTest {
        def main(array: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("TestInsertOverwrite")
            .master("local[4]")
            .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.catalog.spark_catalog" ,"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
            .getOrCreate()
      
          spark.conf.set("hoodie.index.type", "BUCKET")
          spark.conf.set("hoodie.storage.layout.type", "BUCKET")
          spark.conf.set("HADOOP_USER_NAME", "parallels")
          System.setProperty("HADOOP_USER_NAME", "parallels")
      
          var seq = List(
            Row("uuid_01", "27", "2022-09-23", "par_01"),
            Row("uuid_02", "21", "2022-09-23", "par_02"),
            Row("uuid_03", "23", "2022-09-23", "par_04"),
            Row("uuid_04", "24", "2022-09-23", "par_02"),
            Row("uuid_05", "26", "2022-09-23", "par_01"),
            Row("uuid_06", "20", "2022-09-23", "par_03"),
          )
      
          var rdd = spark.sparkContext.parallelize(seq)
          var structType: StructType = StructType(Array(
            StructField("uuid", DataTypes.StringType, nullable = true),
            StructField("age", DataTypes.StringType, nullable = true),
            StructField("ts", DataTypes.StringType, nullable = true),
            StructField("par", DataTypes.StringType, nullable = true)
          ))
      
          // createOrReplaceTempView returns Unit, so there is nothing to assign here.
          spark.createDataFrame(rdd, structType)
            .createOrReplaceTempView("compact_test_num")
      
          val df: DataFrame = spark.sql("select uuid, age, ts, par from compact_test_num limit 10")
      
          df.write.format("org.apache.hudi")
            .option(RECORDKEY_FIELD.key, "uuid")
            .option(PRECOMBINE_FIELD.key, "ts")
      //      .option(PARTITIONPATH_FIELD.key(), "par")
            .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
            .option(KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      //      .option(KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.ComplexKeyGenerator")
            .option(OPERATION.key, INSERT_OVERWRITE_OPERATION_OPT_VAL)
            .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
            .option("hoodie.metadata.enable", "false")
            .option("hoodie.index.type", "BUCKET")
            .option("hoodie.bucket.index.hash.field", "uuid")
            .option("hoodie.bucket.index.num.buckets", "2")
            .option("hoodie.storage.layout.type", "BUCKET")
            .option("hoodie.storage.layout.partitioner.class", "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner")
            .option("hoodie.table.name", "cow_20230801_012")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.delete.shuffle.parallelism", "2")
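            // Cleaner settings used for the reproduction: with these values the clean action is triggered on the fourth run.
            .option("hoodie.clean.max.commits", "2")
            .option("hoodie.cleaner.commits.retained", "2")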
            .option("hoodie.datasource.write.hive_style_partitioning", "true")
            .mode(SaveMode.Append)
            .save("hdfs://bigdata01:9000/hudi_test/cow_20230801_012")
        }
      
      }
       

      Attachments

        1. image-2023-08-10-10-35-02-798.png
          400 kB
          sanqingleo
        2. image-2023-08-10-10-37-05-339.png
          210 kB
          sanqingleo

            People

              Assignee: Unassigned
              Reporter: rongtongliu sanqingleo