Details
Type: Sub-task
Status: Closed
Priority: Blocker
Resolution: Unresolved
Description
When the ComplexKeyGenerator is used and one of the fields in the record key is a timestamp field, the row-writer path and the RDD path produce different record key values: the GenericRecord path converts the timestamp to epoch microseconds, whereas the row-writer path does no conversion and uses the timestamp's string form.
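For context, the two paths stringify a java.sql.Timestamp differently. A minimal sketch of the two renderings (the microseconds arithmetic assumes Spark's internal timestamp-micros representation and the session timezone this repro ran in):

import java.sql.Timestamp

val ts = Timestamp.valueOf("2014-11-30 12:40:32")
// GenericRecord (RDD) path: the key carries the epoch-microseconds long
val avroStyle = ts.getTime * 1000L   // 1417369232000000 in the repro's session timezone
// row-writer path: the key carries the plain toString rendering
val rowStyle  = ts.toString          // "2014-11-30 12:40:32.0"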
import java.sql.Timestamp
import org.apache.spark.sql.SaveMode._
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
(1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
(2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId","eventTime", "str")
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
mode(Overwrite).
save("/tmp/hudi_tbl_trial/")
val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl").show(false)
+----------------------------------+---+-------------------+------+
|_hoodie_record_key                |str|eventTime          |typeId|
+----------------------------------+---+-------------------+------+
|str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1     |
|str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1     |
|str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2     |
+----------------------------------+---+-------------------+------+
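As a sanity check, the long values in these keys are epoch microseconds; converting one back (in the same session timezone) recovers the original eventTime:

// the long in the key is epoch microseconds; dividing by 1000 gives millis
new java.sql.Timestamp(1417369232000000L / 1000)   // 2014-11-30 12:40:32.0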
// now retry w/ bulk_insert row writer path
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
"hoodie.datasource.write.operation","bulk_insert").
mode(Overwrite).
save("/tmp/hudi_tbl_trial_bulk_insert/")
val hudiDF_bulk_insert = spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")
hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl_bulk_insert").show(false)
+---------------------------------------+---+-------------------+------+
|_hoodie_record_key                     |str|eventTime          |typeId|
+---------------------------------------+---+-------------------+------+
|str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2     |
|str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1     |
|str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1     |
+---------------------------------------+---+-------------------+------+
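The upshot is that the same logical row keys out differently per path, so a later upsert through the other path would not match on _hoodie_record_key and would likely create a duplicate. Comparing the keys from the two outputs above makes the mismatch explicit:

// same row ("abc", 2014-11-30 12:40:32), two different record keys
val rddPathKey = "str:abc,eventTime:1417369232000000"
val rowPathKey = "str:abc,eventTime:2014-11-30 12:40:32.0"
assert(rddPathKey != rowPathKey)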
Issue Links
- relates to HUDI-2390 KeyGenerator discrepancy between DataFrame writer and SQL (Closed)