Flink / FLINK-18603

SQL fails with java.lang.IllegalStateException: No operators defined in streaming topology

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.11.0
    • Fix Version/s: None
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      Hi, I was playing with 1.11 and found that code that worked in 1.10.1 fails in 1.11.0 with:

      Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
      	at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
      	at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
      	at com.css.flink.avro.confluent.table.SqlTest.main(SqlTest.java:53)
      
      

      code example:

      StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      EnvironmentSettings bsSettings =
          EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
      
      String createTable =
          String.format(
              "create table EnrichedOrders ("
                  + "name VARCHAR,"
                  + "proctime AS PROCTIME()"
                  + ") with ("
                  + "  'connector.type' = 'kafka',"
                  + "  'connector.version' = 'universal',"
                  + "  'connector.property-version' = '1',"
                  + "  'connector.topic' = '%s',"
                  + "  'connector.properties.bootstrap.servers' = '%s',"
                  + "  'connector.properties.group.id' = '%s',"
                  + "  'connector.startup-mode' = 'earliest-offset',"
                  + "  'update-mode' = 'append',"
                  + "  'format.type' = 'confluent-avro',"
                  + "  'format.schema-registry' = '%s'"
                  + ")",
              "avro",
              "broker",
              "testSqlLocal",
              "registry");
      
      tEnv.executeSql(createTable);
      
      tEnv.toAppendStream(
              tEnv.sqlQuery(
                  "select name, sum(*) "
                      + "from EnrichedOrders "
                      + "GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE), name"),
              Row.class)
          .print();
      
      tEnv.execute("testSql");
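
      A likely explanation, consistent with the "Not A Problem" resolution: as far as I understand the 1.11 behavior, tEnv.execute() only runs statements buffered in the table environment itself, while a Table converted with toAppendStream() becomes part of the StreamExecutionEnvironment's topology. Under that assumption, the job above would need to be started with bsEnv.execute("testSql") instead. The sketch below is a toy model of that split in plain Java. The classes StreamEnv and TableEnv are hypothetical stand-ins, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: these classes are hypothetical stand-ins, not Flink's implementation. */
public class ExecuteBufferSketch {

    /** Stand-in for StreamExecutionEnvironment: owns the DataStream operators. */
    static class StreamEnv {
        final List<String> operators = new ArrayList<>();

        String execute(String jobName) {
            if (operators.isEmpty()) {
                throw new IllegalStateException("No operators defined in streaming topology.");
            }
            return jobName + " ran " + operators;
        }
    }

    /** Stand-in for StreamTableEnvironment: execute() sees only its own buffered inserts. */
    static class TableEnv {
        final StreamEnv env;
        final List<String> bufferedInserts = new ArrayList<>();

        TableEnv(StreamEnv env) {
            this.env = env;
        }

        // Converting a query to a stream registers its operators on the stream environment.
        void toAppendStream(String query) {
            env.operators.add(query);
        }

        String execute(String jobName) {
            if (bufferedInserts.isEmpty()) {
                throw new IllegalStateException("No operators defined in streaming topology.");
            }
            return jobName + " ran " + bufferedInserts;
        }
    }

    public static void main(String[] args) {
        StreamEnv bsEnv = new StreamEnv();
        TableEnv tEnv = new TableEnv(bsEnv);
        tEnv.toAppendStream("windowed query");

        try {
            tEnv.execute("testSql"); // mirrors the failing call in the report
        } catch (IllegalStateException e) {
            System.out.println("tEnv.execute failed: " + e.getMessage());
        }
        // The stream environment holds the operators, so this call succeeds.
        System.out.println(bsEnv.execute("testSql"));
    }
}
```

      In the model, the query's operators land in bsEnv, so only bsEnv.execute() finds a non-empty topology. This matches the symptom in the stack trace.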
      

       

       

          People

            Assignee: Unassigned
            Reporter: Alexander Filipchik (afilipchik)