Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
The issue is from https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
When using TIMESTAMP_LTZ in PyFlink after setting a different time zone, the output does not match the expected result.
Here is my test code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz

config = Configuration()
config.set_string("python.client.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")

input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.utc),
            # datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.timezone('America/New_York')),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
        ]
    ),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql("CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(1712910900000, 3);")
t_env.execute_sql("CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
The output is:
+----+--------------------------------+-------------------------+
| op |                           name |               timestamp |
+----+--------------------------------+-------------------------+
| +I |                       elementA | 2024-04-12 08:35:00.000 |
| +I |                       elementB | 2024-04-12 16:35:00.000 |
+----+--------------------------------+-------------------------+
2 rows in set
+I[2024-04-12T08:35:00Z]
In pyflink/table/types.py, the `LocalZonedTimestampType` class uses the following logic to convert a Python object to the SQL type:
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
This shows that EPOCH_ORDINAL is computed once, when the Python VM (PVM) starts, from the local time zone of the Python process; it is not affected by the time zone configured through `set_local_timezone`.
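The arithmetic can be checked outside Flink with a minimal standalone sketch. It copies the quoted conversion logic into a plain function and makes two assumptions that are not stated in the report: the Python process runs in a UTC+8 zone (a guess that happens to match the 8 hour shift in the output above), and `datetime.timezone.utc` stands in for `pytz.utc`. The helper `as_utc` is hypothetical and only used for display. `time.tzset()` is available on Unix only.

```python
import calendar
import os
import time
from datetime import datetime, timezone

# Assumption: pretend the Python process runs in a UTC+8 zone.
# POSIX TZ strings invert the sign, so "CST-8" means UTC+8.
os.environ["TZ"] = "CST-8"
time.tzset()  # Unix only

# Same expression as the class attribute quoted above; it depends only on
# the process time zone, not on any Flink configuration.
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6


def to_sql_type(dt):
    """Standalone copy of the quoted conversion logic (returns microseconds)."""
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + EPOCH_ORDINAL


def as_utc(micros):
    """Hypothetical helper: render microseconds since the epoch as UTC text."""
    dt = datetime.fromtimestamp(micros // 10 ** 6, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


naive = datetime(2024, 4, 12, 8, 35)                       # like elementA
aware = datetime(2024, 4, 12, 8, 35, tzinfo=timezone.utc)  # like elementB

naive_utc = as_utc(to_sql_type(naive))
aware_utc = as_utc(to_sql_type(aware))

print(EPOCH_ORDINAL)  # 28800000000 (8 hours in microseconds)
print(naive_utc)      # 2024-04-12 08:35:00
print(aware_utc)      # 2024-04-12 16:35:00
```

Under these assumptions, the naive value is unchanged because `time.mktime` subtracts the local offset and EPOCH_ORDINAL adds it back, while the time-zone-aware value is shifted by the full local offset. Neither step consults the time zone passed to `set_local_timezone`.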