Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects versions: 1.19.0, 1.18.1, jdbc-3.1.2
Description
The JDBC sink for Postgres supports neither the TIMESTAMP_LTZ type nor the TIMESTAMP WITH TIME ZONE type.
Related issues: FLINK-22199, FLINK-20869
Problem Explanation
A Postgres table target_table has a column tm_tz of type timestamptz.
-- Postgres DDL
CREATE TABLE target_table (
  tm_tz TIMESTAMP WITH TIME ZONE
)
In Flink we have a table with a column of type TIMESTAMP_LTZ(6), and our goal is to sink it to target_table.
-- Flink DDL
CREATE TABLE sink (
  tm_tz TIMESTAMP_LTZ(6)
) WITH (
  'connector' = 'jdbc',
  'table-name' = 'target_table'
  ...
)
According to AbstractPostgresCompatibleDialect.supportedTypes(), TIMESTAMP_WITH_LOCAL_TIME_ZONE is supported, while TIMESTAMP_WITH_TIME_ZONE is not.
However, when the converter is created via AbstractJdbcRowConverter.createExternalConverter(), it throws an UnsupportedOperationException, because TIMESTAMP_WITH_LOCAL_TIME_ZONE is not among the types handled there, while TIMESTAMP_WITH_TIME_ZONE is.
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
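The mismatch described above (a type that passes validation but has no converter) can be illustrated with a small, self-contained model. This is only a sketch: the enum, the set, and the method names are hypothetical stand-ins and are not the connector's actual code. It merely mirrors the situation reported here, where the declared-supported list contains TIMESTAMP_WITH_LOCAL_TIME_ZONE while the converter only handles TIMESTAMP_WITH_TIME_ZONE.

```java
import java.util.EnumSet;
import java.util.Set;

public class ConverterGapSketch {
    // Hypothetical stand-in for the three timestamp type roots discussed above.
    enum TypeRoot {
        TIMESTAMP_WITHOUT_TIME_ZONE,
        TIMESTAMP_WITH_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE
    }

    // Models the validation step: LTZ is declared as supported, TZ is not.
    static final Set<TypeRoot> DECLARED_SUPPORTED = EnumSet.of(
            TypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
            TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

    // Models the converter factory: it has a case for TZ but none for LTZ.
    static String createConverter(TypeRoot type) {
        switch (type) {
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_TIME_ZONE:
                return "timestamp-converter";
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    // Returns the types that pass validation but fail at converter creation.
    static Set<TypeRoot> gaps() {
        Set<TypeRoot> result = EnumSet.noneOf(TypeRoot.class);
        for (TypeRoot type : DECLARED_SUPPORTED) {
            try {
                createConverter(type);
            } catch (UnsupportedOperationException e) {
                result.add(type);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the set of declared-but-unconvertible types.
        System.out.println(gaps());
    }
}
```

A check of this shape, run against the real dialect and converter, would likely surface the inconsistency before a job is submitted.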
Using TIMESTAMP WITH TIME ZONE
Defining tm_tz in Flink as TIMESTAMP(6) WITH TIME ZONE instead of TIMESTAMP_LTZ(6) does not solve the issue; the statement fails to parse with the following error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "TIME" at line 1, column 66.
Was expecting:
"LOCAL" ...
at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
Using TIMESTAMP
Defining tm_tz in Flink as TIMESTAMP(6) can lead to incorrect time zone conversions.
For instance, assume that the local time zone is GMT+2 and we have a row in Flink with tm_tz equal to '2024-04-01 00:00:00' (UTC). When the toTimestamp() method used by AbstractJdbcRowConverter.createExternalConverter() is invoked, it attaches the local time zone to the value instead of "+00".
Postgres will therefore receive '2024-04-01 00:00:00+02' (instead of +00) and will convert it to '2024-03-31 22:00:00+00'.
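The two-hour shift can be reproduced with java.time alone. The sketch below assumes that the conversion ultimately interprets a zone-less wall-clock value in the JVM default zone (as java.sql.Timestamp.valueOf(LocalDateTime) does); the class and method names are hypothetical and the zone is passed explicitly so the result does not depend on the machine running it.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class LocalZoneShiftSketch {
    // Interprets a zone-less wall-clock value in the given zone. Passing the
    // JVM default zone here models the implicit behaviour described above.
    static Instant interpretIn(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2024, 4, 1, 0, 0, 0);

        // Intended meaning: the value is already in UTC.
        System.out.println(interpretIn(value, ZoneOffset.UTC));        // 2024-04-01T00:00:00Z

        // What happens when the local zone (GMT+2) is attached instead.
        System.out.println(interpretIn(value, ZoneOffset.ofHours(2))); // 2024-03-31T22:00:00Z
    }
}
```

The second instant is the one Postgres ends up storing in the scenario above, two hours earlier than intended.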
Possible Solutions
- Make the JDBC connector support TIMESTAMP_LTZ by adding a proper converter to AbstractJdbcRowConverter.createExternalConverter().
- Fix the behavior of the converter for TIMESTAMP types, so that it either:
  - forces the UTC time zone (for example, by appending "+00" to timestamps), or
  - removes time zone information from the timestamp passed to the external system.
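As a rough illustration of the first option, a TIMESTAMP_LTZ value is essentially an instant, so a converter could turn it into a value that carries an explicit "+00" offset before handing it to the driver. The sketch below is not connector code: the class name, the helper name, and its parameters (epoch milliseconds plus nanoseconds within the millisecond) are assumptions chosen for illustration.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class LtzConverterSketch {
    // Hypothetical helper: turns an epoch-based value (milliseconds plus the
    // nanoseconds within that millisecond) into a value with an explicit
    // +00 offset, independent of the JVM default time zone.
    static OffsetDateTime toUtcOffsetDateTime(long epochMillis, int nanoOfMillisecond) {
        return Instant.ofEpochMilli(epochMillis)
                .plusNanos(nanoOfMillisecond)
                .atOffset(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long epochMillis = Instant.parse("2024-04-01T00:00:00Z").toEpochMilli();
        OffsetDateTime value = toUtcOffsetDateTime(epochMillis, 0);
        // The offset is always UTC, whatever the local zone of the machine is.
        System.out.println(value.getOffset());
        System.out.println(value.toInstant());
    }
}
```

With a JDBC 4.2 driver, such a value could presumably be bound with PreparedStatement.setObject(index, value); whether the Postgres driver version in use accepts OffsetDateTime for timestamptz columns is an assumption that would need to be verified.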
Attachments
Issue Links
- is related to
  FLINK-22199 The JDBC connector does not support TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL. (Open)