  Flink / FLINK-6574

Support nested catalogs in ExternalCatalog

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      We found that the current external catalog requires three layers of references for every table. For example, a SQL query that references an external table looks like this:

      SELECT * FROM catalog.db.table
      

      It would be great to support only two layers of indirection, which is closer to many Presto / Hive deployments today.

      SELECT * FROM db.table
      


          Activity

          wheat9 Haohui Mai added a comment -

          Raising the priority to critical as it introduces API changes in 1.3.

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3913

          FLINK-6574 Support nested catalogs in ExternalCatalog.

          This PR extends the APIs of the `ExternalCatalog` to support arbitrary nesting within external catalogs.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-6574

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3913.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3913


          commit 226aae03a605b3149c48de1154de7efb287334f5
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-05-16T00:09:18Z

          FLINK-6574 Support nested catalogs in ExternalCatalog.
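The idea behind "arbitrary nesting" can be illustrated with a small, self-contained Scala sketch: a dotted reference such as `db.table` or `catalog.db.table` is resolved by walking sub-catalogs and treating the last segment as a table name. `SimpleCatalog`, `MapCatalog`, and `NestedResolution` are hypothetical names, tables are modeled as plain strings, and the sketch does not show how Flink actually wires catalogs into its SQL planner.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified stand-in for ExternalCatalog: only the two lookup
// methods matter here, and a "table" is just a String.
trait SimpleCatalog {
  def getTable(tableName: String): Option[String]
  def getSubCatalog(name: String): Option[SimpleCatalog]
}

// Hypothetical immutable catalog node used only for this illustration.
final case class MapCatalog(
    tables: Map[String, String] = Map.empty,
    subCatalogs: Map[String, SimpleCatalog] = Map.empty) extends SimpleCatalog {
  def getTable(tableName: String): Option[String] = tables.get(tableName)
  def getSubCatalog(name: String): Option[SimpleCatalog] = subCatalogs.get(name)
}

object NestedResolution {
  // Every segment except the last names a sub-catalog; the last names a table.
  @tailrec
  def resolvePath(catalog: SimpleCatalog, path: List[String]): Option[String] =
    path match {
      case Nil => None
      case tableName :: Nil => catalog.getTable(tableName)
      case subName :: rest =>
        catalog.getSubCatalog(subName) match {
          case Some(child) => resolvePath(child, rest)
          case None => None
        }
    }

  // Splits a dotted reference such as "db.table" or "catalog.db.table".
  def resolve(catalog: SimpleCatalog, reference: String): Option[String] =
    resolvePath(catalog, reference.split('.').toList)
}
```

With this shape, the two-level reference from the description (`db.table`) only needs a root catalog with one sub-catalog per database, while a three-level reference is simply one more level of nesting.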


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116721391

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala —
          @@ -34,41 +34,33 @@ trait ExternalCatalog {
             /**
               * Get a table from the catalog
               *
          -    * @param dbName The name of the table's database.
               * @param tableName The name of the table.
          -    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog.
               * @throws TableNotExistException thrown if the table does not exist in the catalog.
               * @return the requested table
               */
          -  @throws[DatabaseNotExistException]
             @throws[TableNotExistException]
          -  def getTable(dbName: String, tableName: String): ExternalCatalogTable
          +  def getTable(tableName: String): ExternalCatalogTable

             /**
               * Get a list of all table names of a database in the catalog.
               *
          -    * @param dbName The name of the database.
          -    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
               * @return The list of table names
               */
          -  @throws[DatabaseNotExistException]
          -  def listTables(dbName: String): JList[String]
          +  def listTables(): JList[String]

             /**
               * Gets a database from the catalog.
               *
          -    * @param dbName The name of the database.
          -    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
               * @return The requested database
               */
             @throws[DatabaseNotExistException]
          -  def getDatabase(dbName: String): ExternalCatalogDatabase
          +  def getSubCatalog(dbName: String): ExternalCatalog

             /**
               * Gets a list of all databases in the catalog.
               *
               * @return The list of database names
               */
          -  def listDatabases(): JList[String]
          +  def listSubCatalog(): JList[String]
          — End diff –

          +s `listSubCatalogs()`
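A small sketch of how a plural `listSubCatalogs()` composes with `listTables()`: walking the tree yields every table as a qualified path. `ListingCatalog`, `StaticCatalog`, and `CatalogWalk` are hypothetical names rather than Flink classes; the trait only mirrors the listing methods from the diff, using the plural name suggested in this comment and Java lists as the diff does.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._

// Hypothetical listing-only view of the nested interface.
trait ListingCatalog {
  def listTables(): JList[String]
  def listSubCatalogs(): JList[String]
  def getSubCatalog(name: String): ListingCatalog
}

// Hypothetical fixed-content implementation used only for this illustration.
final class StaticCatalog(
    tables: Seq[String],
    children: Map[String, ListingCatalog]) extends ListingCatalog {
  def listTables(): JList[String] = new JArrayList[String](tables.asJava)
  def listSubCatalogs(): JList[String] =
    new JArrayList[String](children.keys.toSeq.sorted.asJava)
  def getSubCatalog(name: String): ListingCatalog = children(name)
}

object CatalogWalk {
  // Depth-first walk returning every table as a dot-separated path:
  // tables of the current catalog first, then those of each sub-catalog.
  def qualifiedTables(catalog: ListingCatalog, prefix: List[String] = Nil): Seq[String] = {
    val here = catalog.listTables().asScala.toList.map(t => (prefix :+ t).mkString("."))
    val nested = catalog.listSubCatalogs().asScala.toList.flatMap { name =>
      qualifiedTables(catalog.getSubCatalog(name), prefix :+ name)
    }
    here ++ nested
  }
}
```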

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116724361

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala —
          @@ -18,138 +18,97 @@

          package org.apache.flink.table.catalog

          -import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
           import java.util.{List => JList}

          -import scala.collection.mutable.HashMap
          +import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
          +
          +import scala.collection.mutable
           import scala.collection.JavaConverters._

           /**
             * This class is an in-memory implementation of [[ExternalCatalog]].
             *
             * It could be used for testing or developing instead of used in production environment.
             */
          -class InMemoryExternalCatalog extends CrudExternalCatalog {
          +class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
          -  private val databases = new HashMap[String, Database]
          +  private val databases = new mutable.HashMap[String, ExternalCatalog]
          +  private val tables = new mutable.HashMap[String, ExternalCatalogTable]

          -  @throws[DatabaseNotExistException]
             @throws[TableAlreadyExistException]
             override def createTable(
          -    table: ExternalCatalogTable,
          -    ignoreIfExists: Boolean): Unit = synchronized {
          -    val dbName = table.identifier.database
          -    val tables = getTables(dbName)
          -    val tableName = table.identifier.table
          -    if (tables.contains(tableName)) {
          -      if (!ignoreIfExists) {
          -        throw new TableAlreadyExistException(dbName, tableName)
          -      }
          -    } else {
          -      tables.put(tableName, table)
          +    tableName: String,
          +    table: ExternalCatalogTable,
          +    ignoreIfExists: Boolean): Unit = synchronized {
          +    tables.get(tableName) match {
          +      case Some(_) if !ignoreIfExists => throw new TableAlreadyExistException(name, tableName)
          +      case _ => tables.put(tableName, table)
               }
             }

          -  @throws[DatabaseNotExistException]
             @throws[TableNotExistException]
          -  override def dropTable(
          -    dbName: String,
          -    tableName: String,
          -    ignoreIfNotExists: Boolean): Unit = synchronized {
          -    val tables = getTables(dbName)
          +  override def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit = synchronized {
               if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
          -      throw new TableNotExistException(dbName, tableName)
          +      throw new TableNotExistException(name, tableName)
               }
             }

          -  @throws[DatabaseNotExistException]
             @throws[TableNotExistException]
          -  override def alterTable(
          -    table: ExternalCatalogTable,
          -    ignoreIfNotExists: Boolean): Unit = synchronized {
          -    val dbName = table.identifier.database
          -    val tables = getTables(dbName)
          -    val tableName = table.identifier.table
          +  override def alterTable(tableName: String, table: ExternalCatalogTable,
          — End diff –

          please put each parameter into a new line
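As an illustration of the requested layout, here is a self-contained sketch of the flat, per-catalog table map from the diff, with each parameter on its own line and the new `name` constructor parameter documented in the ScalaDoc. `InMemoryCatalogSketch`, `SketchTable`, `TableAlreadyExist`, and `TableNotExist` are hypothetical stand-ins for the Flink types, not the merged code.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Flink types referenced in the diff.
final case class SketchTable(tableType: String, properties: Map[String, String] = Map.empty)

final case class TableAlreadyExist(catalog: String, table: String)
  extends RuntimeException(s"Table $table already exists in catalog $catalog.")

final case class TableNotExist(catalog: String, table: String)
  extends RuntimeException(s"Table $table does not exist in catalog $catalog.")

/**
  * In-memory catalog node, sketched after the reviewed InMemoryExternalCatalog.
  *
  * @param name name of this catalog, used in exception messages
  */
class InMemoryCatalogSketch(name: String) {
  private val tables = new mutable.HashMap[String, SketchTable]

  def createTable(
      tableName: String,
      table: SketchTable,
      ignoreIfExists: Boolean): Unit = synchronized {
    if (!tables.contains(tableName)) {
      tables.put(tableName, table)
    } else if (!ignoreIfExists) {
      throw TableAlreadyExist(name, tableName)
    } // otherwise keep the existing table untouched
  }

  def dropTable(
      tableName: String,
      ignoreIfNotExists: Boolean): Unit = synchronized {
    if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
      throw TableNotExist(name, tableName)
    }
  }

  def alterTable(
      tableName: String,
      table: SketchTable,
      ignoreIfNotExists: Boolean): Unit = synchronized {
    if (tables.contains(tableName)) {
      tables.put(tableName, table)
    } else if (!ignoreIfNotExists) {
      throw TableNotExist(name, tableName)
    }
  }

  def getTable(tableName: String): SketchTable = synchronized {
    tables.getOrElse(tableName, throw TableNotExist(name, tableName))
  }

  def listTables(): List[String] = synchronized { tables.keys.toList.sorted }
}
```

One deliberate difference: when the table already exists and `ignoreIfExists` is true, this sketch keeps the existing table, whereas the `case _ => tables.put(tableName, table)` branch in the reviewed diff would overwrite it.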

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116721535

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala —
          @@ -79,7 +76,7 @@ trait CrudExternalCatalog extends ExternalCatalog {
               *        and ignoreIfExists is false
               */
             @throws[DatabaseAlreadyExistException]
          -  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
          +  def createDatabase(dbName: String, db: ExternalCatalog, ignoreIfExists: Boolean): Unit
          — End diff –

          rename to `createSubCatalog()` (or add such an alias method?)
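If the alias route suggested here were taken, one common Scala pattern is a deprecated forwarding method, so existing callers keep compiling while implementations only provide the new name. The trait and class below are hypothetical and simplified (sub-catalogs live in an insertion-ordered map); they only illustrate the pattern and are not taken from the merged code.

```scala
import scala.collection.mutable

// Hypothetical, simplified CRUD trait illustrating the alias suggestion.
trait SubCatalogCrud {
  def createSubCatalog(
      name: String,
      catalog: SubCatalogCrud,
      ignoreIfExists: Boolean): Unit

  def listSubCatalogs(): Seq[String]

  // Old name kept as a thin, deprecated alias that forwards to the new method.
  @deprecated("Use createSubCatalog instead.", "sketch")
  final def createDatabase(
      name: String,
      catalog: SubCatalogCrud,
      ignoreIfExists: Boolean): Unit =
    createSubCatalog(name, catalog, ignoreIfExists)
}

// Hypothetical in-memory implementation; it only implements the new method.
class MapSubCatalogs extends SubCatalogCrud {
  private val children = mutable.LinkedHashMap.empty[String, SubCatalogCrud]

  def createSubCatalog(
      name: String,
      catalog: SubCatalogCrud,
      ignoreIfExists: Boolean): Unit = synchronized {
    if (!children.contains(name)) {
      children.put(name, catalog)
    } else if (!ignoreIfExists) {
      throw new IllegalArgumentException(s"Catalog $name already exists.")
    }
  }

  def listSubCatalogs(): Seq[String] = synchronized { children.keys.toList }
}
```

Marking the alias `final` keeps implementations from diverging between the two names.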

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116721601

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala —
          @@ -96,13 +93,14 @@ trait CrudExternalCatalog extends ExternalCatalog {
             /**
               * Modifies an existing database in the catalog.
               *
          +    * @param dbName Name of the database.
               * @param db New description of the database to update
               * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
               *        if set to false, throw an exception,
               *        if set to true, nothing happens.
               * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
               */
             @throws[DatabaseNotExistException]
          -  def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
          +  def alterDatabase(dbName: String, db: ExternalCatalog, ignoreIfNotExists: Boolean): Unit
          — End diff –

          rename to `alterSubCatalog()` or add an alias method?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116723435

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala —
          @@ -18,138 +18,97 @@

          package org.apache.flink.table.catalog

          -import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
           import java.util.{List => JList}

          -import scala.collection.mutable.HashMap
          +import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
          +
          +import scala.collection.mutable
           import scala.collection.JavaConverters._

           /**
             * This class is an in-memory implementation of [[ExternalCatalog]].
             *
             * It could be used for testing or developing instead of used in production environment.
             */
          -class InMemoryExternalCatalog extends CrudExternalCatalog {
          +class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
          — End diff –

          add parameter to ScalaDocs

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116919169

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala —
          @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
              * @param lastAccessTime Timestamp of last access of the table
              */
           case class ExternalCatalogTable(
          -    identifier: TableIdentifier,
          — End diff –

          If we don't have a table identifier or table name, the `TableSourceConverter` will not work when it tries to convert an `ExternalCatalogTable` into a real `TableSource`.

          Even if we keep the table name, I'm not sure it will work in all situations. For example, consider an external catalog that has the notion of a database, like MySQL. Tables in that catalog are named "db1.table1" or "db2.table2". In the new design, we would normally add each database as a sub-catalog of the root catalog, so "table1" is created in the sub-catalog named "db1". The problem is that the table name is then just "table1", and we lose the information that this table actually comes from "db1". This may cause problems when the TableSource tries to establish a connection or to fetch table information from the external catalog.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116919313

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala —
          @@ -114,7 +114,7 @@ case class TableAlreadyExistException(
              * @param db    database name
              * @param cause the cause
              */
          -case class DatabaseNotExistException(
          +case class CatalogNotExistException(
          — End diff –

          Please update the comment and parameter name.
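Applied to the renamed exceptions, the requested doc and parameter update could look like the sketch below, replacing the old `db` parameter. The parameter name `catalog`, the messages, and the auxiliary constructors are assumptions for illustration, not necessarily what was merged.

```scala
/**
  * Exception for an operation on a catalog that does not exist.
  *
  * @param catalog name of the missing catalog
  * @param cause   the cause
  */
case class CatalogNotExistException(
    catalog: String,
    cause: Throwable)
  extends RuntimeException(s"Catalog $catalog does not exist.", cause) {

  // Convenience constructor when there is no underlying cause.
  def this(catalog: String) = this(catalog, null)
}

/**
  * Exception for adding a catalog that already exists.
  *
  * @param catalog name of the existing catalog
  * @param cause   the cause
  */
case class CatalogAlreadyExistException(
    catalog: String,
    cause: Throwable)
  extends RuntimeException(s"Catalog $catalog already exists.", cause) {

  // Convenience constructor when there is no underlying cause.
  def this(catalog: String) = this(catalog, null)
}
```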

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r116919339

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala —
          @@ -128,7 +128,7 @@ case class DatabaseNotExistException(
              * @param db    database name
              * @param cause the cause
              */
          -case class DatabaseAlreadyExistException(
          +case class CatalogAlreadyExistException(
          — End diff –

          Same as above.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r117201965

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala —
          @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
              * @param lastAccessTime Timestamp of last access of the table
              */
           case class ExternalCatalogTable(
          -    identifier: TableIdentifier,
          — End diff –

          We have tested this with our own catalog integration, and keeping the table name should work. The sub-catalog that represents the database layer can concatenate the database and table name when creating the Table instance.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r117325036

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala —
          @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
              * @param lastAccessTime Timestamp of last access of the table
              */
           case class ExternalCatalogTable(
          -    identifier: TableIdentifier,
          — End diff –

          I agree that it requires keeping more than the name of the table. The more general problem is that you will need to keep additional information for other data sources (e.g., the clusters for Kafka topics).

          I think it might make more sense to put this information in the properties field of `ExternalCatalogTable`. Thoughts?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r117349259

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala —
          @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
              * @param lastAccessTime Timestamp of last access of the table
              */
           case class ExternalCatalogTable(
          -    identifier: TableIdentifier,
          — End diff –

          The properties field is exactly meant to hold configuration parameters such as hostnames, ports, paths, or client configs.

          I think @haohui is right. Not all external tables require a name to access the data (the name under which the table is registered in the catalog and identified by Calcite is the key in the map). A CSV file table needs a path, a Kafka-based table a Kafka config + topic name, etc. A database table which needs a table name could store the table name (and also the database name and schema name) in the properties map.

          Would that work for you @KurtYoung?
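A minimal sketch of the properties-based approach discussed in this exchange: the catalog-level name is only the lookup key, and a database sub-catalog records the database and table name in the table's properties, from which a connector can rebuild the qualified `db1.table1` name. `CatalogTableSketch`, `DatabaseSubCatalog`, `ConnectorSide`, the `jdbc` type tag, and the property keys `database` and `table` are all hypothetical.

```scala
// Hypothetical, simplified table descriptor: no name/identifier field, only a
// table type and free-form properties.
final case class CatalogTableSketch(
    tableType: String,
    properties: Map[String, String] = Map.empty)

// Hypothetical sub-catalog for one database of an external RDBMS such as MySQL.
final class DatabaseSubCatalog(database: String, tableNames: Set[String]) {
  def getTable(tableName: String): Option[CatalogTableSketch] =
    if (tableNames.contains(tableName)) {
      // "table1" is only the lookup key inside this sub-catalog; the physical
      // coordinates travel in the properties map.
      Some(CatalogTableSketch(
        tableType = "jdbc", // hypothetical type tag
        properties = Map("database" -> database, "table" -> tableName)))
    } else {
      None
    }
}

object ConnectorSide {
  // What a converter could read back to reach the physical table, here by
  // concatenating database and table name; None if either key is missing.
  def physicalName(table: CatalogTableSketch): Option[String] =
    for {
      db <- table.properties.get("database")
      name <- table.properties.get("table")
    } yield s"$db.$name"
}
```

A CSV- or Kafka-backed table would simply carry different keys (a path, or a topic plus client config), which matches the point that not every external table needs a name.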

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3913#discussion_r117385333

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala —
          @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
              * @param lastAccessTime Timestamp of last access of the table
              */
           case class ExternalCatalogTable(
          -    identifier: TableIdentifier,
          — End diff –

          I agree that not all "tables" have a name. Sounds good to me then.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3913

          Thanks for the update @haohui and the review @KurtYoung.
          Will merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3913

          fhueske Fabian Hueske added a comment -

          Fixed in 1.3.0 with 6d178a9597f5f9b79a0e3a6f4a61a10734188c85
          Fixed in 1.4.0 with acea4cde5f0225db9e00bbef4a47fdb58419022b


            People

            • Assignee: wheat9 Haohui Mai
            • Reporter: wheat9 Haohui Mai
            • Votes: 0
            • Watchers: 4
