Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
cdc-3.1.0
Description
Currently, Flink CDC Time encounters time type errors (including DateTime, Time, Date, TimeStamp) when using MySQL Connector (JsonDebeziumDeserializationSchema) as deserialization, and the converted time is wrong. The essential reason is that the timestamp returned by the bottom layer of debezium is UTC (such as io.debezium.time.Timestamp). The community has already had some PR, but they are not work.
Now a way is provided to provide a solution based on Debezium's custom Convert interface (https://debezium.io/documentation/reference/1.9/development/converters.html),
Users can choose to convert the above four time types into STRING according to the specified time format to ensure that users can correctly convert JSON when using the Flink DataStream API.
When the user enables this converter, we need to configure it according to the parameters, That's some datastream use case:
Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("converters", "datetime"); debeziumProperties.setProperty("datetime.database.type", DataBaseType.MYSQL.getType()); debeziumProperties.setProperty("datetime.type", "cn.xxx.sources.cdc.MysqlDebeziumConverter"); debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd"); debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss"); debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8"); MySqlSourceBuilder<String> builder = MySqlSource.<String>builder() .hostname(url[0]) .port(Integer.parseInt(url[1])) .databaseList(table.getDatabase()) .tableList(getTablePattern(table)) .username(table.getUserName()) .password(table.getPassword()) .debeziumProperties(debeziumProperties);