Flink / FLINK-19740

Error in to_pandas for table containing event time: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.12.0, 1.11.2
    • Fix Version/s: 1.12.0, 1.11.3
    • Environment:

      Ubuntu 18.04

      Python 3.8, jar built from master yesterday.

      Or Python 3.7, installed latest version from pip.

      Description

      In a nutshell, if I create a table with an event time column:

      CREATE TABLE simple_table (
         ts TIMESTAMP(3),
         WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
      )

      then calling .to_pandas() on that table fails while the rows are being serialized. The failure only occurs when the WATERMARK clause is present and the environment is in streaming mode.

      Full code:

      from pyflink.table import EnvironmentSettings, StreamTableEnvironment

      # Streaming mode with the Blink planner; the failure only shows up in streaming mode.
      env_settings = (
          EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
      )
      table_env = StreamTableEnvironment.create(environment_settings=env_settings)

      # The WATERMARK clause turns ts into an event time attribute.
      table_env.execute_sql(
          """
          CREATE TABLE simple_table (
              ts TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
          ) WITH (
              'connector.type' = 'filesystem',
              'format.type' = 'csv',
              'connector.path' = '/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_2.csv'
          )
          """
      )

      # Raises Py4JJavaError (caused by a ClassCastException) on the affected versions.
      print(table_env.from_path("simple_table").to_pandas())
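
      The CSV file read by the table is not attached to the report. To run the reproduction, an input file along the following lines might work. This is only a sketch: the helper name write_sample_csv, the file name simple_table_sample.csv, and the sample date are hypothetical, and the textual timestamp layout (yyyy-MM-dd HH:mm:ss.SSS) is an assumption about what the legacy CSV format accepts, not something stated in the report.

```python
import tempfile
from datetime import datetime, timedelta
from pathlib import Path


def write_sample_csv(path, start, rows=3, step_seconds=1):
    """Write one timestamp per line with millisecond precision.

    Assumption: the legacy CSV format parses 'yyyy-MM-dd HH:mm:ss.SSS';
    the report itself does not show the file contents.
    """
    path = Path(path)
    with path.open("w") as f:
        for i in range(rows):
            ts = start + timedelta(seconds=i * step_seconds)
            # %f yields microseconds (6 digits); drop the last 3 to match TIMESTAMP(3).
            f.write(ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + "\n")
    return path


# Hypothetical location; point 'connector.path' at this file to use it.
sample_path = write_sample_csv(
    Path(tempfile.gettempdir()) / "simple_table_sample.csv",
    start=datetime(2020, 10, 20, 12, 0, 0),
)
print(sample_path.read_text())
```

      Setting 'connector.path' in the DDL to the generated file's path should then give the table at least a few rows to serialize.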

      Output:
       
      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) to constructor java.nio.DirectByteBuffer(long,int)
      WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      Traceback (most recent call last):
        File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", line 20, in <module>
          print(table_env.from_path("simple_table").to_pandas())
        File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 839, in to_pandas
          table = pa.Table.from_batches(serializer.load_from_iterator(batches))
        File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_batches
        File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 76, in load_from_iterator
          reader = pa.ipc.open_stream(
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py", line 146, in open_stream
          return RecordBatchStreamReader(source)
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py", line 62, in __init__
          self._open(source)
        File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
        File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
        File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 69, in readinto
          input = self.leftover or (self.itor.next() if self.itor.hasNext() else None)
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
          return_value = get_return_value(
        File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 147, in deco
          return f(*a, **kw)
        File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
          raise Py4JJavaError(
      py4j.protocol.Py4JJavaError: An error occurred while calling o39.next.
      : java.lang.RuntimeException: Failed to serialize the data of the table
      at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:683)
      at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:663)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
      at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp (java.time.LocalDateTime is in module java.base of loader 'bootstrap'; java.sql.Timestamp is in module java.sql of loader 'platform')
      at org.apache.flink.table.data.util.DataFormatConverters$TimestampConverter.toInternalImpl(DataFormatConverters.java:897)
      at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:381)
      at org.apache.flink.table.data.util.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1426)
      at org.apache.flink.table.data.util.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1414)
      at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:381)
      at org.apache.flink.table.runtime.arrow.ArrowUtils$1.next(ArrowUtils.java:655)
      at org.apache.flink.table.runtime.arrow.ArrowUtils$1.next(ArrowUtils.java:641)
      at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:675)
      ... 12 more

            People

            • Assignee: Huang Xingbo (hxbks2ks)
            • Reporter: Alex Hall (alexmojaki)
            • Votes: 0
            • Watchers: 4
