FLINK-22199

The JDBC connector does not support TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL.


Details

    Description

      I created a PostgreSQL database with the following table definition:

      CREATE TABLE IF NOT EXISTS test (
       id bigint NOT NULL,
       name varchar(255) NOT NULL,
       created_at timestamp with time zone NOT NULL,
       updated_at timestamp with time zone NOT NULL,
       PRIMARY KEY(id)
       );
      

      When I created a PostgresCatalog in Flink 1.12.2 and added an INSERT statement that reads from this table, the following error occurred.

       CREATE CATALOG pg WITH (
       'type'='jdbc',
       'base-url'='jdbc:postgresql://localhost:5432/',
       'default-database' = 'test',
       'username' = 'postgres',
       'password' = 'postgres'
       )

      Log output:

       2021-04-11 17:21:57,685 INFO || Catalog pg established connection to jdbc:postgresql://localhost:5432/test [org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog]
       Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL
       at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:183)
       at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:119)
       at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:60)
       at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
       at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
       at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:49)
       at org.apache.flink.connector.jdbc.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:53)
       at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:117)
       at org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:254)
       at org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:71)
       at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
       at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertTemporalTable(SqlToRelConverter.java:2583)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2158)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
       at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902)
       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871)
       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
       at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
       at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:50)
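
      The trace above ends in AbstractJdbcRowConverter.createInternalConverter, which suggests the converter factory has no branch for this type and falls through to a default that throws. The sketch below is a simplified, self-contained model of that pattern and of one possible way to add the missing branch. It is not Flink's actual code: the class TimestampLtzConverterSketch, the enum TypeRoot, and both factory methods are hypothetical names, and it assumes the JDBC driver returns a java.sql.Timestamp for a "timestamp with time zone" column.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.function.Function;

public class TimestampLtzConverterSketch {

    /** Hypothetical stand-in for a few logical type roots; not Flink's enum. */
    enum TypeRoot {
        BIGINT,
        VARCHAR,
        TIMESTAMP_WITHOUT_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE
    }

    /** Models a converter factory that has no branch for TIMESTAMP_WITH_LOCAL_TIME_ZONE. */
    static Function<Object, Object> createConverter(TypeRoot root) {
        switch (root) {
            case BIGINT:
            case VARCHAR:
                // Values are passed through unchanged in this simplified model.
                return value -> value;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return value -> ((Timestamp) value).toLocalDateTime();
            default:
                // Any type without a branch ends up here, as in the reported error.
                throw new UnsupportedOperationException("Unsupported type:" + root);
        }
    }

    /** Hypothetical extension: handles the missing type, delegates everything else. */
    static Function<Object, Object> createConverterWithLtz(TypeRoot root) {
        if (root == TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
            // Assumption: the driver supplies java.sql.Timestamp; keep it as an absolute instant.
            return value -> ((Timestamp) value).toInstant();
        }
        return createConverter(root);
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.from(Instant.parse("2021-04-11T17:21:57Z"));
        try {
            createConverter(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
        } catch (UnsupportedOperationException e) {
            System.out.println("Without the branch: " + e.getMessage());
        }
        Object converted =
                createConverterWithLtz(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE).apply(ts);
        System.out.println("With the branch: " + converted);
    }
}
```

      Converting to an Instant rather than a LocalDateTime keeps the value independent of the JVM's default time zone, which matches the intent of a "with local time zone" type. Whether the real connector should represent the value this way is a design decision for the maintainers.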

       

       

            People

              Assignee: Unassigned
              Reporter: Tomoyuki NAKAMURA (laughingman7743)