FLINK-16345

Computed column cannot refer to a time attribute column


    Description

      If a computed column refers to a time attribute column, the computed column loses the time attribute, which causes validation to fail.

      CREATE TABLE orders (
        order_id STRING,
        order_time TIMESTAMP(3),
        amount DOUBLE,
        amount_kg as amount * 1000,
        -- standard_ts is computed from order_time, the column used for the WATERMARK, and cannot be selected
        standard_ts as order_time + INTERVAL '8' HOUR,
        WATERMARK FOR order_time AS order_time
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = '0.10',
        'connector.topic' = 'flink_orders',
        'connector.properties.zookeeper.connect' = 'localhost:2181',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'connector.properties.group.id' = 'testGroup',
        'connector.startup-mode' = 'earliest-offset',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
      );
      
      

      The query `select amount_kg from orders` runs normally, but the query `select standard_ts from orders` throws the following validation exception:

      [ERROR] Could not execute SQL statement. Reason:
       java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
       validated type:
       RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) ts) NOT NULL
       converted type:
       RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME ATTRIBUTE(ROWTIME) ts) NOT NULL
       rel:
       LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], ts=[$4])
         LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
           LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
             LogicalTableScan(table=[[default_catalog, default_database, orders, source: [Kafka010TableSource(order_id, order_time, amount)]]])
       

       

       

          People

            Jark Wu (jark)
            Leonard Xu (leonard)

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 40m