FLINK-35053: TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.19.0, 1.18.1, jdbc-3.1.2
    • Fix Version/s: None
    • Component/s: Connectors / JDBC
    • Labels: None

    Description

      The JDBC sink for Postgres does not support the TIMESTAMP_LTZ or TIMESTAMP WITH TIME ZONE types.

      Related issues: FLINK-22199, FLINK-20869

      Problem Explanation

      A Postgres table target_table has a field tm_tz of type timestamptz.

      -- Postgres DDL
      CREATE TABLE target_table (
          tm_tz TIMESTAMP WITH TIME ZONE
      )
      

      In Flink we have a table with a column of type TIMESTAMP_LTZ(6), and our goal is to sink it to target_table.

      -- Flink DDL
      CREATE TABLE sink (
          tm_tz TIMESTAMP_LTZ(6)
      ) WITH (
          'connector' = 'jdbc',
          'table-name' = 'target_table'
          ...
      )
      

      According to AbstractPostgresCompatibleDialect.supportedTypes(), TIMESTAMP_WITH_LOCAL_TIME_ZONE is supported, while TIMESTAMP_WITH_TIME_ZONE is not.

      However, when the row converter is created via AbstractJdbcRowConverter.createExternalConverter(), it throws an UnsupportedOperationException, since TIMESTAMP_WITH_LOCAL_TIME_ZONE is not among the types the converter handles, while TIMESTAMP_WITH_TIME_ZONE is.

      Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
      	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
      	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
      	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
      	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
      	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
      	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
      	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
      	at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
      	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
      	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
      	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
      	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
      	at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
      	at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
      	at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
      	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
      	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
      	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
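
      For reference, the exception above can be reproduced with a small standalone job along the following lines. This is only a sketch of the setup described in this report: the class name, connection URL and credentials are placeholders, and it assumes a reachable Postgres instance with target_table created as above plus the JDBC connector and Postgres driver on the classpath.

      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableEnvironment;

      // Minimal reproduction sketch; the class name, URL and credentials are
      // placeholders, not values taken from this report.
      public class TimestampLtzJdbcRepro {
          public static void main(String[] args) throws Exception {
              TableEnvironment tEnv =
                      TableEnvironment.create(EnvironmentSettings.inStreamingMode());

              // Same sink definition as in the DDL above, with concrete options.
              tEnv.executeSql(
                      "CREATE TABLE sink ("
                          + "  tm_tz TIMESTAMP_LTZ(6)"
                          + ") WITH ("
                          + "  'connector' = 'jdbc',"
                          + "  'url' = 'jdbc:postgresql://localhost:5432/postgres',"
                          + "  'table-name' = 'target_table',"
                          + "  'username' = 'postgres',"
                          + "  'password' = 'postgres'"
                          + ")");

              // Executing this INSERT eventually creates the Postgres row converter
              // for the sink schema and fails with
              // UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6).
              tEnv.executeSql(
                      "INSERT INTO sink SELECT CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(6))")
                  .await();
          }
      }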
      

      Using TIMESTAMP WITH TIME ZONE

      Defining tm_tz in Flink as TIMESTAMP(6) WITH TIME ZONE instead of TIMESTAMP_LTZ(6) does not solve the issue either; the DDL fails with the following parse error:

      Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "TIME" at line 1, column 66.
      Was expecting:
          "LOCAL" ...
          
      	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
      	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
      

      Using TIMESTAMP

      Defining tm_tz in Flink as TIMESTAMP(6) can lead to incorrect time zone conversions.

      For instance, assume that the local time zone is GMT+2 and we have a row in Flink with tm_tz equal to '2024-04-01 00:00:00', intended as UTC. When the toTimestamp() method used by AbstractJdbcRowConverter.createExternalConverter() is invoked, it attaches the local time zone to the value instead of "+00".

      Postgres will therefore receive '2024-04-01 00:00:00+02' (instead of +00) and will convert it to '2024-03-31 22:00:00+00'.
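
      The effect can be illustrated outside the connector with plain java.sql.Timestamp, which is the type the internal timestamp value is converted to before being handed to the JDBC driver. The snippet below (class name is illustrative) only demonstrates the described shift and is not connector code.

      import java.sql.Timestamp;
      import java.util.TimeZone;

      // Standalone illustration of the local-zone shift described above.
      public class LocalZoneShiftDemo {
          public static void main(String[] args) {
              // Assume the JVM runs in GMT+2, as in the example above.
              TimeZone.setDefault(TimeZone.getTimeZone("GMT+2"));

              // The wall-clock value '2024-04-01 00:00:00' (meant as UTC) becomes a
              // java.sql.Timestamp that carries no zone information of its own.
              Timestamp ts = Timestamp.valueOf("2024-04-01 00:00:00");

              // Materialized as an instant (effectively what the driver transmits
              // for a timestamptz column), the local +02 offset is applied:
              System.out.println(ts.toInstant()); // 2024-03-31T22:00:00Z
          }
      }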

      Possible Solutions

      1. Make the JDBC connector support TIMESTAMP_LTZ by adding a proper converter case to AbstractJdbcRowConverter.createExternalConverter() (see the sketch after this list).
      2. Fix the behavior of the converter for TIMESTAMP types, so that it either:
        1. forces the UTC time zone (e.g. by appending "+00" to the timestamp), or
        2. strips the time zone information from the timestamp passed to the external system.
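
      For solution 1, the new converter case would essentially have to bind the TIMESTAMP_LTZ value, which internally represents an epoch instant, with an explicit UTC offset. The following is a standalone sketch of that binding using plain JDBC 4.2 (PreparedStatement.setObject with an OffsetDateTime); the class and method names are illustrative, only the target_table/tm_tz names come from this report, and it shows the intended semantics rather than the actual connector code.

      import java.sql.Connection;
      import java.sql.PreparedStatement;
      import java.time.Instant;
      import java.time.ZoneOffset;

      // Sketch of writing an epoch instant to a Postgres timestamptz column
      // without any local-zone shift, via the JDBC 4.2 java.time mapping.
      public final class TimestampLtzBindingSketch {

          static void writeInstant(Connection connection, Instant instant) throws Exception {
              try (PreparedStatement stmt = connection.prepareStatement(
                      "INSERT INTO target_table (tm_tz) VALUES (?)")) {
                  // Binding an OffsetDateTime pins the offset explicitly, so
                  // '2024-04-01T00:00:00Z' is stored as exactly that instant,
                  // regardless of the JVM's default time zone.
                  stmt.setObject(1, instant.atOffset(ZoneOffset.UTC));
                  stmt.executeUpdate();
              }
          }
      }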

      Attachments

        1. createExternalConverter.png (76 kB, Pietro)
        2. Timestamp.png (245 kB, Pietro)
        3. TimestampData.png (45 kB, Pietro)

      People

        Assignee: Unassigned
        Reporter: Pietro (pietro97)
        Votes: 0
        Watchers: 1
