Flink / FLINK-35290

Wrong Instant type conversion TableAPI to Datastream in thread mode


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.18.1
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a DataStream, the values arrive as `pyflink.common.time.Instant` objects. First of all, I'm wondering whether this is expected behavior, since in the Table API `TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the DataStream API?

      Moreover, if we switch the execution mode from `process` to `thread`, the `TIMESTAMP_LTZ(3)` values are mapped to `pemja.PyJObject` (which wraps a `java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if I only use the DataStream API and read `Types.INSTANT()` directly, the conversion seems to work fine in both `thread` and `process` mode.

      Below is a minimal example that reproduces the bug:

      ```
      from datetime import datetime

      from pyflink.common import Configuration, Row, Types
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.table import DataTypes, StreamTableEnvironment

      EXECUTION_MODE = "thread"  # or "process"
      config = Configuration()
      config.set_string("python.execution-mode", EXECUTION_MODE)

      env = StreamExecutionEnvironment.get_execution_environment(config)
      t_env = StreamTableEnvironment.create(env)
      t_env.get_config().set("parallelism.default", "1")
      t_env.get_config().set("python.fn-execution.bundle.size", "1")
      t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


      def to_epoch_ms(row: Row):
          # Process mode prints pyflink.common.time.Instant;
          # thread mode prints pemja.PyJObject (wrapping java.time.Instant).
          print(type(row[1]))
          return row[1].to_epoch_milli()


      t_env.to_data_stream(
          t_env.from_elements(
              [
                  (1, datetime(year=2024, day=10, month=9, hour=9)),
                  (2, datetime(year=2024, day=10, month=9, hour=12)),
                  (3, datetime(year=2024, day=22, month=11, hour=12)),
              ],
              DataTypes.ROW(
                  [
                      DataTypes.FIELD("id", DataTypes.INT()),
                      DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
                  ]
              ),
          )
      ).map(to_epoch_ms, output_type=Types.LONG()).print()
      env.execute()
      ```
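Until the type is consistent across execution modes, one possible workaround is to normalize the timestamp inside the mapper instead of calling `to_epoch_milli()` directly. The sketch below uses a hypothetical helper, `to_epoch_millis` (not a PyFlink API), that duck-types the three representations described above. It assumes that a naive `datetime` should be read as UTC and that a `pemja.PyJObject` wrapping `java.time.Instant` forwards `toEpochMilli()` by name; neither assumption is confirmed by this issue.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware datetime, used for exact millisecond arithmetic.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value) -> int:
    """Hypothetical helper: epoch milliseconds for a TIMESTAMP_LTZ value.

    Accepts pyflink.common.time.Instant, datetime, or (assumed) a pemja
    proxy around java.time.Instant.
    """
    # pyflink.common.time.Instant (seen in process mode) has to_epoch_milli().
    if hasattr(value, "to_epoch_milli"):
        return int(value.to_epoch_milli())
    # datetime, as the Table API uses for TIMESTAMP_LTZ.
    # Assumption: a naive datetime is interpreted as UTC.
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        # timedelta // timedelta yields an exact integer count of milliseconds.
        return (value - _EPOCH) // timedelta(milliseconds=1)
    # Assumption: a pemja.PyJObject wrapping java.time.Instant (thread mode)
    # exposes the Java method toEpochMilli() as an attribute.
    if hasattr(value, "toEpochMilli"):
        return int(value.toEpochMilli())
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")
```

In the reproducer, `to_epoch_ms` would then return `to_epoch_millis(row[1])`, which avoids depending on which wrapper type the current execution mode delivers.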


            People

              Assignee: Unassigned
              Reporter: Wouter Zorgdrager (wzorgdrager)
              Votes: 0
              Watchers: 1
