FLINK-9166: Performance issue with many topologies in a single job



    Description

      With a high number of Flink SQL queries (100 copies of the query below) in a single job, the Flink command-line client fails with "JobManager did not respond within 600000 ms" on a YARN cluster.

      • The JobManager log shows nothing after the last TaskManager registration except DEBUG messages like "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
      • The same program works locally as a standalone Java program (with high CPU usage initially).
      • Note: each Row in structStream contains 515 columns (many end up null), including a column that holds the raw message.
      • On the YARN cluster we configure 18 GB for each TaskManager, 18 GB for the JobManager, 145 TaskManagers with 5 slots each, and a parallelism of 725 (the number of partitions in our Kafka source); a rough subtask-count estimate follows this list.
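
      A back-of-the-envelope estimate of the scale involved (the per-query operator count below is an assumption for illustration, not measured from the actual JobGraph):

       // Each of the 100 generated SQL pipelines adds roughly three operators
       // (calc/filter, windowed group aggregate, print sink), each running at the
       // job-wide parallelism of 725. The shared Kafka source is ignored here.
       int queries = 100;
       int operatorsPerQuery = 3;   // assumed, not measured
       int parallelism = 725;
       int approxSubtasks = queries * operatorsPerQuery * parallelism; // ~217,500 subtasks

      At that size, building and deploying the ExecutionGraph on the JobManager can plausibly take far longer than the client is willing to wait.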

      Query:

       select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source 
       from structStream
       where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success'
       group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source
      

      Code:

      public static void main(String[] args) throws Exception {
       FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());
      
       final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
       final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
      
       final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
       tableEnv.registerDataStream("structStream", structStream);
       tableEnv.scan("structStream").printSchema();
      
       for (int i = 0; i < 100; i++) {
         for (String query : Queries.sample) {
           // Queries.sample contains the single query shown above.
           Table selectQuery = tableEnv.sqlQuery(query);

           DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
           selectQueryStream.print();
         }
       }
      
       // execute program
       streamingEnvironment.execute("Kafka Streaming SQL");
      }
      
      private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
        Properties properties = getKafkaProperties();
        // TestDeserializer deserializes the JSON to a ROW of string columns (515)
        // and also adds a column for the raw message. 
        FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(
            KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
        DataStream<Row> stream = environment.addSource(consumer);
      
        return stream;
      }
      
      private static RowTypeInfo getRowTypeInfo() throws Exception {
        // This has 515 fields. 
        List<String> fieldNames = DDIManager.getDDIFieldNames();
        fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
        fieldNames.add("proctime");
      
        // Fill typeInformationArray with String type information for all but the last
        // field, which is of type Time.
        // ... (construction of typeInformationArray and fieldNamesArray elided) ...
        return new RowTypeInfo(typeInformationArray, fieldNamesArray);
      }
      
      private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        env.enableCheckpointing(60000);
        env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
        env.setParallelism(725);
         return env;
      }
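
      A possible mitigation to experiment with (purely a sketch, not something verified in this report): keep the Kafka source at one subtask per partition, but lower the environment default parallelism so that the 100 generated SQL pipelines do not each fan out to 725 subtasks. setParallelism(int) on the source is standard DataStream API; the method name and the value 50 mentioned afterwards are illustrative placeholders.

       // Hypothetical variant of getKafkaStreamOfRows(): pin only the source to 725.
       private static DataStream<Row> getKafkaStreamOfRowsPinnedToPartitions(
           StreamExecutionEnvironment environment) throws Exception {
         Properties properties = getKafkaProperties();
         FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(
             KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
         // One source subtask per Kafka partition, independent of the job default.
         return environment.addSource(consumer).setParallelism(725);
       }

      This would be combined with a lower default in getStreamExecutionEnvironment(), e.g. env.setParallelism(50) instead of 725. If the submission eventually completes and only the client gives up, raising the client-side timeout (akka.client.timeout in flink-conf.yaml, assuming that is what governs the 600000 ms here) may also help, although it only hides the slow ExecutionGraph construction.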
      


          People

            Assignee: Unassigned
            Reporter: SUBRAMANYA SURESH (ssubbu@gmail.com)
            Votes: 0
            Watchers: 9
