Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27922

Flink SQL unique key lost when set table.exec.mini-batch.enabled=true

    XMLWordPrintableJSON

Details

    Description

      Flink SQL table has primary keys, but when set table.exec.mini-batch.enabled =true, the unique key is lost.

      @Test
      public void testJoinUniqueKey() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);    Configuration configuration = tableEnv.getConfig().getConfiguration();
          configuration.setString("table.exec.mini-batch.enabled", "true");
          configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
          configuration.setString("table.exec.mini-batch.size", "5000");    StatementSet statementSet = tableEnv.createStatementSet();
          tableEnv.executeSql("CREATE TABLE `t_apply_sku_test`(`dt` BIGINT,`refund_apply_id` BIGINT,`base_sku_id` BIGINT,`order_id` BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`refund_type` BIGINT,`apply_refund_reason_code` BIGINT,`apply_refund_reason_desc` VARCHAR,`apply_refund_review_status` BIGINT,`apply_refund_review_status_desc` VARCHAR,`apply_refund_reject_reason` VARCHAR,`apply_is_refunded` INTEGER,`apply_pic_url` VARCHAR,`remark` VARCHAR,`refund_apply_originator` BIGINT,`second_reason_code` BIGINT,`second_reason` VARCHAR,`refund_target_account` BIGINT,`after_service_id` BIGINT,`receipt_status` BIGINT,`group_header_goods_status` INTEGER,`apply_operator_mis_name` VARCHAR,`refund_apply_time` BIGINT,`update_time` BIGINT,`base_sku_name` VARCHAR,`apply_refund_num` BIGINT,`view_qty` DECIMAL(38,18),`refund_scale_type` BIGINT,`refund_scale_type_desc` VARCHAR,`refund_scale` DECIMAL(38,18),`apply_refund_amt` DECIMAL(38,18),`refund_scale_user_real_pay` DECIMAL(38,18),`refund_red_packet_price` DECIMAL(38,18),`load_time` VARCHAR,`take_rate_type` BIGINT,`platform_rate` DECIMAL(38,18),`order_sku_type` INTEGER,`second_reason_aggregated_code` INTEGER,`second_reason_aggregated` VARCHAR,`compensation_amount` DECIMAL(38,18),`aftersale_type` INTEGER,`group_header_parallel_status` INTEGER,`grid_parallel_status` INTEGER) WITH ('connector'='blackhole')");
          tableEnv.executeSql("CREATE TABLE `t_name`(`id` BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`city_id` BIGINT,`refund_type` INTEGER,`first_reason_code` INTEGER,`first_reason` VARCHAR,`second_reason_code` INTEGER,`second_reason` VARCHAR,`pic_url` VARCHAR,`remark` VARCHAR,`refund_price` INTEGER,`refund_red_packet_price` INTEGER,`refund_total_price` INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` INTEGER,`refund_other_price` INTEGER,`user_receipt_status` INTEGER,`collect_status` INTEGER,`refund_target_account` INTEGER,`status` INTEGER,`flow_instance_id` BIGINT,`create_time` BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
          tableEnv.executeSql("CREATE TABLE `t_item_name`(`id` BIGINT,`refund_fwd_item_id` BIGINT,`after_service_id` BIGINT,`order_id` BIGINT,`order_item_id` BIGINT,`stack_id` BIGINT,`sku_id` BIGINT,`sku_name` VARCHAR,`supplier_id` BIGINT,`refund_quantity` INTEGER,`item_sku_type` INTEGER,`refund_scale_type` INTEGER,`refund_scale` INTEGER,`accurate_refund` INTEGER,`refund_price` INTEGER,`refund_red_packet_price` INTEGER,`refund_price_info` VARCHAR,`refund_total_price` INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price` INTEGER,`refund_other_price` INTEGER,`extend_info` VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
          tableEnv.executeSql("CREATE TABLE `t_progress_name`(`id` BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`progress_node` VARCHAR,`progress_node_status` INTEGER,`operator` VARCHAR,`parallel` INTEGER,`flow_element_id` VARCHAR,`extend_info` VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
          tableEnv.executeSql("CREATE TABLE `t_attr_name`(`id` BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`name` VARCHAR,`value` VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
          tableEnv.executeSql("CREATE TABLE `t_apply_status_name`(`id` BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`apply_status` INTEGER,`ai_audit_status` INTEGER,`group_header_confirm_status` INTEGER,`group_header_retrieve_status` INTEGER,`driver_retrieve_status` INTEGER,`group_header_parallel_status` INTEGER,`grid_parallel_status` INTEGER,`create_time` BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
          tableEnv.executeSql("CREATE VIEW `v_refund_fwd_item_sku` AS SELECT `after_service_id` AS `refund_apply_id`, `order_id`, `sku_id` AS `base_sku_id`, SUM(`if`(`refund_quantity` IS NULL, 0, `refund_quantity`)) AS `apply_refund_num`, SUM(`if`(`refund_quantity` IS NULL, 0, CAST(`refund_quantity` AS DOUBLE))) AS `view_qty`, SUM(`if`(`refund_price` IS NULL, 0, `refund_price`)) AS `apply_refund_amt`, SUM(`if`(`refund_price` IS NULL, 0, `refund_price`)) AS `refund_scale_user_real_pay`, SUM(`if`(`refund_red_packet_price` IS NULL, 0, `refund_red_packet_price`)) AS `refund_red_packet_price`\n"
                  + "FROM `t_item_name`\n"
                  + "GROUP BY `after_service_id`, `order_id`, `sku_id`");    statementSet.addInsertSql("INSERT INTO `t_apply_sku_test` SELECT CAST(`FROM_UNIXTIME`(`ord`.`modify_time` / 1000, 'yyyyMMdd') AS BIGINT) AS `dt`, `sku`.`refund_apply_id`,`sku`.`base_sku_id`, `sku`.`order_id`, `ord`.`user_id`, `ord`.`poi_id`, CAST(`ord`.`refund_type` AS BIGINT) AS `refund_type`, CAST(`ord`.`first_reason_code` AS BIGINT) AS `apply_refund_reason_code`, `ord`.`first_reason` AS `apply_refund_reason_desc`, CAST(`stat`.`apply_status` AS BIGINT) AS `apply_refund_review_status`, CASE WHEN `stat`.`apply_status` = 0 OR `stat`.`apply_status` = 13 THEN 'a' WHEN `stat`.`apply_status` = 1 THEN 'b' WHEN `stat`.`apply_status` = 2 THEN 'c' WHEN `stat`.`apply_status` = 3 THEN 'd' WHEN `stat`.`apply_status` = 4 THEN 'e' WHEN `stat`.`apply_status` = 5 THEN 'f' WHEN `stat`.`apply_status` = 6 THEN 'g' ELSE 'x' END AS `apply_refund_review_status_desc`, `if`(`progress`.`progress_node_status` = 30, `extend_info`, '') AS `apply_refund_reject_reason`, CASE WHEN `progress`.`progress_node` = 'refund.node' THEN 1 ELSE 0 END AS `apply_is_refunded`, `ord`.`pic_url` AS `apply_pic_url`, `ord`.`remark`, CAST(NULL AS BIGINT) AS `refund_apply_originator`, CAST(`ord`.`second_reason_code` AS BIGINT) AS `second_reason_code`, `ord`.`second_reason`, CAST(`ord`.`refund_target_account` AS BIGINT) AS `refund_target_account`, `ord`.`after_service_id`, CAST(`ord`.`user_receipt_status` AS BIGINT) AS `receipt_status`, CASE WHEN `attr`.`name` = 'fwd_ext' AND `attr`.`value` = '1' THEN 1 WHEN `attr`.`name` = 'fwd_ext' AND `attr`.`value` = '2' THEN 2 ELSE 0 END AS `group_header_goods_status`, `progress`.`operator` AS `apply_operator_mis_name`, `ord`.`create_time` AS `refund_apply_time`, `ord`.`modify_time` AS `update_time`, CAST(NULL AS VARCHAR) AS `base_sku_name`, CAST(`sku`.`apply_refund_num` AS BIGINT) AS `apply_refund_num`, `sku`.`view_qty` AS `view_qty`, CAST(NULL AS BIGINT) AS `refund_scale_type`, CAST(NULL AS VARCHAR) AS `refund_scale_type_desc`, CAST(NULL AS DECIMAL) AS `refund_scale`, `sku`.`apply_refund_amt` AS `apply_refund_amt`, `sku`.`refund_scale_user_real_pay` AS `refund_scale_user_real_pay`, `sku`.`refund_red_packet_price` AS `refund_red_packet_price`, CAST(LOCALTIMESTAMP AS VARCHAR) AS `load_time`, CAST(NULL AS BIGINT) AS `take_rate_type`, CAST(NULL AS DECIMAL) AS `platform_rate`, 1 AS `order_sku_type`, CAST(NULL AS INTEGER) AS `second_reason_aggregated_code`, CAST(NULL AS VARCHAR) AS `second_reason_aggregated`, CAST(NULL AS DECIMAL) AS `compensation_amount`, 1 AS `aftersale_type`, CASE WHEN `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20 THEN 2 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50 THEN 5 ELSE 0 END AS `group_header_parallel_status`, CASE WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20 THEN 2 WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50 THEN 5 ELSE 0 END AS `grid_parallel_status`\n"
                  + "FROM `t_name` AS `ord`\n"
                  + "LEFT JOIN `t_progress_name` AS `progress` ON `ord`.`after_service_id` = `progress`.`after_service_id`\n"
                  + "LEFT JOIN `t_attr_name` AS `attr` ON `ord`.`after_service_id` = `attr`.`after_service_id`\n"
                  + "LEFT JOIN `t_apply_status_name` AS `stat` ON `ord`.`after_service_id` = `stat`.`after_service_id`\n"
                  + "LEFT JOIN `v_refund_fwd_item_sku` AS `sku` ON `stat`.`after_service_id` = `sku`.`refund_apply_id`");    System.out.println(statementSet.explain());
      }

      == Optimized Logical Plan ==

      Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status, apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, refund_apply_originator, second_reason_code, second_reason, refund_target_account, after_service_id, receipt_status, group_header_goods_status, apply_operator_mis_name, refund_apply_time, update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, refund_scale_type_desc, refund_scale, apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, second_reason_aggregated, compensation_amount, aftersale_type, group_header_parallel_status, grid_parallel_status])
      +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS apply_refund_reason_code, first_reason AS apply_refund_reason_desc, CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS refund_apply_originator, CAST(second_reason_code) AS second_reason_code, second_reason, CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, operator AS apply_operator_mis_name, create_time AS refund_apply_time, modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type, null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS compensation_amount, 1 AS aftersale_type, CAST((((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS group_header_parallel_status, CAST((((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS grid_parallel_status])
         +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id, apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
            :- Exchange(distribution=[hash[after_service_id0]])
            :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value, after_service_id0, apply_status], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :     :- Exchange(distribution=[hash[after_service_id]])
            :     :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value])
            :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, after_service_id0, name, value], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :     :        :- Exchange(distribution=[hash[after_service_id]])
            :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info])
            :     :        :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, after_service_id0, progress_node, progress_node_status, operator, parallel, extend_info], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :     :        :        :- Exchange(distribution=[hash[after_service_id]])
            :     :        :        :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time])
            :     :        :        :     +- DropUpdateBefore
            :     :        :        :        +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            :     :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, t_name]], fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, refund_price, refund_red_packet_price, refund_total_price, refund_promotion_price, refund_coupon_price, refund_other_price, user_receipt_status, collect_status, refund_target_account, status, flow_instance_id, create_time, modify_time])
            :     :        :        +- Exchange(distribution=[hash[after_service_id]])
            :     :        :           +- Calc(select=[after_service_id, progress_node, progress_node_status, operator, parallel, extend_info])
            :     :        :              +- DropUpdateBefore
            :     :        :                 +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            :     :        :                    +- TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], fields=[id, after_service_id, order_id, progress_node, progress_node_status, operator, parallel, flow_element_id, extend_info, create_time, modify_time])
            :     :        +- Exchange(distribution=[hash[after_service_id]])
            :     :           +- Calc(select=[after_service_id, name, value])
            :     :              +- DropUpdateBefore
            :     :                 +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            :     :                    +- TableSourceScan(table=[[default_catalog, default_database, t_attr_name]], fields=[id, after_service_id, order_id, name, value, create_time, modify_time])
            :     +- Exchange(distribution=[hash[after_service_id]])
            :        +- Calc(select=[after_service_id, apply_status])
            :           +- DropUpdateBefore
            :              +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
            :                 +- TableSourceScan(table=[[default_catalog, default_database, t_apply_status_name]], fields=[id, after_service_id, order_id, apply_status, ai_audit_status, group_header_confirm_status, group_header_retrieve_status, driver_retrieve_status, group_header_parallel_status, grid_parallel_status, create_time, modify_time])
            +- Exchange(distribution=[hash[refund_apply_id]])
               +- Calc(select=[refund_apply_id, order_id, base_sku_id, apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price])
                  +- GlobalGroupAggregate(groupBy=[refund_apply_id, order_id, base_sku_id], select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT((sum$0, count$1)) AS apply_refund_num, SUM_RETRACT((sum$2, count$3)) AS view_qty, SUM_RETRACT((sum$4, count$5)) AS refund_scale_user_real_pay, SUM_RETRACT((sum$6, count$7)) AS refund_red_packet_price])
                     +- Exchange(distribution=[hash[refund_apply_id, order_id, base_sku_id]])
                        +- LocalGroupAggregate(groupBy=[refund_apply_id, order_id, base_sku_id], select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3) AS (sum$0, count$1), SUM_RETRACT($f4) AS (sum$2, count$3), SUM_RETRACT($f5) AS (sum$4, count$5), SUM_RETRACT($f6) AS (sum$6, count$7), COUNT_RETRACT(*) AS count1$8])
                           +- Calc(select=[after_service_id AS refund_apply_id, order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
                              +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
                                 +- TableSourceScan(table=[[default_catalog, default_database, t_item_name]], fields=[id, refund_fwd_item_id, after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, accurate_refund, refund_price, refund_red_packet_price, refund_price_info, refund_total_price, refund_promotion_price, refund_coupon_price, refund_other_price, extend_info, create_time, modify_time]) 

      when set table.exec.mini-batch.enabled=false

      == Optimized Logical Plan ==

      Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type, apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status, apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded, apply_pic_url, remark, refund_apply_originator, second_reason_code, second_reason, refund_target_account, after_service_id, receipt_status, group_header_goods_status, apply_operator_mis_name, refund_apply_time, update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type, refund_scale_type_desc, refund_scale, apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type, platform_rate, order_sku_type, second_reason_aggregated_code, second_reason_aggregated, compensation_amount, aftersale_type, group_header_parallel_status, grid_parallel_status])
      +- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd')) AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id, CAST(refund_type) AS refund_type, CAST(first_reason_code) AS apply_refund_reason_code, first_reason AS apply_refund_reason_desc, CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE (apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd' CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE _UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF _UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node = _UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1 CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS refund_apply_originator, CAST(second_reason_code) AS second_reason_code, second_reason, CAST(refund_target_account) AS refund_target_account, CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status, operator AS apply_operator_mis_name, create_time AS refund_apply_time, modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale, CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS refund_scale_user_real_pay, CAST(refund_red_packet_price) AS refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type, null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS compensation_amount, 1 AS aftersale_type, CAST((((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS group_header_parallel_status, CAST((((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node = _UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS grid_parallel_status])
         +- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 = refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id, apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
            :- Exchange(distribution=[hash[after_service_id0]])
            :  +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value, after_service_id0, apply_status], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
            :     :- Exchange(distribution=[hash[after_service_id]])
            :     :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, name, value])
            :     :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info, after_service_id0, name, value], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
            :     :        :- Exchange(distribution=[hash[after_service_id]])
            :     :        :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, progress_node, progress_node_status, operator, parallel, extend_info])
            :     :        :     +- Join(joinType=[LeftOuterJoin], where=[(after_service_id = after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time, after_service_id0, progress_node, progress_node_status, operator, parallel, extend_info], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
            :     :        :        :- Exchange(distribution=[hash[after_service_id]])
            :     :        :        :  +- Calc(select=[after_service_id, user_id, poi_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, user_receipt_status, refund_target_account, create_time, modify_time])
            :     :        :        :     +- DropUpdateBefore
            :     :        :        :        +- TableSourceScan(table=[[default_catalog, default_database, t_name]], fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type, first_reason_code, first_reason, second_reason_code, second_reason, pic_url, remark, refund_price, refund_red_packet_price, refund_total_price, refund_promotion_price, refund_coupon_price, refund_other_price, user_receipt_status, collect_status, refund_target_account, status, flow_instance_id, create_time, modify_time])
            :     :        :        +- Exchange(distribution=[hash[after_service_id]])
            :     :        :           +- Calc(select=[after_service_id, progress_node, progress_node_status, operator, parallel, extend_info])
            :     :        :              +- DropUpdateBefore
            :     :        :                 +- TableSourceScan(table=[[default_catalog, default_database, t_progress_name]], fields=[id, after_service_id, order_id, progress_node, progress_node_status, operator, parallel, flow_element_id, extend_info, create_time, modify_time])
            :     :        +- Exchange(distribution=[hash[after_service_id]])
            :     :           +- Calc(select=[after_service_id, name, value])
            :     :              +- DropUpdateBefore
            :     :                 +- TableSourceScan(table=[[default_catalog, default_database, t_attr_name]], fields=[id, after_service_id, order_id, name, value, create_time, modify_time])
            :     +- Exchange(distribution=[hash[after_service_id]])
            :        +- Calc(select=[after_service_id, apply_status])
            :           +- DropUpdateBefore
            :              +- TableSourceScan(table=[[default_catalog, default_database, t_apply_status_name]], fields=[id, after_service_id, order_id, apply_status, ai_audit_status, group_header_confirm_status, group_header_retrieve_status, driver_retrieve_status, group_header_parallel_status, grid_parallel_status, create_time, modify_time])
            +- Exchange(distribution=[hash[refund_apply_id]])
               +- Calc(select=[refund_apply_id, order_id, base_sku_id, apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt, refund_scale_user_real_pay, refund_red_packet_price])
                  +- GroupAggregate(groupBy=[refund_apply_id, order_id, base_sku_id], select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3) AS apply_refund_num, SUM_RETRACT($f4) AS view_qty, SUM_RETRACT($f5) AS refund_scale_user_real_pay, SUM_RETRACT($f6) AS refund_red_packet_price])
                     +- Exchange(distribution=[hash[refund_apply_id, order_id, base_sku_id]])
                        +- Calc(select=[after_service_id AS refund_apply_id, order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS $f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
                           +- TableSourceScan(table=[[default_catalog, default_database, t_item_name]], fields=[id, refund_fwd_item_id, after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name, supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale, accurate_refund, refund_price, refund_red_packet_price, refund_price_info, refund_total_price, refund_promotion_price, refund_coupon_price, refund_other_price, extend_info, create_time, modify_time]) 

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            zhangbinzaifendou zhangbin
            Votes:
            0 Vote for this issue
            Watchers:
            7 Start watching this issue

            Dates

              Created:
              Updated: