Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15379

JDBC connector returns wrong values if the defined data type contains precision

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Cannot Reproduce
    • 1.10.0
    • None
    • None

    Description

      A MySQL table like:

       

      // CREATE TABLE `currency` (
        `currency_id` bigint(20) NOT NULL,
        `currency_name` varchar(200) DEFAULT NULL,
        `rate` double DEFAULT NULL,
        `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
        `country` varchar(100) DEFAULT NULL,
        `timestamp6` timestamp(6) NULL DEFAULT NULL,
        `time6` time(6) DEFAULT NULL,
        `gdp` decimal(10,4) DEFAULT NULL,
        PRIMARY KEY (`currency_id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8
      +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+
      | currency_id | currency_name | rate | currency_time       | country | timestamp6                 | time6           | gdp      |
      +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+
      |           1 | US Dollar     | 1020 | 2019-12-20 17:23:00 | America | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
      |           2 | Euro          |  114 | 2019-12-20 12:22:00 | Germany | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
      |           3 | RMB           |   16 | 2019-12-20 12:22:00 | China   | 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 |
      |           4 | Yen           |    1 | 2019-12-20 12:22:00 | Japan   | 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 |
      +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+

       

      If the user defines a JDBC table as a dimension table like:

       

      // 
      /**
       * DDL text that declares a table named {@code currency} backed by the JDBC connector
       * ({@code connector.type = 'jdbc'}) and pointing at the MySQL table {@code currency}
       * at {@code jdbc:mysql://localhost:3306/test}.
       *
       * <p>Several columns are declared with an explicit precision/scale:
       * {@code TIMESTAMP(3)}, {@code TIMESTAMP(6)}, {@code TIME(6)} and {@code DECIMAL(10, 4)}.
       * The lookup options set a cache of at most 500 rows, a cache TTL of 10s and
       * a maximum of 3 retries.
       */
      public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
              "  currency_id BIGINT,\n" +
              "  currency_name STRING,\n" +
              "  rate DOUBLE,\n" +
              "  currency_time TIMESTAMP(3),\n" +
              "  country STRING,\n" +
              "  timestamp6 TIMESTAMP(6),\n" +
              "  time6 TIME(6),\n" +
              "  gdp DECIMAL(10, 4)\n" +
              ") WITH (\n" +
              "   'connector.type' = 'jdbc',\n" +
              "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
              "   'connector.username' = 'root'," +
              "   'connector.table' = 'currency',\n" +
              "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
              "   'connector.lookup.cache.max-rows' = '500', \n" +
              "   'connector.lookup.cache.ttl' = '10s',\n" +
              "   'connector.lookup.max-retries' = '3'" +
              ")";
      

       

      The user will get wrong values in the columns `timestamp6`, `time6`, and `gdp`:

      // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, c.timestamp6, c.time6, c.gdp 
      
      1,US Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001
      2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001
      4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001

       

       

      /**
       * Small streaming job that registers a JDBC-backed table named {@code currency},
       * selects every row from it and prints the rows to standard output.
       *
       * <p>The job runs with parallelism 1, using the Blink planner in streaming mode.
       */
      public class JDBCSourceExample {

          /** DDL registering the {@code currency} table through the JDBC connector. */
          private static final String CURRENCY_DDL = "CREATE TABLE currency (\n" +
                  "  currency_id BIGINT,\n" +
                  "  currency_name STRING,\n" +
                  "  rate DOUBLE,\n" +
                  "  currency_time TIMESTAMP(3),\n" +
                  "  country STRING,\n" +
                  "  timestamp6 TIMESTAMP(6),\n" +
                  "  time6 TIME(6),\n" +
                  "  gdp DECIMAL(10, 4)\n" +
                  ") WITH (\n" +
                  "   'connector.type' = 'jdbc',\n" +
                  "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
                  "   'connector.username' = 'root'," +
                  "   'connector.table' = 'currency',\n" +
                  "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
                  "   'connector.lookup.cache.max-rows' = '500', \n" +
                  "   'connector.lookup.cache.ttl' = '10s',\n" +
                  "   'connector.lookup.max-retries' = '3'" +
                  ")";

          /** Query that reads all columns of the registered {@code currency} table. */
          private static final String SELECT_ALL_CURRENCIES = "select * from currency";

          /**
           * Registers the table, converts the query result to an append stream of rows,
           * prints it, and then executes the job under the name {@code JdbcExample}.
           *
           * @param args command-line arguments; not read by this program
           * @throws Exception propagated from the table environment calls
           */
          public static void main(String[] args) throws Exception {
              StreamTableEnvironment tableEnv = createTableEnvironment();

              // Register the JDBC table before querying it.
              tableEnv.sqlUpdate(CURRENCY_DDL);

              tableEnv
                      .toAppendStream(tableEnv.sqlQuery(SELECT_ALL_CURRENCIES), Row.class)
                      .print();

              tableEnv.execute("JdbcExample");
          }

          /**
           * Builds the table environment: a stream execution environment with parallelism 1,
           * wrapped with Blink-planner streaming-mode settings.
           *
           * @return the configured stream table environment
           */
          private static StreamTableEnvironment createTableEnvironment() {
              StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
              execEnv.setParallelism(1);

              EnvironmentSettings settings = EnvironmentSettings.newInstance()
                      .useBlinkPlanner()
                      .inStreamingMode()
                      .build();

              return StreamTableEnvironment.create(execEnv, settings);
          }
      }
      

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            leonard Leonard Xu
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: