FLINK-20306

Accessing a versioned table as of time fails with a cryptic message


Details

    Description

      I tried running a query on a versioned table:

      CREATE TABLE RatesHistory (
          currency_time TIMESTAMP(3) METADATA FROM 'timestamp',
          currency STRING,
          rate DECIMAL(38, 10),
          WATERMARK FOR currency_time AS currency_time   -- defines the event time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'rates',
        'scan.startup.mode' = 'earliest-offset',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'                                -- this is an append only source
      );
      
      SELECT * FROM RatesHistory FOR SYSTEM_TIME AS OF TIMESTAMP '2020-11-11 13:12:13';
      

      I understand that this kind of query might not be supported yet, but the exception I got is not very helpful:

      org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
      Missing conversion is FlinkLogicalSnapshot[convention: LOGICAL -> STREAM_PHYSICAL]
      There is 1 empty subset: rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
      977:FlinkLogicalSnapshot(period=[2020-11-11 13:12:13])
        975:FlinkLogicalCalc(subset=[rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate])
          962:FlinkLogicalTableSourceScan(subset=[rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[currency, rate, timestamp])
      
      Root: rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
      Original rel:
      FlinkLogicalLegacySink(subset=[rel#117:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], name=[`default_catalog`.`default_database`.`_tmp_table_1885690557`], fields=[currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 127
        FlinkLogicalCalc(subset=[rel#126:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(currency_time) AS currency_time, currency, CAST(rate) AS rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 129
          FlinkLogicalWatermarkAssigner(subset=[rel#124:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], rowtime=[currency_time], watermark=[$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 123
            FlinkLogicalCalc(subset=[rel#122:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(timestamp) AS currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 128
              FlinkLogicalTableSourceScan(subset=[rel#120:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, timestamp]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 119
      
      Sets:
      Set#42, type: RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)
      	rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#962
      		rel#962:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
      	rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#983
      		rel#983:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
      Set#43, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
      	rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#975
      		rel#975:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
      	rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#985
      		rel#985:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
      Set#44, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
      	rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#977
      		rel#977:FlinkLogicalSnapshot.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#976,period=2020-11-11 13:12:13), rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
      	rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
      Set#45, type: RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)
      	rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#979
      		rel#979:FlinkLogicalLegacySink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
      	rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
      		rel#982:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
      		rel#988:StreamExecLegacySink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={inf}
      
      Graphviz:
      digraph G {
      	root [style=filled,label="Root"];
      	subgraph cluster42{
      		label="Set 42 RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)";
      		rel962 [label="rel#962:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel983 [label="rel#983:StreamExecTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset974 [label="rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]"]
      		subset984 [label="rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
      	}
      	subgraph cluster43{
      		label="Set 43 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
      		rel975 [label="rel#975:FlinkLogicalCalc\ninput=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel985 [label="rel#985:StreamExecCalc\ninput=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset976 [label="rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]"]
      		subset986 [label="rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
      	}
      	subgraph cluster44{
      		label="Set 44 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
      		rel977 [label="rel#977:FlinkLogicalSnapshot\ninput=RelSubset#976,period=2020-11-11 13:12:13\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		subset978 [label="rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE]"]
      		subset987 [label="rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]",color=red]
      	}
      	subgraph cluster45{
      		label="Set 45 RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)";
      		rel979 [label="rel#979:FlinkLogicalLegacySink\ninput=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
      		rel982 [label="rel#982:AbstractConverter\ninput=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
      		rel988 [label="rel#988:StreamExecLegacySink\ninput=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={inf}",shape=box]
      		subset980 [label="rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE]"]
      		subset981 [label="rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
      	}
      	root -> subset981;
      	subset974 -> rel962[color=blue];
      	subset984 -> rel983[color=blue];
      	subset976 -> rel975[color=blue]; rel975 -> subset974[color=blue];
      	subset986 -> rel985[color=blue]; rel985 -> subset984[color=blue];
      	subset978 -> rel977[color=blue]; rel977 -> subset976[color=blue];
      	subset980 -> rel979[color=blue]; rel979 -> subset978[color=blue];
      	subset981 -> rel982; rel982 -> subset980;
      	subset981 -> rel988; rel988 -> subset987;
      }
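
For comparison, Flink documents FOR SYSTEM_TIME AS OF for temporal joins, where the time expression is an event-time attribute of the probe side rather than a constant timestamp. The following is a minimal sketch of that form, not a fix for the error above. The Orders table, its topic name, and the versioned_rates view are hypothetical and not part of the original report. The view uses the deduplication pattern to derive a versioned view from the append-only RatesHistory table.

```sql
-- Hypothetical probe-side table; names and connector options are assumptions.
CREATE TABLE Orders (
    order_id STRING,
    currency STRING,
    amount DECIMAL(38, 10),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time          -- event time of the probe side
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                               -- assumed topic name
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Hypothetical versioned view over the append-only source:
-- keeps the latest row per currency, ordered by the event-time attribute.
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY currency
                              ORDER BY currency_time DESC) AS rowNum
    FROM RatesHistory
)
WHERE rowNum = 1;

-- Event-time temporal join: each order sees the rate valid at its order_time.
SELECT o.order_id, o.order_time, o.amount * r.rate AS converted_amount
FROM Orders AS o
LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
```

Whether this plans successfully depends on the Flink version and planner in use. It is shown only to contrast the join form with the constant-timestamp query in the report.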
      

            People

              Jark Wu (jark)
              Dawid Wysakowicz (dwysakowicz)