Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
None
-
None
Description
Date columns are showing integer values while doing CDC read.
For timestamp columns also it is converting into epoch , may be doing similar conversion for date column causing wrong integer values.
Reproducible Code -
```
columns = ["ts","uuid","rider","driver","fare","city", "report_date"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco", "2022-01-01"),
(1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"san_francisco", "2022-01-01"),
(1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco", "2022-01-01"),
(1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-C","driver-N",34.15,"sao_paulo", "2022-01-01")]
inserts = spark.createDataFrame(data).toDF(*columns)
hudi_options =
{ 'hoodie.table.name': tableName, 'hoodie.datasource.write.recordkey.field' : 'uuid', 'hoodie.datasource.write.precombine.field' : 'ts', 'hoodie.datasource.write.partitionpath.field': 'city', 'hoodie.datasource.write.reconcile.schema':'true', 'hoodie.table.cdc.enabled':'true', }inserts = inserts.withColumn("report_date", expr("CAST(report_date as timestamp)"))
inserts.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
cdc_read_options =
{ 'hoodie.datasource.query.incremental.format': 'cdc', 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': 0 }df = spark.read.format("hudi").load(basePath)
df.show(10, False)
spark.read.format("hudi"). \
options(**cdc_read_options). \
load(basePath).show(10, False)
```