Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Version: 1.11.1
Description
The following Table API streaming job gets stuck when mini-batching is enabled:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// disable mini-batching completely to get a result
Configuration tableConf = tableEnv.getConfig().getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
tableConf.setString("table.exec.mini-batch.size", "5000");
tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

tableEnv.executeSql(
    "CREATE TABLE input_table ("
        + "location STRING, "
        + "population INT"
        + ") WITH ("
        + "'connector' = 'kafka', "
        + "'topic' = 'kafka_batching_input', "
        + "'properties.bootstrap.servers' = 'localhost:9092', "
        + "'format' = 'csv', "
        + "'scan.startup.mode' = 'earliest-offset'"
        + ")");

tableEnv.executeSql(
    "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");

tableEnv
    .from("input_table")
    .groupBy($("location"))
    .select(
        $("location").cast(DataTypes.CHAR(2)).as("location"),
        $("population").sum().as("population"))
    .executeInsert("result_table");
I am using a pre-populated Kafka topic called kafka_batching_input with these elements:
"Berlin",1
"Berlin",2
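For reference, the final per-location sum that the job should eventually report for these two elements can be worked out without Flink. The plain-Java sketch below is only an illustration: the class name ExpectedAggregate and the method sumByLocation are hypothetical and not part of the job, and the sketch ignores both the CHAR(2) cast and the changelog rows a print sink would emit.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedAggregate {

    // Hypothetical helper: parses lines of the form "Berlin",1 and sums
    // the population per location, like GROUP BY location / SUM(population).
    static Map<String, Integer> sumByLocation(List<String> csvLines) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : csvLines) {
            int comma = line.lastIndexOf(',');
            // Strip the CSV quotes around the location field.
            String location = line.substring(0, comma).replace("\"", "");
            int population = Integer.parseInt(line.substring(comma + 1).trim());
            totals.merge(location, population, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The two elements from the pre-populated topic.
        List<String> elements = List.of("\"Berlin\",1", "\"Berlin\",2");
        System.out.println(sumByLocation(elements)); // prints {Berlin=3}
    }
}
```

With mini-batching enabled, no result row for this sum shows up because the job is stuck.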