Details
Type: Sub-task
Status: Closed
Priority: Blocker
Resolution: Fixed
Description
Follow-up testing for https://issues.apache.org/jira/browse/FLINK-35187.
Materialized Table relies on both FLIP-435 and FLIP-448 to complete the end-to-end process, so this release test covers the FLIP-435 and FLIP-448 features together.
Because Materialized Table depends on the CatalogStore, Catalog, Workflow Scheduler, SQL Client, SQL Gateway, and a Standalone cluster to run the whole process, verification consists of two parts: Environment Setup and Feature Verification.
Environment Setup:
1. Create the file CatalogStore directory.
2. Prepare the test-filesystem catalog: create its directory and put flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into Flink's lib directory.
3. Create the savepoint directory.
4. Configure Flink's config.yaml as follows (replace the xxx placeholder with the CatalogStore directory from step 1):
#==============================================================================
# Common
#==============================================================================

jobmanager:
  bind-host: localhost
  rpc:
    address: localhost
    # The RPC port where the JobManager is reachable.
    port: 6123
  memory:
    process:
      size: 1600m
  execution:
    failover-strategy: region

taskmanager:
  bind-host: localhost
  host: localhost
  # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  numberOfTaskSlots: 3
  memory:
    process:
      size: 1728m

parallelism:
  # The parallelism used for programs that did not specify and other parallelism.
  default: 1

#==============================================================================
# Rest & web frontend
#==============================================================================

rest:
  # The address to which the REST client will connect to
  address: localhost
  bind-address: localhost

# Catalog Store
table:
  catalog-store:
    kind: file
    file:
      path: xxx

# Embedded Scheduler config
workflow-scheduler:
  type: embedded

# SQL Gateway address
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
5. Start the Standalone cluster: ./bin/start-cluster.sh
6. Start the SQL Gateway: ./bin/sql-gateway.sh start
7. Start the SQL Client in gateway mode: ./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
8. Register the test-filesystem Catalog
CREATE CATALOG mt_cat WITH (
  'type' = 'test-filesystem',
  'path' = '...',
  'default-database' = 'mydb'
);

USE CATALOG mt_cat;
9. Create the test-filesystem source table and insert the data
-- 1. create json format table
CREATE TABLE json_source (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  order_created_at STRING,
  payment_amount_cents BIGINT
) WITH (
  'format' = 'json',
  'source.monitor-interval' = '5S'
);

-- 2. insert data
INSERT INTO mt_cat.mydb.json_source VALUES
  (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
  (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
  (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
  (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
  (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
  (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
  (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
  (1008, 2, 'user4', '2024-06-25 10:03:00', 40);

INSERT INTO mt_cat.mydb.json_source VALUES
  (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
  (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
  (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
  (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
  (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
  (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
  (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
  (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
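Steps 1 to 3 only name the directories to create. The following shell sketch prepares them under one base directory and copies the test jar into Flink's lib directory. The base path `/tmp/flink-mt-test`, the subdirectory names, and the `BASE_DIR`, `FLINK_HOME`, and `TEST_JAR` variables are hypothetical choices, not part of the original test plan; adjust them to your environment.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of Environment Setup steps 1-3. All paths below are
# assumed defaults and can be overridden through environment variables.
set -euo pipefail

BASE_DIR="${BASE_DIR:-/tmp/flink-mt-test}"   # hypothetical base directory
FLINK_HOME="${FLINK_HOME:-$PWD}"             # assumed Flink distribution root
TEST_JAR="${TEST_JAR:-flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar}"

CATALOG_STORE_DIR="$BASE_DIR/catalog-store"  # candidate for table.catalog-store.file.path
CATALOG_DIR="$BASE_DIR/test-filesystem"      # candidate for the catalog 'path' option
SAVEPOINT_DIR="$BASE_DIR/savepoints"         # savepoint directory (step 3)

# Steps 1-3: create the CatalogStore, catalog and savepoint directories.
mkdir -p "$CATALOG_STORE_DIR" "$CATALOG_DIR" "$SAVEPOINT_DIR"

# Step 2: copy the test-filesystem jar into Flink's lib directory if it is available.
if [ -f "$TEST_JAR" ]; then
  cp "$TEST_JAR" "$FLINK_HOME/lib/"
else
  echo "warning: $TEST_JAR not found; copy it into $FLINK_HOME/lib manually" >&2
fi

# Print the values to paste into config.yaml (step 4) and CREATE CATALOG (step 8).
echo "table.catalog-store.file.path: $CATALOG_STORE_DIR"
echo "test-filesystem 'path': $CATALOG_DIR"
echo "savepoint directory: $SAVEPOINT_DIR"
```

The printed CatalogStore path would replace `xxx` in config.yaml, and the catalog path would replace the `'path'` value in step 8.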
Feature Verification
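Before running the scenarios, it helps to know what a correctly refreshed table should contain. The sketch below recomputes the `GROUP BY (user_id, ds)` aggregation used by both materialized tables from the rows inserted in step 9, assuming both INSERT statements have been executed. It uses awk and treats the first 10 characters of `order_created_at` as equivalent to `DATE_FORMAT(order_created_at, 'yyyy-MM-dd')`; the `ROWS` variable and `expected_aggregates` function are illustrative names only.

```shell
#!/usr/bin/env bash
# Recomputes the expected rows of the materialized tables from the data
# inserted into json_source in step 9 (both INSERT statements).
set -euo pipefail

# user_id,order_created_at,payment_amount_cents for each inserted row
ROWS='1,2024-06-24 10:00:00,10
1,2024-06-24 10:01:00,20
2,2024-06-24 10:02:00,30
2,2024-06-24 10:03:00,40
1,2024-06-25 10:00:00,10
1,2024-06-25 10:01:00,20
2,2024-06-25 10:02:00,30
2,2024-06-25 10:03:00,40
1,2024-06-26 10:00:00,10
1,2024-06-26 10:01:00,20
2,2024-06-26 10:02:00,30
2,2024-06-26 10:03:00,40
1,2024-06-27 10:00:00,10
1,2024-06-27 10:01:00,20
2,2024-06-27 10:02:00,30
2,2024-06-27 10:03:00,40'

# Print one "user_id,ds,payed_buy_fee_sum,pv" line per (user_id, ds) group, sorted.
expected_aggregates() {
  printf '%s\n' "$ROWS" | awk -F',' '
    {
      ds = substr($2, 1, 10)   # mirrors DATE_FORMAT(order_created_at, yyyy-MM-dd)
      key = $1 "," ds
      fee[key] += $3           # SUM(payment_amount_cents)
      pv[key] += 1             # SUM(1)
    }
    END { for (k in fee) print k "," fee[k] "," pv[k] }
  ' | LC_ALL=C sort
}

expected_aggregates
```

With this data every (user_id, ds) group has two orders, so each of the four days should show `payed_buy_fee_sum` 30 for user 1 and 70 for user 2, with `pv` 2 in all eight rows.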
Continuous Mode
In Continuous Mode, Materialized Table runs a Flink streaming job that updates the data in real time. Verification covers the lifecycle scenarios Create, Suspend, Resume, Manual Refresh, and Drop.
1. Create the Materialized Table, covering various bad and good cases, and execute the following statement in the SQL Client:
CREATE MATERIALIZED TABLE continuous_users_shops (
  PRIMARY KEY (user_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
WITH (
  'format' = 'debezium-json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM (
  SELECT
    user_id,
    DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY (user_id, ds);
2. Suspend the Materialized Table by executing the following statement in the SQL Client:
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
3. Resume the Materialized Table:
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
4. Manually refresh a partition of the Materialized Table:
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH PARTITION(ds = '2024-06-25');
5. Drop the Materialized Table:
DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
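Because Continuous Mode is backed by a streaming job, the lifecycle steps above can also be cross-checked on the cluster side. The following sketch counts jobs by state through the JobManager REST endpoint `/jobs/overview`. It assumes the default REST port 8081 (config.yaml does not set `rest.port`) and Flink's compact JSON output; the function names and the `check` argument are hypothetical, and a `jq`-based filter would be more robust than grep.

```shell
#!/usr/bin/env bash
# Hypothetical helper for the Continuous Mode steps: counts Flink jobs by state
# using the JobManager REST endpoint /jobs/overview.
set -euo pipefail

# Assumption: default REST port 8081, because config.yaml does not set rest.port.
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Read a /jobs/overview JSON document from stdin and print how many jobs are in state $1.
# Relies on compact JSON such as "state":"RUNNING" (no spaces around the colon).
count_jobs_in_state() {
  local state="$1"
  { grep -o "\"state\":\"${state}\"" || true; } | wc -l | tr -d ' '
}

# Query the running cluster and print the number of RUNNING jobs.
running_jobs() {
  curl -sS "$FLINK_REST/jobs/overview" | count_jobs_in_state RUNNING
}

# Contact the cluster only when asked explicitly, e.g. "bash check_continuous.sh check".
if [ "${1:-}" = "check" ]; then
  echo "RUNNING jobs: $(running_jobs)"
fi
```

Run with `check` after each step, assuming no other streaming jobs share the cluster: after step 1 at least one job should be RUNNING, after step 2 (SUSPEND) the count should drop, and after step 3 (RESUME) it should rise again.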
Full Mode
In Full Mode, Materialized Table relies on the Workflow Scheduler to perform periodic full refreshes, so the main purpose here is to verify the FLIP-448 functionality.
1. Create the Materialized Table, verifying various good and bad cases, and execute the following statement:
CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM (
  SELECT
    user_id,
    DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents
  FROM mt_cat.mydb.json_source
) AS tmp
GROUP BY (user_id, ds);
2. Suspend the Materialized Table by executing the following statement:
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
3. Resume the Materialized Table by executing the following statement:
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
4. Drop the Materialized Table, then drop it again with IF EXISTS, which should succeed without an error:
DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
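In Full Mode the embedded workflow scheduler is expected to submit a batch refresh job about once per FRESHNESS interval (1 minute here), so finished jobs should accumulate while the table is active and stop accumulating after SUSPEND. The sketch below compares the number of FINISHED jobs in two `/jobs/overview` snapshots taken slightly more than one interval apart. The default REST address, the 75-second wait, the helper names, and the `watch` argument are assumptions; a refresh job that is still running at the second snapshot is not counted.

```shell
#!/usr/bin/env bash
# Hypothetical sketch for the Full Mode checks: counts how many jobs reached
# FINISHED between two /jobs/overview snapshots.
set -euo pipefail

FLINK_REST="${FLINK_REST:-http://localhost:8081}"  # assumed default REST address/port
WAIT_SECONDS="${WAIT_SECONDS:-75}"                  # slightly longer than FRESHNESS = 1 minute

# Print how many FINISHED jobs appear in the /jobs/overview JSON document given as $1.
finished_count() {
  printf '%s' "$1" | { grep -o '"state":"FINISHED"' || true; } | wc -l | tr -d ' '
}

# Print the number of FINISHED jobs added between snapshot $1 (before) and $2 (after).
new_finished_jobs() {
  local before after
  before=$(finished_count "$1")
  after=$(finished_count "$2")
  echo $((after - before))
}

# Contact the cluster only when asked explicitly, e.g. "bash check_full_mode.sh watch".
if [ "${1:-}" = "watch" ]; then
  snapshot_before=$(curl -sS "$FLINK_REST/jobs/overview")
  sleep "$WAIT_SECONDS"
  snapshot_after=$(curl -sS "$FLINK_REST/jobs/overview")
  echo "jobs finished in the last ${WAIT_SECONDS}s: $(new_finished_jobs "$snapshot_before" "$snapshot_after")"
fi
```

Running it with `watch` after step 1 and after step 3 should report at least one new finished job; after step 2 it should report none, apart from a refresh that may already have been in flight when the table was suspended.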
Issue Links
- is cloned by: FLINK-35689 Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized Table for Simplifying Data Pipelines (Resolved)