  1. Flink
  2. FLINK-35273

PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      This issue comes from the Slack thread at https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
      When TIMESTAMP_LTZ is used in PyFlink and a session time zone is set with set_local_timezone, the printed timestamps do not match the expected instants.
      Here is my test code:

      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.common import Types, Configuration
      from pyflink.table import DataTypes, StreamTableEnvironment
      from datetime import datetime
      import pytz
      
      
      config = Configuration()
      config.set_string("python.client.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
      config.set_string("python.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
      env = StreamExecutionEnvironment.get_execution_environment(config)
      t_env = StreamTableEnvironment.create(env)
      t_env.get_config().set_local_timezone("UTC")
      # t_env.get_config().set_local_timezone("GMT-08:00")
      
      input_table = t_env.from_elements(
          [
              (
                  "elementA",
                  datetime(year=2024, month=4, day=12, hour=8, minute=35),
              ),
              (
                  "elementB",
                  datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.utc),
                  # datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.timezone('America/New_York')),
              ),
          ],
          DataTypes.ROW(
              [
                  DataTypes.FIELD("name", DataTypes.STRING()),
                  DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
              ]
          ),
      )
      input_table.execute().print()
      
      # SQL
      sql_result = t_env.execute_sql("CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(1712910900000, 3);")
      t_env.execute_sql("CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
      t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
      

      The output is:

      +----+--------------------------------+-------------------------+
      | op |                           name |               timestamp |
      +----+--------------------------------+-------------------------+
      | +I |                       elementA | 2024-04-12 08:35:00.000 |
      | +I |                       elementB | 2024-04-12 16:35:00.000 |
      +----+--------------------------------+-------------------------+
      2 rows in set
      +I[2024-04-12T08:35:00Z]
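
To make the discrepancy concrete, here is a small standard-library check of the values above. It is only a sketch: `millis_to_utc` is a helper name introduced for illustration, and it assumes the table output is rendered in the session zone configured in the test code (UTC).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


# Literal passed to TO_TIMESTAMP_LTZ in the SQL part of the test code.
sql_instant = millis_to_utc(1712910900000)

# elementB as constructed in the test code (pytz.utc and timezone.utc
# denote the same zone for this comparison).
element_b = datetime(2024, 4, 12, 8, 35, tzinfo=timezone.utc)

# elementB as printed by input_table.execute().print(), read as UTC.
printed_b = datetime(2024, 4, 12, 16, 35, tzinfo=timezone.utc)

print(sql_instant.isoformat())   # 2024-04-12T08:35:00+00:00
print(sql_instant == element_b)  # True
print(printed_b - element_b)     # 8:00:00
```

The SQL path prints the instant that was requested, while the Python path shifts the timezone-aware elementB by eight hours.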
      

      In pyflink/table/types.py, the `LocalZonedTimestampType` class uses the following logic to convert a Python object to its SQL representation:

      EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
      ...
      def to_sql_type(self, dt):
          if dt is not None:
              seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                         else time.mktime(dt.timetuple()))
              return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
      

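      This shows that EPOCH_ORDINAL is computed once, when the Python VM (PVM) loads the class, from the operating system's local time zone, and that naive datetimes go through time.mktime, which also uses the system zone. The time zone set by `set_local_timezone` is never consulted.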
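
The arithmetic can be replayed without PyFlink. The sketch below rests on an assumption that the report does not state: that the client machine runs at UTC+8. Under that assumption the quoted logic reproduces both printed rows. All function names are introduced for illustration only, and `session_aware_micros` is a hypothetical alternative for comparison, not a proposed patch.

```python
import calendar
from datetime import datetime, timedelta, timezone

# Assumption (not stated in the report): the client machine runs at UTC+8.
ASSUMED_SYSTEM_OFFSET = timedelta(hours=8)
UTC_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ordinal_micros(system_offset):
    """Emulate calendar.timegm(time.localtime(0)) * 10 ** 6 for a fixed offset."""
    local_epoch = datetime(1970, 1, 1) + system_offset
    return calendar.timegm(local_epoch.timetuple()) * 10 ** 6


def simulated_to_sql_type(dt, system_offset):
    """Replay the quoted to_sql_type logic without touching the process zone."""
    if dt.tzinfo:
        seconds = calendar.timegm(dt.utctimetuple())
    else:
        # time.mktime reads a naive datetime as system-local wall-clock time.
        seconds = calendar.timegm(dt.timetuple()) - int(system_offset.total_seconds())
    return int(seconds) * 10 ** 6 + dt.microsecond + epoch_ordinal_micros(system_offset)


def session_aware_micros(dt, session_tz):
    """Hypothetical alternative: read naive datetimes in the session zone."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=session_tz)
    return (dt - UTC_EPOCH) // timedelta(microseconds=1)


def render_utc(micros):
    """Format epoch microseconds as a UTC wall-clock string."""
    return (UTC_EPOCH + timedelta(microseconds=micros)).strftime("%Y-%m-%d %H:%M:%S")


element_a = datetime(2024, 4, 12, 8, 35)                       # naive
element_b = datetime(2024, 4, 12, 8, 35, tzinfo=timezone.utc)  # aware

for name, dt in (("elementA", element_a), ("elementB", element_b)):
    current = render_utc(simulated_to_sql_type(dt, ASSUMED_SYSTEM_OFFSET))
    alternative = render_utc(session_aware_micros(dt, timezone.utc))
    print(name, "| quoted logic:", current, "| session-aware:", alternative)
```

With the assumed offset, the quoted logic yields 08:35:00 for the naive elementA and 16:35:00 for the aware elementB, matching the table output. For the naive value, the system offset subtracted by time.mktime and the one added by EPOCH_ORDINAL cancel out. For the aware value, only EPOCH_ORDINAL is applied, which produces the eight-hour shift.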

          People

            Assignee: Unassigned
            Reporter: Biao Geng (bgeng777)
            Votes: 1
            Watchers: 2
