Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27794

The primary key obtained from MySQL is incorrect by using MysqlCatalog

    XMLWordPrintableJSON

Details

    Description

      I want to use MysqlCatalog to get the primary key of the database table `user`. The database table creation statement is as follows

      CREATE TABLE flinksql_test.`user` (
        `uid` bigint(20) NOT NULL,
        `uname` varchar(36) DEFAULT NULL,
        `others` varchar(128) DEFAULT NULL,
        PRIMARY KEY (`uid`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 

       

      This is my test code:

      import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
      import org.apache.flink.table.api.Schema;
      import org.apache.flink.table.catalog.CatalogBaseTable;
      import org.apache.flink.table.catalog.ObjectPath;
      import org.apache.flink.table.catalog.exceptions.TableNotExistException;import java.util.Optional;public class Demo02 {
          public static void main(String[] args) throws TableNotExistException {
              MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql-catalog",
                      "flinksql_test",
                      "root",
                      "123456789",
                      String.format("jdbc:mysql://127.0.0.1:3306"));
              CatalogBaseTable table = mySqlCatalog.getTable(new ObjectPath("flinksql_test", "user"));
              Optional<Schema.UnresolvedPrimaryKey> primaryKey = table
                      .getUnresolvedSchema()
                      .getPrimaryKey();
              System.out.println(primaryKey);
          }
      } 

       

      The obtained primary key is (Host,User), but the primary key from Database is (uid)

       

      I see, the value of the incoming catalog and schema is null, and the SQL splicing of the database to obtain the primary key does not add " TABLE_SCHEMA LIKE ? AND"

      Later, it was found that there was also a user table in the self-contained MySQL database with the following structure:

       

      CREATE TABLE mysql.`user` (
        `Host` char(60) COLLATE utf8_bin NOT NULL DEFAULT '',
        `User` char(32) COLLATE utf8_bin NOT NULL DEFAULT '',
        `Select_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Insert_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Update_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Delete_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Drop_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Reload_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Shutdown_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Process_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `File_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Grant_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `References_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Index_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Alter_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Show_db_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Super_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_tmp_table_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Lock_tables_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Execute_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Repl_slave_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Repl_client_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Show_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Alter_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_user_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Event_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Trigger_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_tablespace_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `ssl_type` enum('','ANY','X509','SPECIFIED') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
        `ssl_cipher` blob NOT NULL,
        `x509_issuer` blob NOT NULL,
        `x509_subject` blob NOT NULL,
        `max_questions` int(11) unsigned NOT NULL DEFAULT '0',
        `max_updates` int(11) unsigned NOT NULL DEFAULT '0',
        `max_connections` int(11) unsigned NOT NULL DEFAULT '0',
        `max_user_connections` int(11) unsigned NOT NULL DEFAULT '0',
        `plugin` char(64) COLLATE utf8_bin NOT NULL DEFAULT 'caching_sha2_password',
        `authentication_string` text COLLATE utf8_bin,
        `password_expired` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `password_last_changed` timestamp NULL DEFAULT NULL,
        `password_lifetime` smallint(5) unsigned DEFAULT NULL,
        `account_locked` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Create_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Drop_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
        `Password_reuse_history` smallint(5) unsigned DEFAULT NULL,
        `Password_reuse_time` smallint(5) unsigned DEFAULT NULL,
        `Password_require_current` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
        `User_attributes` json DEFAULT NULL,
        PRIMARY KEY (`Host`,`User`)
      ) /*!50100 TABLESPACE `mysql` */ ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 COMMENT='Users and global privileges'; 

       

      I think it while happen when there are multiple tables which have same table name, so we can pass the table schema to get primary key?

      Attachments

        Activity

          People

            dusukang dusukang
            dusukang dusukang
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: