Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Problem
-
1.11.0
-
None
-
None
Description
Hi, was playing with 1.11 and found that code that worked in 1.10.1 fails in 1.11.0 with :
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.css.flink.avro.confluent.table.SqlTest.main(SqlTest.java:53)
code example:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String createTable = String.format( "create table EnrichedOrders (" + "name VARCHAR," + "proctime AS PROCTIME()" + ") with (" + " 'connector.type' = 'kafka'," + " 'connector.version' = 'universal'," + " 'connector.property-version' = '1'," + " 'connector.topic' = '%s'," + " 'connector.properties.bootstrap.servers' = '%s'," + " 'connector.properties.group.id' = '%s'," + " 'connector.startup-mode' = 'earliest-offset'," + " 'update-mode' = 'append'," + " 'format.type' = 'confluent-avro'," + " 'format.schema-registry' = '%s'" + ")", "avro", "broker", "testSqlLocal", "registry"); tEnv.executeSql(createTable); tEnv.toAppendStream( tEnv.sqlQuery( "select name, sum(*) " + "from EnrichedOrders " + "GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE), name"), Row.class) .print(); tEnv.execute("testSql");