Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34285 [Umbrella] Test Flink Release 1.19
  3. FLINK-34298

Release Testing Instructions: Verify FLINK-33397 Support Configuring Different State TTLs using SQL Hint

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.19.0
    • 1.19.0
    • Table SQL / API
    • None

    Description

      This ticket describes how to verify FLINK-33397: Support Configuring Different State TTLs using SQL Hint.

       

      The verification steps are as follows.
      1. Start the standalone session cluster and sql client.
      2. Execute the following DDL statements.

      CREATE TABLE `default_catalog`.`default_database`.`Orders` (
        `order_id` INT,
        `line_order_id` INT
      ) WITH (
        'connector' = 'datagen', 
        'rows-per-second' = '5'
      
      ); 
      
      CREATE TABLE `default_catalog`.`default_database`.`LineOrders` (
        `line_order_id` INT,
        `ship_mode` STRING
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5'
      );
      
      CREATE TABLE `default_catalog`.`default_database`.`OrdersShipInfo` (
        `order_id` INT,
        `line_order_id` INT,
        `ship_mode` STRING ) WITH (
        'connector' = 'print'
      ); 

      3. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied to join

      -- SET the pipeline level state TTL to 24h
      SET 'table.exec.state.ttl' = '24h';
      
      -- Configure different state TTL for join operator
      COMPILE PLAN '/path/to/join-plan.json' FOR
      INSERT INTO OrdersShipInfo 
      SELECT /*+STATE_TTL('a' = '2d', 'b' = '12h')*/ a.order_id, a.line_order_id, b.ship_mode 
      FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id;
      
      

      The generated JSON file should contain the following "state" JSON array for StreamJoin ExecNode.

      {
          "id" : 5,
          "type" : "stream-exec-join_1",
          "joinSpec" : {
            ...
          },
          "state" : [ {
            "index" : 0,
            "ttl" : "2 d",
            "name" : "leftState"
          }, {
            "index" : 1,
            "ttl" : "12 h",
            "name" : "rightState"
          } ],
          "inputProperties": [...],
          "outputType": ...,
          "description": ...
      }
      

      4. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied to group aggregate

      CREARE TABLE source_t (
          a INT,
          b BIGINT,
          c STRING
      ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '5'
      );
      
      CREARE TABLE sink_t (
          b BIGINT PRIMARY KEY NOT ENFORCED,
          cnt BIGINT,
          avg_a DOUBLE,
          min_c STRING
      ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '5'
      );
      COMPILE PLAN '/path/to/agg-plan.json' FOR
      INSERT INTO sink_t SELECT /*+ STATE_TTL('source_t' = '1s') */
      b, 
      COUNT(*) AS cnt,
      AVG(a) FILTER (WHERE a > 1) AS avg_a
      MIN(c) AS min_c 
      FROM source_t GROUP BY b
      
      

       
      The generated JSON file should contain the following "state" JSON array for StreamExecGroupAggregate ExecNode.

      "state" : [ {
        "index" : 0,
        "ttl" : "1 s",
        "name" : "groupAggregateState"
      } ]
      
      

      Attachments

        Issue Links

          Activity

            People

              qingyue Jane Chan
              lincoln.86xy lincoln lee
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: