Status: Closed
Resolution: Fixed
This ticket describes how to verify FLINK-33397: Support Configuring Different State TTLs using SQL Hint.
The verification steps are as follows.
1. Start the standalone session cluster and sql client.
2. Execute the following DDL statements.
CREATE TABLE `default_catalog`.`default_database`.`Orders` ( `order_id` INT, `line_order_id` INT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE `default_catalog`.`default_database`.`LineOrders` ( `line_order_id` INT, `ship_mode` STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE `default_catalog`.`default_database`.`OrdersShipInfo` ( `order_id` INT, `line_order_id` INT, `ship_mode` STRING ) WITH ( 'connector' = 'print' );
3. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied to join
-- SET the pipeline level state TTL to 24h SET 'table.exec.state.ttl' = '24h'; -- Configure different state TTL for join operator COMPILE PLAN '/path/to/join-plan.json' FOR INSERT INTO OrdersShipInfo SELECT /*+STATE_TTL('a' = '2d', 'b' = '12h')*/ a.order_id, a.line_order_id, b.ship_mode FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id;
The generated JSON file should contain the following "state" JSON array for StreamJoin ExecNode.
{ "id" : 5, "type" : "stream-exec-join_1", "joinSpec" : { ... }, "state" : [ { "index" : 0, "ttl" : "2 d", "name" : "leftState" }, { "index" : 1, "ttl" : "12 h", "name" : "rightState" } ], "inputProperties": [...], "outputType": ..., "description": ... }
4. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied to group aggregate
CREARE TABLE source_t ( a INT, b BIGINT, c STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREARE TABLE sink_t ( b BIGINT PRIMARY KEY NOT ENFORCED, cnt BIGINT, avg_a DOUBLE, min_c STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); COMPILE PLAN '/path/to/agg-plan.json' FOR INSERT INTO sink_t SELECT /*+ STATE_TTL('source_t' = '1s') */ b, COUNT(*) AS cnt, AVG(a) FILTER (WHERE a > 1) AS avg_a MIN(c) AS min_c FROM source_t GROUP BY b
The generated JSON file should contain the following "state" JSON array for StreamExecGroupAggregate ExecNode.
"state" : [ { "index" : 0, "ttl" : "1 s", "name" : "groupAggregateState" } ]
Issue Links
- is a clone of
FLINK-34297 Release Testing Instructions: Verify FLINK-34079 Migrate string configuration key to ConfigOption
- Closed
- is cloned by
FLINK-34299 Release Testing Instructions: Verify FLINK-33203 Adding a separate configuration for specifying Java Options of the SQL Gateway
- Closed
FLINK-34300 Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
- Closed