diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java index 306391b..a501b6c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.cli; +import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -27,17 +28,15 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class LlapStatusOptionsProcessor { - private static final Logger LOG = LoggerFactory.getLogger(LlapStatusOptionsProcessor.class); - private static final String LLAPSTATUS_CONSTANT = "llapstatus"; private static final long FIND_YARN_APP_TIMEOUT_MS = 20 * 1000l; // 20seconds to wait for app to be visible + private static final long DEFAULT_STATUS_REFRESH_INTERVAL_MS = 1 * 1000l; // 1 seconds wait until subsequent status + private static final long DEFAULT_WATCH_MODE_TIMEOUT_MS = 5 * 60 * 1000l; // 5 minutes timeout for watch mode enum OptionConstants { NAME("name", 'n', "LLAP cluster name", true), @@ -45,6 +44,14 @@ "Amount of time(s) that the tool will sleep to wait for the YARN application to start. negative values=wait forever, 0=Do not wait. default=" + TimeUnit.SECONDS.convert(FIND_YARN_APP_TIMEOUT_MS, TimeUnit.MILLISECONDS) + "s", true), OUTPUT_FILE("outputFile", 'o', "File to which output should be written (Default stdout)", true), + WATCH_UNTIL_STATUS_CHANGE("watchUntil", 'w', "Watch until LLAP application status changes to the specified " + + "desired state before printing to console. Accepted values are " + Arrays.toString(LlapStatusServiceDriver.State + .values()), true), + STATUS_REFRESH_INTERVAL("refreshInterval", 'i', "Amount of time in seconds to wait until subsequent status checks" + + " in watch mode. Valid only for watch mode. (Default " + + TimeUnit.SECONDS.convert(DEFAULT_STATUS_REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS) + "s)", true), + WATCH_MODE_TIMEOUT("watchTimeout", 't', "Exit watch mode if the desired state is not attained until the specified" + + " timeout. (Default " + TimeUnit.SECONDS.convert(DEFAULT_WATCH_MODE_TIMEOUT_MS, TimeUnit.MILLISECONDS) +"s)", true), HIVECONF("hiveconf", null, "Use value for given property. Overridden by explicit parameters", "property=value", 2), HELP("help", 'H', "Print help information", false); @@ -94,17 +101,26 @@ public int getNumArgs() { private final Properties conf; private final long findAppTimeoutMs; private final String outputFile; + private final long refreshIntervalMs; + private final LlapStatusServiceDriver.State watchUntil; + private final long watchTimeout; + + public LlapStatusOptions(String name) { + this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null, DEFAULT_STATUS_REFRESH_INTERVAL_MS, null, + DEFAULT_WATCH_MODE_TIMEOUT_MS); + } public LlapStatusOptions(String name, Properties hiveProperties, long findAppTimeoutMs, - String outputFile) { + String outputFile, long refreshIntervalMs, + final LlapStatusServiceDriver.State watchUntil, + final long watchTimeoutMs) { this.name = name; this.conf = hiveProperties; this.findAppTimeoutMs = findAppTimeoutMs; this.outputFile = outputFile; - } - - public LlapStatusOptions(String name) { - this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null); + this.refreshIntervalMs = refreshIntervalMs; + this.watchUntil = watchUntil; + this.watchTimeout = watchTimeoutMs; } public String getName() { @@ -122,6 +138,18 @@ public long getFindAppTimeoutMs() { public String getOutputFile() { return outputFile; } + + public long getRefreshIntervalMs() { + return refreshIntervalMs; + } + + public LlapStatusServiceDriver.State getWatchUntilState() { + return watchUntil; + } + + public long getWatchTimeoutMs() { + return watchTimeout; + } } private final Options options = new Options(); @@ -170,7 +198,31 @@ public LlapStatusOptions processOptions(String[] args) throws ParseException { outputFile = commandLine.getOptionValue(OptionConstants.OUTPUT_FILE.getLongOpt()); } - return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile); + long refreshIntervalMs = DEFAULT_STATUS_REFRESH_INTERVAL_MS; + if (commandLine.hasOption(OptionConstants.STATUS_REFRESH_INTERVAL.getLongOpt())) { + long refreshIntervalSec = Long.parseLong(commandLine.getOptionValue(OptionConstants.STATUS_REFRESH_INTERVAL + .getLongOpt())); + if (refreshIntervalSec <= 0) { + throw new IllegalArgumentException("Refresh interval should be >0"); + } + refreshIntervalMs = TimeUnit.MILLISECONDS.convert(refreshIntervalSec, TimeUnit.SECONDS); + } + + LlapStatusServiceDriver.State watchUntil = null; + if (commandLine.hasOption(OptionConstants.WATCH_UNTIL_STATUS_CHANGE.getLongOpt())) { + String watchUntilStr = commandLine.getOptionValue(OptionConstants.WATCH_UNTIL_STATUS_CHANGE.getLongOpt()); + watchUntil = LlapStatusServiceDriver.State.valueOf(watchUntilStr); + } + + long watchTimeoutMs = DEFAULT_WATCH_MODE_TIMEOUT_MS; + if (commandLine.hasOption(OptionConstants.WATCH_MODE_TIMEOUT.getLongOpt())) { + long watchTimeoutSec = Long.parseLong(commandLine.getOptionValue(OptionConstants.WATCH_MODE_TIMEOUT.getLongOpt())); + if (watchTimeoutSec <= 0) { + throw new IllegalArgumentException("Watch timeout should be >0"); + } + watchTimeoutMs = TimeUnit.MILLISECONDS.convert(watchTimeoutSec, TimeUnit.SECONDS); + } + return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile, refreshIntervalMs, watchUntil, watchTimeoutMs); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java index 0efe545..97a131e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java @@ -24,12 +24,14 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; @@ -99,20 +101,57 @@ private static final String CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS = CONF_PREFIX + "zk-registry.timeout-ms"; - private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 10000l; + private static final long CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT = 20000l; - - private static final String AM_KEY = "slider-appmaster"; private static final String LLAP_KEY = "LLAP"; - private final Configuration conf; private final Clock clock = new SystemClock(); + private String appName = null; + private SliderClient sliderClient = null; + private Configuration llapRegistryConf = null; + private LlapRegistryService llapRegistry = null; + @VisibleForTesting - final AppStatusBuilder appStatusBuilder = new AppStatusBuilder(); + AppStatusBuilder appStatusBuilder; public LlapStatusServiceDriver() { SessionState ss = SessionState.get(); conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class); + setupConf(); + } + + private void setupConf() { + for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) { + conf.addResource(f); + } + conf.reloadConfiguration(); + + // Setup timeouts for various services. + + // Once we move to a Hadoop-2.8 dependency, the following paramteer can be used. + // conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC); + conf.set("yarn.timeline-service.entity-group-fs-store.retry-policy-spec", + conf.get(CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC, + CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC_DEFAULT)); + + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + conf.getLong(CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS, + CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS_DEFAULT)); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + conf.getLong(CONFIG_YARN_RM_RETRY_INTERVAL_MS, CONFIG_YARN_RM_RETRY_INTERVAL_MS_DEFAULT)); + + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + conf.getInt(CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES, + CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT)); + conf.setLong(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, + conf.getLong(CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS, + CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS_DEFAULT)); + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, (conf + .getLong(CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS, CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT) + + "ms")); + + llapRegistryConf = new Configuration(conf); } /** @@ -135,66 +174,37 @@ public LlapStatusOptions parseOptions(String[] args) throws LlapStatusCliExcepti } public int run(LlapStatusOptions options) { - - SliderClient sliderClient = null; + appStatusBuilder = new AppStatusBuilder(); try { + if (appName == null) { + // user provided configs + for (Map.Entry props : options.getConf().entrySet()) { + conf.set((String) props.getKey(), (String) props.getValue()); + } - for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) { - conf.addResource(f); - } - conf.reloadConfiguration(); - for (Map.Entry props : options.getConf().entrySet()) { - conf.set((String) props.getKey(), (String) props.getValue()); - } - - // Setup timeouts for various services. - - // Once we move to a Hadoop-2.8 dependency, the following paramteer can be used. - // conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC); - conf.set("yarn.timeline-service.entity-group-fs-store.retry-policy-spec", - conf.get(CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC, - CONFIG_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC_DEFAULT)); - - conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, - conf.getLong(CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS, - CONFIG_YARN_RM_TIMEOUT_MAX_WAIT_MS_DEFAULT)); - conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, - conf.getLong(CONFIG_YARN_RM_RETRY_INTERVAL_MS, CONFIG_YARN_RM_RETRY_INTERVAL_MS_DEFAULT)); - - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - conf.getInt(CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES, - CONFIG_IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT)); - conf.setLong(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, - conf.getLong(CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS, - CONFIG_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS_DEFAULT)); - - HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, (conf - .getLong(CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS, CONFIG_LLAP_ZK_REGISTRY_TIMEOUT_MS_DEFAULT) + - "ms")); - - - - String appName; - appName = options.getName(); - if (StringUtils.isEmpty(appName)) { - appName = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); - if (appName.startsWith("@") && appName.length() > 1) { - // This is a valid slider app name. Parse it out. - appName = appName.substring(1); - } else { - // Invalid app name. Checked later. - appName = null; + appName = options.getName(); + if (StringUtils.isEmpty(appName)) { + appName = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + if (appName.startsWith("@") && appName.length() > 1) { + // This is a valid slider app name. Parse it out. + appName = appName.substring(1); + } else { + // Invalid app name. Checked later. + appName = null; + } } - } - if (StringUtils.isEmpty(appName)) { - String message = + if (StringUtils.isEmpty(appName)) { + String message = "Invalid app name. This must be setup via config or passed in as a parameter." + - " This tool works with clusters deployed by Slider/YARN"; - LOG.info(message); - return ExitCode.INCORRECT_USAGE.getInt(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Using appName: {}", appName); + " This tool works with clusters deployed by Slider/YARN"; + LOG.info(message); + return ExitCode.INCORRECT_USAGE.getInt(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Using appName: {}", appName); + } + + llapRegistryConf.set(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + appName); } try { @@ -205,7 +215,7 @@ public int run(LlapStatusOptions options) { } // Get the App report from YARN - ApplicationReport appReport = null; + ApplicationReport appReport; try { appReport = getAppReport(appName, sliderClient, options.getFindAppTimeoutMs()); } catch (LlapStatusCliException e) { @@ -225,7 +235,7 @@ public int run(LlapStatusOptions options) { if (ret != ExitCode.SUCCESS) { return ret.getInt(); } else if (EnumSet.of(State.APP_NOT_FOUND, State.COMPLETE, State.LAUNCHING) - .contains(appStatusBuilder.getState())) { + .contains(appStatusBuilder.getState())) { return ExitCode.SUCCESS.getInt(); } else { // Get information from slider. @@ -238,24 +248,22 @@ public int run(LlapStatusOptions options) { } } - if (ret !=ExitCode.SUCCESS ) { + if (ret != ExitCode.SUCCESS) { return ret.getInt(); } else { try { - ret = populateAppStatusFromLlapRegistry(appName, appStatusBuilder); + ret = populateAppStatusFromLlapRegistry(appStatusBuilder); } catch (LlapStatusCliException e) { logError(e); return e.getExitCode().getInt(); } } + return ret.getInt(); - }finally { + } finally { if (LOG.isDebugEnabled()) { LOG.debug("Final AppState: " + appStatusBuilder.toString()); } - if (sliderClient != null) { - sliderClient.stop(); - } } } @@ -274,7 +282,10 @@ public void outputJson(PrintWriter writer) throws LlapStatusCliException { } private SliderClient createSliderClient() throws LlapStatusCliException { - SliderClient sliderClient; + if (sliderClient != null) { + return sliderClient; + } + try { sliderClient = new SliderClient() { @Override @@ -285,17 +296,16 @@ public void serviceInit(Configuration conf) throws Exception { }; Configuration sliderClientConf = new Configuration(conf); sliderClientConf = sliderClient.bindArgs(sliderClientConf, - new String[] { "help" }); + new String[]{"help"}); sliderClient.init(sliderClientConf); sliderClient.start(); return sliderClient; } catch (Exception e) { throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_CREATE_FAILED, - "Failed to create slider client", e); + "Failed to create slider client", e); } } - private ApplicationReport getAppReport(String appName, SliderClient sliderClient, long timeoutMs) throws LlapStatusCliException { @@ -466,94 +476,87 @@ private ExitCode populateAppStatusFromSlider(String appName, SliderClient slider /** - * - * @param appName * @param appStatusBuilder * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible * @throws LlapStatusCliException */ - private ExitCode populateAppStatusFromLlapRegistry(String appName, AppStatusBuilder appStatusBuilder) throws - LlapStatusCliException { - Configuration llapRegistryConf= new Configuration(conf); - llapRegistryConf - .set(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + appName); - LlapRegistryService llapRegistry; - try { - llapRegistry = LlapRegistryService.getClient(llapRegistryConf); - } catch (Exception e) { - throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, - "Failed to create llap registry client", e); - } - try { - Collection serviceInstances; + private ExitCode populateAppStatusFromLlapRegistry(AppStatusBuilder appStatusBuilder) throws + LlapStatusCliException { + + if (llapRegistry == null) { try { - serviceInstances = llapRegistry.getInstances().getAll(); - } catch (IOException e) { - throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e); + llapRegistry = LlapRegistryService.getClient(llapRegistryConf); + } catch (Exception e) { + throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, + "Failed to create llap registry client", e); } + } - if (serviceInstances == null || serviceInstances.isEmpty()) { - LOG.info("No information found in the LLAP registry"); - appStatusBuilder.setLiveInstances(0); - appStatusBuilder.setState(State.LAUNCHING); - appStatusBuilder.clearLlapInstances(); - return ExitCode.SUCCESS; - } else { - // Tracks instances known by both slider and llap. - List validatedInstances = new LinkedList<>(); - List llapExtraInstances = new LinkedList<>(); - - for (ServiceInstance serviceInstance : serviceInstances) { - String containerIdString = serviceInstance.getProperties().get( - HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); - - LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer( - containerIdString); - if (llapInstance != null) { - llapInstance.setMgmtPort(serviceInstance.getManagementPort()); - llapInstance.setRpcPort(serviceInstance.getRpcPort()); - llapInstance.setShufflePort(serviceInstance.getShufflePort()); - llapInstance.setWebUrl(serviceInstance.getServicesAddress()); - llapInstance.setStatusUrl(serviceInstance.getServicesAddress() + "/status"); - validatedInstances.add(llapInstance); - } else { - // This likely indicates that an instance has recently restarted - // (the old instance has not been unregistered), and the new instances has not registered yet. - llapExtraInstances.add(containerIdString); - // This instance will not be added back, since it's services are not up yet. - } - - } + Collection serviceInstances; + try { + serviceInstances = llapRegistry.getInstances().getAll(); + } catch (IOException e) { + throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e); + } - appStatusBuilder.setLiveInstances(validatedInstances.size()); - if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) { - appStatusBuilder.setState(State.RUNNING_ALL); - if (validatedInstances.size() > appStatusBuilder.getDesiredInstances()) { - LOG.warn("Found more entries in LLAP registry, as compared to desired entries"); - } + if (serviceInstances == null || serviceInstances.isEmpty()) { + LOG.info("No information found in the LLAP registry"); + appStatusBuilder.setLiveInstances(0); + appStatusBuilder.setState(State.LAUNCHING); + appStatusBuilder.clearLlapInstances(); + return ExitCode.SUCCESS; + } else { + // Tracks instances known by both slider and llap. + List validatedInstances = new LinkedList<>(); + List llapExtraInstances = new LinkedList<>(); + + for (ServiceInstance serviceInstance : serviceInstances) { + String containerIdString = serviceInstance.getProperties().get( + HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + + LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer( + containerIdString); + if (llapInstance != null) { + llapInstance.setMgmtPort(serviceInstance.getManagementPort()); + llapInstance.setRpcPort(serviceInstance.getRpcPort()); + llapInstance.setShufflePort(serviceInstance.getShufflePort()); + llapInstance.setWebUrl(serviceInstance.getServicesAddress()); + llapInstance.setStatusUrl(serviceInstance.getServicesAddress() + "/status"); + validatedInstances.add(llapInstance); } else { - appStatusBuilder.setState(State.RUNNING_PARTIAL); + // This likely indicates that an instance has recently restarted + // (the old instance has not been unregistered), and the new instances has not registered yet. + llapExtraInstances.add(containerIdString); + // This instance will not be added back, since it's services are not up yet. } - // At this point, everything that can be consumed from AppStatusBuilder has been consumed. - // Debug only - if (appStatusBuilder.allInstances().size() > 0) { - // Containers likely to come up soon. - LOG.debug("Potential instances starting up: {}", appStatusBuilder.allInstances()); - } - if (llapExtraInstances.size() > 0) { - // Old containers which are likely shutting down - LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances); - } + } - appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances); + appStatusBuilder.setLiveInstances(validatedInstances.size()); + if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) { + appStatusBuilder.setState(State.RUNNING_ALL); + if (validatedInstances.size() > appStatusBuilder.getDesiredInstances()) { + LOG.warn("Found more entries in LLAP registry, as compared to desired entries"); + } + } else { + appStatusBuilder.setState(State.RUNNING_PARTIAL); + } + // At this point, everything that can be consumed from AppStatusBuilder has been consumed. + // Debug only + if (appStatusBuilder.allInstances().size() > 0) { + // Containers likely to come up soon. + LOG.debug("Potential instances starting up: {}", appStatusBuilder.allInstances()); } - return ExitCode.SUCCESS; - } finally { - llapRegistry.stop(); - } + if (llapExtraInstances.size() > 0) { + // Old containers which are likely shutting down + LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances); + } + + appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances); + } + return ExitCode.SUCCESS; } @@ -564,8 +567,8 @@ private ExitCode populateAppStatusFromLlapRegistry(String appName, AppStatusBuil private String originalConfigurationPath; private String generatedConfigurationPath; - private Integer desiredInstances; - private Integer liveInstances; + private int desiredInstances = -1; + private int liveInstances = -1; private Long appStartTime; private Long appFinishTime; @@ -658,11 +661,11 @@ public String getGeneratedConfigurationPath() { return generatedConfigurationPath; } - public Integer getDesiredInstances() { + public int getDesiredInstances() { return desiredInstances; } - public Integer getLiveInstances() { + public int getLiveInstances() { return liveInstances; } @@ -916,7 +919,7 @@ private static void logError(Throwable t) { public static void main(String[] args) { - LOG.info("LLAP status invoked with arguments = {}", args); + LOG.info("LLAP status invoked with arguments = {}", Arrays.toString(args)); int ret = ExitCode.SUCCESS.getInt(); LlapStatusServiceDriver statusServiceDriver = null; @@ -925,6 +928,7 @@ public static void main(String[] args) { statusServiceDriver = new LlapStatusServiceDriver(); options = statusServiceDriver.parseOptions(args); } catch (Throwable t) { + statusServiceDriver.close(); logError(t); if (t instanceof LlapStatusCliException) { LlapStatusCliException ce = (LlapStatusCliException) t; @@ -934,20 +938,62 @@ public static void main(String[] args) { } } if (ret != 0 || options == null) { // Failure / help + if (statusServiceDriver != null) { + statusServiceDriver.close(); + } System.exit(ret); } - try { - ret = statusServiceDriver.run(options); - if (ret == ExitCode.SUCCESS.getInt()) { - try (OutputStream os = options.getOutputFile() == null ? System.out : - new BufferedOutputStream( - new FileOutputStream(options.getOutputFile())); PrintWriter pw = new PrintWriter( - os)) { - statusServiceDriver.outputJson(pw); + final long refreshInterval = options.getRefreshIntervalMs(); + final State watchUntilState = options.getWatchUntilState(); + final long watchTimeout = options.getWatchTimeoutMs(); + long numAttempts = watchTimeout / refreshInterval; + State currentState = null; + try (OutputStream os = options.getOutputFile() == null ? System.out : + new BufferedOutputStream(new FileOutputStream(options.getOutputFile())); + PrintWriter pw = new PrintWriter(os)) { + + LOG.info("Configured refresh interval: {}s. Watch timeout: {}s. Attempts remaining: {}", + TimeUnit.SECONDS.convert(refreshInterval, TimeUnit.MILLISECONDS), + TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS), + numAttempts); + while (numAttempts > 0) { + try { + ret = statusServiceDriver.run(options); + if (ret == ExitCode.SUCCESS.getInt()) { + if (watchUntilState != null) { + currentState = statusServiceDriver.appStatusBuilder.state; + if (!currentState.equals(watchUntilState)) { + LOG.warn("Current state: {}. Desired state: {}. {}/{} instances.", currentState, watchUntilState, + statusServiceDriver.appStatusBuilder.getLiveInstances(), + statusServiceDriver.appStatusBuilder.getDesiredInstances()); + numAttempts--; + continue; + } + } + // desired state attained. print and break out of loop + statusServiceDriver.outputJson(pw); + os.flush(); + pw.flush(); + } + break; + } finally { + if (watchUntilState != null) { + try { + Thread.sleep(refreshInterval); + } catch (InterruptedException e) { + // ignore + } + } else { + // reported once, so break + break; + } } } - + if (numAttempts == 0 && watchUntilState != null && currentState!= null && !currentState.equals(watchUntilState)) { + LOG.info("Watch timeout {}s exhausted before desired state {} is attained.", + TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS), watchUntilState); + } } catch (Throwable t) { logError(t); if (t instanceof LlapStatusCliException) { @@ -958,10 +1004,20 @@ public static void main(String[] args) { } } finally { LOG.info("LLAP status finished"); + statusServiceDriver.close(); } if (LOG.isDebugEnabled()) { LOG.debug("Completed processing - exiting with " + ret); } System.exit(ret); } + + private void close() { + if (sliderClient != null) { + sliderClient.stop(); + } + if (llapRegistry != null) { + llapRegistry.stop(); + } + } }