Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
0.10.0
None
Description
public void addJob(InterpreterContext context, JobClient jobClient) {
    String paragraphId = context.getParagraphId();
    JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
    long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
    thread.setName("JobProgressPoller-Thread-" + paragraphId);
    thread.start();
    this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
    if (previousJobClient != null) {
        LOGGER.warn("There's another Job {} that is associated with paragraph {}", jobClient.getJobID(), paragraphId);
    }
}
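There is a problem with this code: every call starts a new FlinkJobProgressPoller thread, even when a job is already registered for the paragraph, so it may leak threads. I think it should be changed to this: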
public void addJob(InterpreterContext context, JobClient jobClient) {
    String paragraphId = context.getParagraphId();
    JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
    if (previousJobClient != null) {
        LOGGER.warn("There's another Job {} that is associated with paragraph {}", jobClient.getJobID(), paragraphId);
        return;
    }
    long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
    thread.setName("JobProgressPoller-Thread-" + paragraphId);
    thread.start();
    this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
}
If previousJobClient is not null, we shouldn't start another poller thread.
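To illustrate the idea outside of Zeppelin, here is a minimal standalone sketch of the "start a poller only if none is registered" guard. PollerRegistry, its methods, and the one-second sleep are hypothetical stand-ins, not the actual Zeppelin classes, and the sketch keys pollers by paragraph ID rather than by job ID for simplicity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the job manager; not Zeppelin's actual class.
class PollerRegistry {
    // At most one poller thread per paragraph ID.
    private final Map<String, Thread> pollers = new ConcurrentHashMap<>();

    // Starts a poller for the paragraph unless one is already registered.
    // Returns true only if a new thread was created and started.
    boolean addJob(String paragraphId) {
        boolean[] started = {false};
        pollers.computeIfAbsent(paragraphId, id -> {
            Thread poller = new Thread(PollerRegistry::pollUntilInterrupted,
                    "JobProgressPoller-Thread-" + id);
            poller.setDaemon(true);
            poller.start();
            started[0] = true;
            return poller;
        });
        return started[0];
    }

    // Stops and forgets the poller for the paragraph, if there is one.
    void removeJob(String paragraphId) {
        Thread poller = pollers.remove(paragraphId);
        if (poller != null) {
            poller.interrupt();
        }
    }

    // Number of paragraphs that currently have a registered poller.
    int activePollers() {
        return pollers.size();
    }

    // Placeholder loop standing in for the real progress polling.
    private static void pollUntilInterrupted() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, a second addJob("p1") call returns false and creates no thread. computeIfAbsent makes the check and the thread start a single atomic step, whereas put followed by a null check has already replaced the map entry by the time the check runs.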