Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34935

TIMESTAMP_LTZ type Unsupported when using JdbcCatalog to read from Postgres

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.18.1
    • None
    • Connectors / JDBC
    • flink: 1.17.0
      flink-connector-jdbc: 3.1.0-1.17
      postgres: 14.5
      java: 11

    • Patch

    Description

      When I use JdbcCatalog to select from postgres table, it throw Exception:

      Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
          at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
          at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
          at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
          at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
          at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
          at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
          at org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50)
          at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182)
          at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
          at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
          at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
          at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
          at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
          at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
          at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
          at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
          at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
          at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
          at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
          at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
          at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
          at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
          at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80)
          at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
          at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
          at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
          at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
          at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
          at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
          at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
          at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
          at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
          at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
          at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
          at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76)
          at com.demo.DataflowJob.main(DataflowJob.java:110) 

       

      The source table is a postgres table, which schema is:

      create table t_test (
       id integer primary key,
       name timestamptz
      );

       

      I've read the relevant code and roughly located where the problem is. Can I make some code contribution to fix this problem?

      Attachments

        Activity

          People

            Unassigned Unassigned
            frankie.du Du Yuzhou
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - 72h
                72h
                Remaining:
                Remaining Estimate - 72h
                72h
                Logged:
                Time Spent - Not Specified
                Not Specified