Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29134

fetch metrics may cause oom(ThreadPool task pile up)

    XMLWordPrintableJSON

Details

    Description

      When we queryMetrics we use thread pool to process the data which are returned by TMs. 

      private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
          LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
      
          queryServiceGateway
                  .queryMetrics(timeout)
                  .whenCompleteAsync(
                          (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                              if (t != null) {
                                  LOG.debug("Fetching metrics failed.", t);
                              } else {
                                  metrics.addAll(deserializer.deserialize(result));
                              }
                          },
                          executor);
      } 

      The only condition we will fetch metrics is update time is larger than updateInterval

      public void update() {
          synchronized (this) {
              long currentTime = System.currentTimeMillis();
              if (currentTime - lastUpdateTime > updateInterval) {
                  lastUpdateTime = currentTime;
                  fetchMetrics();
              }
          }
      } 

      Therefore, if we could not process the data in update-interval-time, metrics data will accumulate.

      Besides, webMonitorEndpoint, restHandlers and metrics share the thread pool.

      When we open ui, it maybe even worse.

      final ScheduledExecutorService executor =
              WebMonitorEndpoint.createExecutorService(
                      configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                      configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                      "DispatcherRestEndpoint");
      
      final long updateInterval =
              configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
      final MetricFetcher metricFetcher =
              updateInterval == 0
                      ? VoidMetricFetcher.INSTANCE
                      : MetricFetcherImpl.fromConfiguration(
                              configuration,
                              metricQueryServiceRetriever,
                              dispatcherGatewayRetriever,
                              executor);
      
      webMonitorEndpoint =
              restEndpointFactory.createRestEndpoint(
                      configuration,
                      dispatcherGatewayRetriever,
                      resourceManagerGatewayRetriever,
                      blobServer,
                      executor,
                      metricFetcher,
                      highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                      fatalErrorHandler); 

       

       

       

      Attachments

        1. dump-threadPool.png
          606 kB
          Sitan Pang
        2. dump-queueTask.png
          713 kB
          Sitan Pang

        Issue Links

          Activity

            People

              tanyuxin Yuxin Tan
              nagist Sitan Pang
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: