Flink / FLINK-35602 [Umbrella] Test Flink Release 1.20 / FLINK-35609

Release Testing Instructions: Verify FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

    Description

      Follow-up test for https://issues.apache.org/jira/browse/FLINK-35187

      Materialized Table depends on both FLIP-435 and FLIP-448 to complete the end-to-end process, so this release test covers the two features together.
      Because Materialized Table relies on the CatalogStore, Catalog, Workflow Scheduler, SQL Client, SQL Gateway, and a Standalone cluster to run the whole process, validation consists of two parts: Environment Setup and Feature Verification.

      Environment Setup:

      1. Create the File CatalogStore directory.
      2. Create the directory for the test-filesystem Catalog and put flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
      3. Create the Savepoint directory.
      4. Configure the Flink config.yaml file:

      #==============================================================================
      # Common
      #==============================================================================
      
      jobmanager: 
        bind-host: localhost
        rpc: 
          address: localhost
          # The RPC port where the JobManager is reachable.
          port: 6123
        memory: 
          process: 
            size: 1600m
        execution: 
          failover-strategy: region
      
      taskmanager: 
        bind-host: localhost
        host: localhost
        # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
        numberOfTaskSlots: 3
        memory: 
          process: 
            size: 1728m
      
      parallelism: 
        # The parallelism used for programs that did not specify any other parallelism.
        default: 1
      
      #==============================================================================
      # Rest & web frontend
      #==============================================================================
      
      rest: 
        # The address that the REST client connects to.
        address: localhost
        bind-address: localhost
      
      # Catalog Store
      table: 
        catalog-store: 
          kind: file
          file: 
            path: xxx
      
      # Embedded Scheduler config
      workflow-scheduler: 
        type: embedded
      
      # SQL Gateway address
      sql-gateway: 
        endpoint: 
          rest: 
            address: 127.0.0.1
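
Steps 1 to 3 above only need a few directories. A minimal shell sketch, assuming a hypothetical base directory held in MT_TEST_HOME (not part of the original instructions) and assuming the test-filesystem Catalog expects the default database directory mydb to exist before the catalog is registered:

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical base directory for this test run; export MT_TEST_HOME to choose your own.
MT_TEST_HOME="${MT_TEST_HOME:-$(mktemp -d)}"

CATALOG_STORE_DIR="$MT_TEST_HOME/catalog-store"   # step 1: File CatalogStore directory
CATALOG_DIR="$MT_TEST_HOME/catalog"               # step 2: test-filesystem Catalog directory
SAVEPOINT_DIR="$MT_TEST_HOME/savepoints"          # step 3: Savepoint directory

# Assumption: the default database directory (mydb) must already exist under the catalog path.
mkdir -p "$CATALOG_STORE_DIR" "$CATALOG_DIR/mydb" "$SAVEPOINT_DIR"

# Print the locations so they can be copied into config.yaml and the CREATE CATALOG statement.
printf 'catalog store: %s\ncatalog path:  %s\nsavepoints:    %s\n' \
  "$CATALOG_STORE_DIR" "$CATALOG_DIR" "$SAVEPOINT_DIR"
```

The printed catalog store path is the kind of value that stands behind the placeholder under table.catalog-store.file.path, and the catalog path is the kind of value passed as 'path' when the catalog is registered.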
      

      5. Start the Standalone cluster: ./bin/start-cluster.sh
      6. Start the SQL Gateway: ./bin/sql-gateway.sh start
      7. Start the SQL Client: ./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
      8. Register the test-filesystem Catalog:

      CREATE CATALOG mt_cat
      WITH (
        'type' = 'test-filesystem',
        'path' = '...',
        'default-database' = 'mydb'  
      );
      
      USE CATALOG mt_cat;
      

      9. Create the test-filesystem source table and insert the data

      -- 1. create json format table
      CREATE TABLE json_source (
        order_id BIGINT,
        user_id BIGINT,
        user_name STRING,
        order_created_at STRING,
        payment_amount_cents BIGINT
      ) WITH (
        'format' = 'json',
        'source.monitor-interval' = '5S'
      );
      
      -- 2. insert data
      INSERT INTO mt_cat.mydb.json_source VALUES
      (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
      (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
      (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
      (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
      (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
      (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
      (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
      (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
      
      INSERT INTO mt_cat.mydb.json_source VALUES
      (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
      (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
      (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
      (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
      (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
      (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
      (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
      (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
      

      Feature Verification:

      Continuous Mode

      In Continuous Mode, a Materialized Table runs a Flink streaming job that updates the data in real time. Feature verification covers the create, suspend, resume, manual refresh, and drop scenarios.

      1. Create the Materialized Table. Cover both good and bad cases, and execute the following statement in the SQL Client:

      CREATE MATERIALIZED TABLE continuous_users_shops 
      (
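        -- NOTE: id is not among the columns selected below (user_id, ds, payed_buy_fee_sum, pv),
        -- so this constraint suits a bad case; drop the whole parenthesized clause for a good case.
        PRIMARY KEY(id) NOT ENFORCED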
      )
      WITH(
        'format' = 'debezium-json'
      )
      FRESHNESS = INTERVAL '30' SECOND
      AS SELECT 
        user_id,
        ds,
        SUM (payment_amount_cents) AS payed_buy_fee_sum,
        SUM (1) AS pv
      FROM (
          SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM json_source ) AS tmp
       GROUP BY (user_id, ds);
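
To know what the aggregation in the AS SELECT query should produce once both INSERT statements have been processed, the grouping can be reproduced offline. The following is a sketch only: it copies the three columns the query reads from the test rows above and approximates DATE_FORMAT(order_created_at, 'yyyy-MM-dd') by taking the first ten characters of the string, which is an assumption that holds for these particular values.

```shell
#!/usr/bin/env bash
set -euo pipefail

# user_id|order_created_at|payment_amount_cents, copied from the two INSERT statements.
rows='1|2024-06-24 10:00:00|10
1|2024-06-24 10:01:00|20
2|2024-06-24 10:02:00|30
2|2024-06-24 10:03:00|40
1|2024-06-25 10:00:00|10
1|2024-06-25 10:01:00|20
2|2024-06-25 10:02:00|30
2|2024-06-25 10:03:00|40
1|2024-06-26 10:00:00|10
1|2024-06-26 10:01:00|20
2|2024-06-26 10:02:00|30
2|2024-06-26 10:03:00|40
1|2024-06-27 10:00:00|10
1|2024-06-27 10:01:00|20
2|2024-06-27 10:02:00|30
2|2024-06-27 10:03:00|40'

# Group by (user_id, ds) and compute SUM(payment_amount_cents) and SUM(1).
expected=$(printf '%s\n' "$rows" | awk -F'|' '
  {
    ds = substr($2, 1, 10)   # stand-in for DATE_FORMAT(order_created_at, "yyyy-MM-dd")
    key = $1 "," ds
    fee[key] += $3           # payed_buy_fee_sum
    pv[key]  += 1            # pv
  }
  END {
    for (k in fee) print k "," fee[k] "," pv[k]
  }' | LC_ALL=C sort)

echo "user_id,ds,payed_buy_fee_sum,pv"
echo "$expected"
```

For the test data this gives eight groups: on each of the four days, user 1 has payed_buy_fee_sum 30 and pv 2, and user 2 has payed_buy_fee_sum 70 and pv 2. A query against the materialized table can be compared with these values.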
      

      2. Suspend the Materialized Table by executing the following statement in the SQL Client:

      ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
      

      3. Resume the Materialized Table:

      ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
      

      4. Manually refresh the Materialized Table:

      ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH PARTITION(ds = '2024-06-25');
      

      5. Drop the Materialized Table:

      DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
      

      Full Mode

      In Full Mode, a Materialized Table relies on the Workflow Scheduler to perform the periodic full refresh, so the main purpose here is to verify the FLIP-448 functionality.

      1. Create the Materialized Table. Cover both good and bad cases, and execute the following statement:

      CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
      PARTITIONED BY (ds)
      WITH(
        'format' = 'json'
      )
      FRESHNESS = INTERVAL '1' MINUTE
      REFRESH_MODE = FULL
      AS SELECT 
        user_id,
        ds,
        SUM (payment_amount_cents) AS payed_buy_fee_sum,
        SUM (1) AS pv
      FROM (
          SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
      GROUP BY (user_id, ds);
      

      2. Suspend the Materialized Table by executing the following statement:

      ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
      

      3. Resume the Materialized Table by executing the following statement:

      ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
      

      4. Drop the Materialized Table by executing the following statements:

      DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
      
      DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
      

            People

              lsy dalongliu
              fanrui Rui Fan