Uploaded image for project: 'Zeppelin'
  1. Zeppelin
  2. ZEPPELIN-5510

There are some logic problems, which may lead to thread leaks

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.11.0, 0.10.1
    • Component/s: Interpreters
    • Labels:
      None

      Description

      public void addJob(InterpreterContext context, JobClient jobClient) {
        String paragraphId = context.getParagraphId();
        JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
        long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
        FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
        thread.setName("JobProgressPoller-Thread-" + paragraphId);
        thread.start();
        this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
        if (previousJobClient != null) {
          LOGGER.warn("There's another Job {} that is associated with paragraph {}",
                  jobClient.getJobID(), paragraphId);
        }
      }
      

      There are some problems with this code.It may cause thread leak.I think it shoud be changed  to this

      public void addJob(InterpreterContext context, JobClient jobClient) {
        String paragraphId = context.getParagraphId();
        JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
        if (previousJobClient != null) {
          LOGGER.warn("There's another Job {} that is associated with paragraph {}",
                  jobClient.getJobID(), paragraphId);
          return;
        }
        long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
        FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
        thread.setName("JobProgressPoller-Thread-" + paragraphId);
        thread.start();
        this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
      }
      

      If previousJobClient is not null.We shouldn't start  threading again.

       

        Attachments

          Activity

            People

            • Assignee:
              GYN gongyining
              Reporter:
              GYN gongyining
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 3h
                3h