Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-35173

Debezium for Mysql connector Custom Time Serializer

    XMLWordPrintableJSON

Details

    Description

      Currently, Flink CDC Time encounters time type errors (including DateTime, Time, Date, TimeStamp) when using MySQL Connector (JsonDebeziumDeserializationSchema) as deserialization, and the converted time is wrong. The essential reason is that the timestamp returned by the bottom layer of debezium is UTC (such as io.debezium.time.Timestamp). The community has already had some PR, but they are not work.

      Now a way is provided to provide a solution based on Debezium's custom Convert interface (https://debezium.io/documentation/reference/1.9/development/converters.html),
      Users can choose to convert the above four time types into STRING according to the specified time format to ensure that users can correctly convert JSON when using the Flink DataStream API.

      When the user enables this converter, we need to configure it according to the parameters, That's some datastream use case:

      Properties debeziumProperties = new Properties();
      debeziumProperties.setProperty("converters", "datetime");
      debeziumProperties.setProperty("datetime.database.type", DataBaseType.MYSQL.getType());
      debeziumProperties.setProperty("datetime.type", "cn.xxx.sources.cdc.MysqlDebeziumConverter");
      debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
      debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
      debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
      debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
      debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
      MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
              .hostname(url[0])
              .port(Integer.parseInt(url[1]))
              .databaseList(table.getDatabase())
              .tableList(getTablePattern(table))
              .username(table.getUserName())
              .password(table.getPassword())
              .debeziumProperties(debeziumProperties); 

       

       

      Attachments

        Activity

          People

            czy006 ZhengYu Chen
            czy006 ZhengYu Chen
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: