FLINK-28528

Table.getSchema fails on table with watermark

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.15.1
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

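      The bug can be reproduced with the following test, which defines the watermark with a Table API expression. The test passes if the watermark is instead defined with the SQL expression string shown in the commented-out line.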

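          # Imports the test relies on (not shown in the original report).
          from pyflink.datastream import StreamExecutionEnvironment
          from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor
          from pyflink.table import expressions as expr

          def test_flink_2(self):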
              env = StreamExecutionEnvironment.get_execution_environment()
              t_env = StreamTableEnvironment.create(env)
              table = t_env.from_descriptor(
                  TableDescriptor.for_connector("filesystem")
                  .schema(
                      Schema.new_builder()
                      .column("name", DataTypes.STRING())
                      .column("cost", DataTypes.INT())
                      .column("distance", DataTypes.INT())
                      .column("time", DataTypes.TIMESTAMP(3))
                      .watermark("time", expr.col("time") - expr.lit(60).seconds)
                      # .watermark("time", "`time` - INTERVAL '60' SECOND")
                      .build()
                  )
                  .format("csv")
                  .option("path", "./input.csv")
                  .build()
              )
      
              print(table.get_schema())
      

      The call to table.get_schema() fails with the following exception:

      E       pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 60000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
      E       	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
      E       	at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
      E       	at java.util.Collections$SingletonList.forEach(Collections.java:4824)
      E       	at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
      E       	at org.apache.flink.table.api.Table.getSchema(Table.java:101)
      E       	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      E       	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      E       	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      E       	at java.lang.reflect.Method.invoke(Method.java:498)
      E       	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      E       	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      E       	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
      E       	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      E       	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      E       	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      E       	at java.lang.Thread.run(Thread.java:748)
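
      A note on reading the message: the `60000` in `minus(time, 60000)` appears to be the 60-second interval from `expr.lit(60).seconds` expressed in milliseconds. The following is a minimal plain-Python sketch of that arithmetic only. It does not call Flink, and the helper name `interval_millis` is hypothetical.

```python
from datetime import timedelta


def interval_millis(seconds: int) -> int:
    """Return the length of a `seconds`-long interval in whole milliseconds."""
    # Dividing one timedelta by another with // yields an int.
    return timedelta(seconds=seconds) // timedelta(milliseconds=1)


# The watermark in the test subtracts a 60-second interval from `time`.
watermark_delay_ms = interval_millis(60)
print(watermark_delay_ms)
```

      Under that reading, both watermark definitions in the test describe the same 60-second delay; they differ only in whether the expression originated from SQL, which is what the exception says determines string serializability.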
      

          People

            Assignee: Xingbo Huang (hxb)
            Reporter: Xuannan Su (xuannan)
            Votes: 0
            Watchers: 3