Details
- Type: Sub-task
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Fix Version: 0.9.0
Description
Test Case:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
1. Prepare data
spark.sql("create table test1(a int,b string,c string) using hudi partitioned by(b) options(primaryKey='a')") spark.sql("insert into table test1 select 1,2,3")
2. Create Hudi table test2
spark.sql("create table test2(a int,b string,c string) using hudi partitioned by(b) options(primaryKey='a')")
3. Write data into test2 through the datasource API
val base_data = spark.sql("select * from testdb.test1")
base_data.write.format("hudi").
  option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL).
  option(RECORDKEY_FIELD_OPT_KEY, "a").
  option(PARTITIONPATH_FIELD_OPT_KEY, "b").
  option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(OPERATION_OPT_KEY, "bulk_insert").
  option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
  option(HIVE_PARTITION_FIELDS_OPT_KEY, "b").
  option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
  option(HIVE_DATABASE_OPT_KEY, "testdb").
  option(HIVE_TABLE_OPT_KEY, "test2").
  option(HIVE_USE_JDBC_OPT_KEY, "true").
  option("hoodie.bulkinsert.shuffle.parallelism", 4).
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option(TABLE_NAME, "test2").
  mode(Append).
  save(s"/user/hive/warehouse/testdb.db/test2")
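The write can also be verified independently of Hive sync by loading the same base path through the datasource API (a minimal sketch, reusing the save() path above):

// Read the table back through the datasource API.
val written = spark.read.format("hudi").load("/user/hive/warehouse/testdb.db/test2")
written.select("a", "b", "c").show()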
At this point, querying test2 returns the following:
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  3|  2|
+---+---+---+
4. Delete one record
spark.sql("delete from testdb.test2 where a=1")
5. Run the query again; the record with a=1 has not been deleted
spark.sql("select a,b,c from testdb.test2").show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  3|  2|
+---+---+---+
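A first step in diagnosing why the delete misses the record is to look at the record key and partition path the writer actually stored, via Hudi's metadata columns (a diagnostic sketch; _hoodie_record_key and _hoodie_partition_path are standard Hudi meta fields). With hive_style_partitioning enabled the stored partition path should be b=3, so if the SQL delete resolves keys or partition paths differently, it would not match this record:

// Diagnostic: inspect the record key and partition path Hudi stored.
spark.read.format("hudi").
  load("/user/hive/warehouse/testdb.db/test2").
  select("_hoodie_record_key", "_hoodie_partition_path", "a", "b", "c").
  show(false)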