Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Version: 1.16.1

Description
I tried to configure table.exec.spill-compression.block-size through the TableEnvironment in my job and it did not take effect. I attached a debugger to the TaskManager and found that the conf passed to the constructor of BinaryExternalSorter is empty.
How to reproduce
A simple program that reproduces the problem:
// App.java
package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
        config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
        config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- cannot take effect
        config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- cannot take effect

        final DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));

        final Table tableA = tableEnv.fromDataStream(orderA);

        final Table result =
                tableEnv.sqlQuery("SELECT * FROM " + tableA + " order by user");

        tableEnv.toDataStream(result, Order.class).print();
        env.execute();
    }
}

// ---------------------------------------------------------------
// Order.java
package test.flink403;

public class Order {
    public Long user;
    public String product;
    public int amount;

    // for POJO detection in DataStream API
    public Order() {}

    // for structured type detection in Table API
    public Order(Long user, String product, int amount) {
        this.user = user;
        this.product = product;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{"
                + "user=" + user
                + ", product='" + product + '\''
                + ", amount=" + amount
                + '}';
    }
}
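As a sanity check, the value does land in the TableConfig itself; it is only the propagation to the runtime operator that is broken. A small sketch (lines one could append at the end of main() in the App above; "<not set>" is just an illustrative default):

        // Sketch: confirms the set(...) call works at the TableConfig level;
        // the value is simply never propagated to the runtime operator
        // (the conf seen by BinaryExternalSorter stays empty).
        System.out.println(
                tableEnv.getConfig()
                        .getConfiguration()
                        .getString("table.exec.spill-compression.block-size", "<not set>"));
        // expected to print: 32 m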
I think this is because SortOperator tries to get its configuration from the job configuration, which would have to be set in the JobGraph.
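A minimal sketch of that lookup pattern (hypothetical code, not the actual SortOperator/BinaryExternalSorter implementation; only the config option is the real one): the operator queries the per-job Configuration, which never received the value set on the TableConfig, so the option silently falls back to its default.

// Hypothetical sketch: why a value set on the TableConfig is invisible to an
// operator that only consults the per-job Configuration.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class ConfigLookupSketch {
    public static void main(String[] args) {
        // What the user sets through the TableEnvironment / TableConfig.
        Configuration tableConf = new Configuration();
        tableConf.setString(
                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m");

        // What the operator actually queries at runtime: the job configuration
        // shipped with the JobGraph, into which the table option was never copied.
        Configuration jobConfiguration = new Configuration();

        System.out.println(
                "present in TableConfig-level conf: "
                        + tableConf.contains(
                                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
        System.out.println(
                "present in job configuration: "
                        + jobConfiguration.contains(
                                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
        // prints true / false: the operator-side lookup misses and uses the default.
    }
}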
The following classes use the same method to get their configuration from the job configuration:
- BinaryExternalSorter
  - ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
  - ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
  - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
  - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
- BinaryHashTable, BaseHybridHashTable
  - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
  - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
- SortDataInput
  - AlgorithmOptions.SORT_SPILLING_THRESHOLD
  - AlgorithmOptions.SPILLING_MAX_FAN
  - AlgorithmOptions.USE_LARGE_RECORDS_HANDLER