Description
spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
spark.sql(
  s"""
     | create table $tableName (
     |   id int,
     |   name string,
     |   price long,
     |   _ts long,
     |   description string
     | ) using hudi
     | tblproperties(
     |   type = '$tableType',
     |   primaryKey = 'id',
     |   preCombineField = '_ts'
     | )
     | location '$basePath'
     """.stripMargin)

spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
  "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")
Merge Into:
// Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
spark.sql(
  s"""
     | merge into $tableName t0
     | using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts
     |   union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts) s0
     | on t0.id = s0.id
     | when matched then update set price = s0.price, _ts = s0._ts
     """.stripMargin)
The writer schema for this MERGE INTO command, at the point where we reach HoodieSparkSqlWriter.deduceWriterSchema, i.e.

val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)

ends up containing fields beyond the ones being updated. The MERGE INTO command only instructs to update price and _ts, right? So why are other fields (e.g. name) also picked up from the source?
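To make the question concrete, here is a minimal sketch (illustration only) of how the deduced writer schema could be inspected; the printFields helper and the expected field list below are assumptions for illustration, not output from the actual test.

import scala.collection.JavaConverters._
import org.apache.avro.Schema

// Hypothetical helper (illustration only): dump the field names of the deduced
// writer schema so any extra source fields (e.g. "name", "description") become visible.
def printFields(writerSchema: Schema): Unit = {
  writerSchema.getFields.asScala.foreach(f => println(f.name()))
}

// Expected (assumed) for a partial update: only the record key, precombine field,
// and the changed columns, i.e. id, price, _ts.
// Observed: the remaining source fields (name, description) show up as well.
printFields(writerSchema)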
You can check out the test "Test partial update with MOR and Avro log format" in TestPartialUpdateForMergeInto.
Note: this is about partial update support with MERGE INTO, not a regular MERGE INTO.