  1. Flink
  2. FLINK-5565 Improve flink cost model framework
  3. FLINK-5568

Introduce an interface for external catalogs, provide an in-memory implementation, and integrate it with the Calcite schema


Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • None
    • None
    • Component/s: Table SQL / API
    • None

    Description

      The TableEnvironment currently provides a mechanism to register temporary tables. It registers each temporary table in the Calcite catalog, so that SQL and Table API queries can access it. At present, DataSetTable, DataStreamTable and TableSourceTable can be registered with the TableEnvironment as temporary tables.

      This issue aims to provide a mechanism to connect external catalogs such as HCatalog to the TableEnvironment, so that SQL and Table API queries can access tables in those catalogs without registering them with the TableEnvironment beforehand.

      First, we should point out that there are actually two kinds of catalog in Flink.
      The first is the external catalog mentioned above, which provides CRUD operations on databases and tables.
      The second is the Calcite catalog, which defines the namespace that can be accessed in Calcite queries. It depends on the Calcite Schema/Table abstraction. SqlValidator and SqlConverter depend on the Calcite catalog to resolve the tables referenced in SQL or Table API queries.

      So we need to do the following:
      1. Introduce an interface for external catalogs, and possibly provide an in-memory implementation first for test and development environments.
      2. Introduce a mechanism to connect the external catalog with the Calcite catalog, so that the tables/databases in the external catalog can be accessed through the Calcite catalog. This includes converting the databases of the external catalog to Calcite sub-schemas and converting the tables in a database of the external catalog to Calcite tables (only TableSourceTable is supported).
      3. Register the external catalog with the TableEnvironment.
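
      As a rough illustration of step 1, the sketch below shows what an external catalog interface and an in-memory implementation might look like. All names here (ExternalCatalog, InMemoryExternalCatalog, CatalogTable and the method names) are hypothetical and are not taken from the Flink code base; CatalogTable is a deliberately reduced stand-in for the table description. Steps 2 and 3 (the Calcite bridge and TableEnvironment registration) are not covered.

```scala
import scala.collection.mutable

// Hypothetical, reduced stand-in for a table entry in an external catalog.
final case class CatalogTable(dbName: String, tableName: String, tableType: String)

// Hypothetical interface exposing CRUD-style operations on databases and tables.
trait ExternalCatalog {
  def createDatabase(dbName: String): Unit
  def listDatabases(): Seq[String]
  def createTable(table: CatalogTable): Unit
  def getTable(dbName: String, tableName: String): Option[CatalogTable]
  def listTables(dbName: String): Seq[String]
  def dropTable(dbName: String, tableName: String): Boolean
}

// In-memory implementation meant only for tests and local development.
class InMemoryExternalCatalog extends ExternalCatalog {
  // database name -> (table name -> table), in insertion order
  private val databases =
    mutable.LinkedHashMap.empty[String, mutable.LinkedHashMap[String, CatalogTable]]

  override def createDatabase(dbName: String): Unit = synchronized {
    // Creating an existing database is a no-op.
    databases.getOrElseUpdate(dbName, mutable.LinkedHashMap.empty[String, CatalogTable])
    ()
  }

  override def listDatabases(): Seq[String] = synchronized {
    databases.keys.toList
  }

  override def createTable(table: CatalogTable): Unit = synchronized {
    val tables = databases.getOrElse(
      table.dbName,
      throw new IllegalArgumentException(s"Database ${table.dbName} does not exist"))
    require(!tables.contains(table.tableName), s"Table ${table.tableName} already exists")
    tables.put(table.tableName, table)
    ()
  }

  override def getTable(dbName: String, tableName: String): Option[CatalogTable] =
    synchronized {
      databases.get(dbName).flatMap(_.get(tableName))
    }

  override def listTables(dbName: String): Seq[String] = synchronized {
    databases.get(dbName).map(_.keys.toList).getOrElse(Nil)
  }

  // Returns true only if the table existed and was removed.
  override def dropTable(dbName: String, tableName: String): Boolean = synchronized {
    databases.get(dbName).exists(_.remove(tableName).isDefined)
  }
}
```

      A Calcite-facing adapter could then call listDatabases() and listTables() to expose sub-schemas and tables lazily, instead of copying catalog contents up front.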

      Here is the proposed design of ExternalCatalogTable.

      Name                  Type                 Description
      identifier            TableIdentifier      dbName and tableName of table
      tableType             String               type of external catalog table, e.g. csv, hbase, kafka
      schema                DataSchema           schema of table data, including column names and column types
      partitionColumnNames  List<String>         names of partition columns
      properties            Map<String, String>  properties of external catalog table
      stats                 TableStats           statistics of external catalog table
      comment               String
      create time           long
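
      The field list above maps naturally onto a Scala case class. The following is a sketch only: the issue names the types TableIdentifier, DataSchema and TableStats but does not define them, so their fields here (dbName/tableName, columnNames/columnTypes as plain strings, rowCount) are assumptions. Modelling stats and comment as Option and giving some fields default values are also choices made for this example, not part of the proposal.

```scala
// Assumed shape: the issue only says the identifier holds dbName and tableName.
final case class TableIdentifier(dbName: String, tableName: String)

// Assumed shape: column types are plain strings here for simplicity.
final case class DataSchema(columnNames: Seq[String], columnTypes: Seq[String]) {
  require(columnNames.length == columnTypes.length,
    "columnNames and columnTypes must have the same length")
}

// Assumed shape: a single row-count statistic.
final case class TableStats(rowCount: Long)

// Mirrors the fields listed in the issue description.
final case class ExternalCatalogTable(
    identifier: TableIdentifier,
    tableType: String,                              // e.g. "csv", "hbase", "kafka"
    schema: DataSchema,
    partitionColumnNames: Seq[String] = Seq.empty,
    properties: Map[String, String] = Map.empty,
    stats: Option[TableStats] = None,               // optional in this sketch
    comment: Option[String] = None,                 // optional in this sketch
    createTime: Long = System.currentTimeMillis())
```

      For example, a CSV-backed table could be described as ExternalCatalogTable(TableIdentifier("db1", "orders"), "csv", DataSchema(Seq("id", "amount"), Seq("INT", "DOUBLE")), properties = Map("path" -> "/tmp/orders.csv")), where the path property key and the file location are made up for illustration.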

      One detail still needs to be considered: how to convert an ExternalCatalogTable to a TableSourceTable. This is equivalent to converting an ExternalCatalogTable to a TableSource, because a TableSourceTable can easily be obtained from a TableSource.

      Different TableSources often require different fields to create an instance. For example, CsvTableSource needs path, fieldNames, fieldTypes, fieldDelim, rowDelim and so on, whereas KafkaTableSource needs configuration and tableName. So it is not a good idea to make the Flink framework responsible for translating an ExternalCatalogTable into each kind of TableSourceTable.

      Here is one solution: let each TableSource specify a converter.
      1. Provide an annotation named ExternalCatalogCompatible. A TableSource carrying this annotation is compatible with external catalogs, that is, it can be converted to or from an ExternalCatalogTable. The annotation specifies the tableType and the converter of the TableSource. For example, CsvTableSource specifies csv as its tableType and CsvTableSourceConverter as its converter class.

      @ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
      class CsvTableSource(...) {
        ...
      }
      

      2. Scan all TableSources that carry the ExternalCatalogCompatible annotation and save each tableType and its converter in a Map.
      3. When an ExternalCatalogTable needs to be converted to a TableSource, look up the converter by tableType and let it perform the conversion.
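
      The lookup in steps 2 and 3 can be sketched as follows. This is an illustration under several assumptions: the types are reduced stand-ins rather than Flink classes, the converter trait and its method name are hypothetical, and the property keys "path" and "fieldDelim" are invented for the example. Classpath scanning for the annotation is replaced by an explicitly populated Map, so only the dispatch by tableType is shown.

```scala
// Reduced stand-ins for the real types; only what the lookup needs.
final case class ExternalCatalogTable(tableType: String, properties: Map[String, String])

trait TableSource
final case class CsvTableSource(path: String, fieldDelim: String) extends TableSource

// Hypothetical converter contract: builds a TableSource from a catalog entry.
trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

// The converter owns the knowledge of which properties a CSV source needs.
class CsvTableSourceConverter extends TableSourceConverter {
  override def fromExternalCatalogTable(table: ExternalCatalogTable): CsvTableSource = {
    val path = table.properties.getOrElse(
      "path", // assumed property key
      throw new IllegalArgumentException("csv table requires a 'path' property"))
    CsvTableSource(path, table.properties.getOrElse("fieldDelim", ","))
  }
}

object ConverterRegistry {
  // tableType -> converter. Filled by hand here; the proposal would fill it
  // by scanning for annotated TableSource classes.
  private val converters: Map[String, TableSourceConverter] =
    Map("csv" -> new CsvTableSourceConverter)

  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.get(table.tableType) match {
      case Some(converter) => converter.fromExternalCatalogTable(table)
      case None =>
        throw new IllegalArgumentException(
          s"No converter registered for table type '${table.tableType}'")
    }
}
```

      With this arrangement the framework only dispatches on tableType, and adding support for a new kind of table means adding one converter rather than changing the framework.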

            People

              jingzhang Jing Zhang
              ykt836 Kurt Young