Flink / FLINK-30989

Configuration table.exec.spill-compression.block-size does not take effect in batch jobs


      Description

      I tried to set table.exec.spill-compression.block-size through the TableEnv in my job, but the setting had no effect. I attached to the TaskManager and found that the conf passed to the constructor of BinaryExternalSorter is empty.

      How to reproduce

      A simple program that reproduces the problem:

      // App.java
      
      package test.flink403;
      
      import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
      import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
      
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.configuration.AlgorithmOptions;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.api.config.ExecutionConfigOptions;
      
      import java.util.Arrays;

      public class App {

        public static void main(String[] args) throws Exception {
      
          Configuration config = new Configuration();
          config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
          config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
          config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
          config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- does not take effect
          config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
      
          final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
          tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- does not take effect
          final DataStream<Order> orderA =
              env.fromCollection(
                  Arrays.asList(
                      new Order(1L, "beer", 3),
                      new Order(1L, "diaper", 4),
                      new Order(3L, "rubber", 2)));
      
          final Table tableA = tableEnv.fromDataStream(orderA);
      
          final Table result =
              tableEnv.sqlQuery(
                  "SELECT * FROM "
                      + tableA
                      + " "
                      + " order by user");
      
          tableEnv.toDataStream(result, Order.class).print();
          env.execute();
        }
      }
      
      // ---------------------------------------------------------------
      // Order.java
      package test.flink403;
      
      public class Order {
        public Long user;
        public String product;
        public int amount;
      
        // for POJO detection in DataStream API
        public Order() {}
      
        // for structured type detection in Table API
        public Order(Long user, String product, int amount) {
          this.user = user;
          this.product = product;
          this.amount = amount;
        }
      
        @Override
        public String toString() {
          return "Order{"
              + "user="
              + user
              + ", product='"
              + product
              + '\''
              + ", amount="
              + amount
              + '}';
        }
      }

       

      I think this happens because SortOperator tries to read its conf from the JobConfiguration, which should be set in the JobGraph.
      The following classes read their conf from the JobConfiguration in the same way:

      • BinaryExternalSorter
        • ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
        • ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
        • ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
        • ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
      • BinaryHashTable, BaseHybridHashTable
        • ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
        • ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
      • SortDataInput
        • AlgorithmOptions.SORT_SPILLING_THRESHOLD
        • AlgorithmOptions.SPILLING_MAX_FAN
        • AlgorithmOptions.USE_LARGE_RECORDS_HANDLER
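
      The suspected mechanism can be illustrated with a small, Flink-free model. This is only a sketch under stated assumptions: the class SpillConfigSketch, its methods, and the use of plain maps in place of Flink's Configuration are hypothetical, and the "64 kb" default is assumed for illustration. It shows how an operator that reads an option from an empty job configuration silently falls back to the default, even though the user set the option on the table config.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the reported behavior; none of these names are Flink APIs.
public class SpillConfigSketch {
    static final String BLOCK_SIZE_KEY = "table.exec.spill-compression.block-size";
    // Assumed default value, used here for illustration only.
    static final String DEFAULT_BLOCK_SIZE = "64 kb";

    // Models an operator (e.g. a sorter) reading the option from the job
    // configuration it was handed, falling back to the default if absent.
    static String readBlockSize(Map<String, String> jobConf) {
        return jobConf.getOrDefault(BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    }

    // Models the reported situation: the table config is never copied
    // into the job configuration, so the operator sees an empty conf.
    static Map<String, String> jobConfWithoutPropagation(Map<String, String> tableConf) {
        return new HashMap<>();
    }

    // Models the expected situation: the table config is copied into
    // the job configuration before the job is submitted.
    static Map<String, String> jobConfWithPropagation(Map<String, String> tableConf) {
        return new HashMap<>(tableConf);
    }

    public static void main(String[] args) {
        Map<String, String> tableConf = new HashMap<>();
        tableConf.put(BLOCK_SIZE_KEY, "32 m");

        // Reported behavior: the user's value is lost and the default applies.
        System.out.println(readBlockSize(jobConfWithoutPropagation(tableConf)));
        // Expected behavior: the user's value reaches the operator.
        System.out.println(readBlockSize(jobConfWithPropagation(tableConf)));
    }
}
```

      If this model matches the real cause, every option in the list above would be affected in the same way, because each one is looked up in the same empty job configuration.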

          People

            lsy dalongliu
            shenjiaqi shen