diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java index 69189f40389..470ba8c3f47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java @@ -15,9 +15,13 @@ package org.apache.hadoop.yarn.submarine.client.cli; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; +import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +51,24 @@ private static ClientContext getClientContext() { clientContext.setRuntimeFactory(runtimeFactory); return clientContext; } + /** + * JVM Shutdown hook which will do clean up + */ + public static class SubmarineShutdownHook implements Runnable { + private ClientContext context; + + public SubmarineShutdownHook(ClientContext ctx) { + this.context = ctx; + } + + @Override + public void run() { + AppAdminClient appAdminClient = + YarnServiceUtils.getServiceClient(context.getYarnConfig()); + ServiceOperations.stopQuietly(appAdminClient); + } + } public static void main(String[] args) throws Exception { System.out.println(" _ _ \n" + " | | (_) \n" @@ -84,7 +105,9 @@ public static void main(String[] args) throws Exception { String[] moduleArgs = Arrays.copyOfRange(args, 2, args.length); ClientContext clientContext = getClientContext(); - + // Add shutdown hook + ShutdownHookManager.get() + .addShutdownHook(new SubmarineShutdownHook(clientContext), 30); if (args[0].equals("job")) { String subCmd = args[1]; if (subCmd.equals(CliConstants.RUN)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java index 35e21fc8d37..c81393b08be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java @@ -47,11 +47,6 @@ public JobMonitor(ClientContext clientContext) { public abstract JobStatus getTrainingJobStatus(String jobName) throws IOException, YarnException; - /** - * Cleanup AppAdminClient, etc. - */ - public void cleanup() throws IOException {} - /** * Continue wait and print status if job goes to ready or final state. * @param jobName @@ -85,6 +80,5 @@ public void waitTrainingFinal(String jobName) throw new IOException(e); } } - cleanup(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java index ee68ddbd5ec..3947519f809 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java @@ -26,7 +26,6 @@ import java.io.IOException; public class YarnServiceJobMonitor extends JobMonitor { - private volatile AppAdminClient serviceClient = null; public YarnServiceJobMonitor(ClientContext clientContext) { super(clientContext); @@ -35,24 +34,12 @@ public YarnServiceJobMonitor(ClientContext clientContext) { @Override public JobStatus getTrainingJobStatus(String jobName) throws IOException, YarnException { - if (this.serviceClient == null) { - synchronized(this) { - if (this.serviceClient == null) { - this.serviceClient = YarnServiceUtils.createServiceClient( - clientContext.getYarnConfig()); - } - } - } - String appStatus=serviceClient.getStatusString(jobName); - Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus); + + AppAdminClient serviceClient = YarnServiceUtils.getServiceClient( + clientContext.getYarnConfig()); + String appStatus = serviceClient.getStatusString(jobName); + Service serviceSpec = ServiceApiUtil.jsonSerDeser.fromJson(appStatus); JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec); return jobStatus; } - - @Override - public void cleanup() throws IOException{ - if (this.serviceClient != null) { - this.serviceClient.close(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 2e84c969b02..d04cab9ebbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -602,7 +602,7 @@ public ApplicationId submitJob(RunJobParameters parameters) createServiceByParameters(parameters); String serviceSpecFile = generateServiceSpecFile(serviceSpec); - AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient( + AppAdminClient appAdminClient = YarnServiceUtils.getServiceClient( clientContext.getYarnConfig()); int code = appAdminClient.actionLaunch(serviceSpecFile, serviceSpec.getName(), null, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java index c599fc9591b..1e85c0d6cb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java @@ -32,17 +32,25 @@ private static final Logger LOG = LoggerFactory.getLogger(YarnServiceUtils.class); + private static volatile AppAdminClient serviceClient; // This will be true only in UT. private static AppAdminClient stubServiceClient = null; - public static AppAdminClient createServiceClient( + public static AppAdminClient getServiceClient( Configuration yarnConfiguration) { if (stubServiceClient != null) { return stubServiceClient; } - AppAdminClient serviceClient = AppAdminClient.createAppAdminClient( - DEFAULT_TYPE, yarnConfiguration); + if (serviceClient == null) { + synchronized (YarnServiceUtils.class) { + if (serviceClient == null) { + serviceClient = AppAdminClient.createAppAdminClient( + DEFAULT_TYPE, yarnConfiguration); + } + } + } + return serviceClient; }