FLINK-21172

canal-json format should include the es field


    Description

      The Canal flat-message JSON format has an 'es' field, extracted from the MySQL binlog, that records when the row was actually changed in MySQL. It naturally expresses the event time, but it is currently ignored during deserialization.

      {
        "data": [
          {
            "id": "111",
            "name": "scooter",
            "description": "Big 2-wheel scooter",
            "weight": "5.18"
          }
        ],
        "database": "inventory",
        "es": 1589373560000,
        "id": 9,
        "isDdl": false,
        "mysqlType": {
          "id": "INTEGER",
          "name": "VARCHAR(255)",
          "description": "VARCHAR(512)",
          "weight": "FLOAT"
        },
        "old": [
          {
            "weight": "5.15"
          }
        ],
        "pkNames": [
          "id"
        ],
        "sql": "",
        "sqlType": {
          "id": 4,
          "name": 12,
          "description": 12,
          "weight": 7
        },
        "table": "products",
        "ts": 1589373560798,
        "type": "UPDATE"
      }
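
      To illustrate the difference between the two timestamps in the message above, here is a minimal, dependency-free sketch that reads 'es' and 'ts' and converts them to instants. The class name CanalEsExample and the helper extractEpochMillis are hypothetical and not part of Flink; the regex lookup only keeps the sketch self-contained, and real code would use a JSON parser.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CanalEsExample {

    // Hypothetical helper (not a Flink API): returns the first numeric value
    // found for the given quoted field name, e.g. "es": 1589373560000.
    static OptionalLong extractEpochMillis(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(\\d+)");
        Matcher m = p.matcher(json);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Trimmed copy of the Canal message from the description.
        String json = "{\"database\":\"inventory\",\"es\":1589373560000,\"id\":9,"
                + "\"table\":\"products\",\"ts\":1589373560798,\"type\":\"UPDATE\"}";

        // 'es': when the row changed in MySQL (the event time).
        Instant eventTime = Instant.ofEpochMilli(extractEpochMillis(json, "es").getAsLong());
        // 'ts': the other timestamp carried by the message.
        Instant messageTime = Instant.ofEpochMilli(extractEpochMillis(json, "ts").getAsLong());

        System.out.println("es = " + eventTime);
        System.out.println("ts = " + messageTime);
        System.out.println("gap = " + Duration.between(eventTime, messageTime).toMillis() + " ms");
    }
}
```

      In this sample the two values differ by 798 ms. A watermark based on 'es' would follow the time of the change in MySQL rather than the later 'ts' value.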
      

      org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema

          private static RowType createJsonRowType(DataType databaseSchema) {
              // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
              return (RowType)
                      DataTypes.ROW(
                                      DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
                                      DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
                                      DataTypes.FIELD("type", DataTypes.STRING()),
                                      DataTypes.FIELD("database", DataTypes.STRING()),
                                      DataTypes.FIELD("table", DataTypes.STRING()))
                              .getLogicalType();
          }
      

          People

            nicholasjiang Nicholas Jiang
            jiabao.sun jiabao.sun