diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java index a501b6c..a81d14a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java @@ -37,6 +37,7 @@ private static final long DEFAULT_STATUS_REFRESH_INTERVAL_MS = 1 * 1000l; // 1 seconds wait until subsequent status private static final long DEFAULT_WATCH_MODE_TIMEOUT_MS = 5 * 60 * 1000l; // 5 minutes timeout for watch mode + private static final float DEFAULT_RUNNING_NODES_THRESHOLD = 0.5f; enum OptionConstants { NAME("name", 'n', "LLAP cluster name", true), @@ -47,6 +48,9 @@ WATCH_UNTIL_STATUS_CHANGE("watchUntil", 'w', "Watch until LLAP application status changes to the specified " + "desired state before printing to console. Accepted values are " + Arrays.toString(LlapStatusServiceDriver.State .values()), true), + RUNNING_NODES_THRESHOLD("runningNodesThreshold", 'r', "When watchUntil (-w) option is specified with state as " + + LlapStatusServiceDriver.State.RUNNING_PARTIAL + ", wait until the specified threshold of nodes are up and " + + "running (Default 0.5 which means the status tool waits until 50% of nodes are running before exiting).", true), STATUS_REFRESH_INTERVAL("refreshInterval", 'i', "Amount of time in seconds to wait until subsequent status checks" + " in watch mode. Valid only for watch mode. 
(Default " + TimeUnit.SECONDS.convert(DEFAULT_STATUS_REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS) + "s)", true), @@ -104,16 +108,17 @@ public int getNumArgs() { private final long refreshIntervalMs; private final LlapStatusServiceDriver.State watchUntil; private final long watchTimeout; + private final float runningNodesThreshold; - public LlapStatusOptions(String name) { + public LlapStatusOptions(final String name) { this(name, new Properties(), FIND_YARN_APP_TIMEOUT_MS, null, DEFAULT_STATUS_REFRESH_INTERVAL_MS, null, - DEFAULT_WATCH_MODE_TIMEOUT_MS); + DEFAULT_WATCH_MODE_TIMEOUT_MS, DEFAULT_RUNNING_NODES_THRESHOLD); } public LlapStatusOptions(String name, Properties hiveProperties, long findAppTimeoutMs, String outputFile, long refreshIntervalMs, final LlapStatusServiceDriver.State watchUntil, - final long watchTimeoutMs) { + final long watchTimeoutMs, final float runningNodesThreshold) { this.name = name; this.conf = hiveProperties; this.findAppTimeoutMs = findAppTimeoutMs; @@ -121,6 +126,7 @@ public LlapStatusOptions(String name, Properties hiveProperties, long findAppTim this.refreshIntervalMs = refreshIntervalMs; this.watchUntil = watchUntil; this.watchTimeout = watchTimeoutMs; + this.runningNodesThreshold = runningNodesThreshold; } public String getName() { @@ -150,6 +156,10 @@ public long getRefreshIntervalMs() { public long getWatchTimeoutMs() { return watchTimeout; } + + public float getRunningNodesThreshold() { + return runningNodesThreshold; + } } private final Options options = new Options(); @@ -222,7 +232,17 @@ public LlapStatusOptions processOptions(String[] args) throws ParseException { } watchTimeoutMs = TimeUnit.MILLISECONDS.convert(watchTimeoutSec, TimeUnit.SECONDS); } - return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile, refreshIntervalMs, watchUntil, watchTimeoutMs); + + float runningNodesThreshold = DEFAULT_RUNNING_NODES_THRESHOLD; + if (commandLine.hasOption(OptionConstants.RUNNING_NODES_THRESHOLD.getLongOpt())) { + 
runningNodesThreshold = Float.parseFloat(commandLine.getOptionValue(OptionConstants.RUNNING_NODES_THRESHOLD + .getLongOpt())); + if (!(runningNodesThreshold >= 0.0f && runningNodesThreshold <= 1.0f)) { /* also rejects NaN */ + throw new IllegalArgumentException("Running nodes threshold value should be between 0.0 and 1.0 (inclusive)"); + } + } + return new LlapStatusOptions(name, hiveConf, findAppTimeoutMs, outputFile, refreshIntervalMs, watchUntil, + watchTimeoutMs, runningNodesThreshold); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java index 97a131e..26ae237 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.text.DecimalFormat; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -948,7 +949,9 @@ public static void main(String[] args) { final State watchUntilState = options.getWatchUntilState(); final long watchTimeout = options.getWatchTimeoutMs(); long numAttempts = watchTimeout / refreshInterval; + State launchingState = null; State currentState = null; + final float runningNodesThreshold = options.getRunningNodesThreshold(); try (OutputStream os = options.getOutputFile() == null ? 
System.out : new BufferedOutputStream(new FileOutputStream(options.getOutputFile())); PrintWriter pw = new PrintWriter(os)) { @@ -963,18 +966,53 @@ public static void main(String[] args) { if (ret == ExitCode.SUCCESS.getInt()) { if (watchUntilState != null) { currentState = statusServiceDriver.appStatusBuilder.state; + + // slider has started llap application, now if for some reason state changes to COMPLETE then fail fast + if (launchingState == null && + (currentState.equals(State.LAUNCHING) || currentState.equals(State.RUNNING_PARTIAL))) { + launchingState = currentState; + } + + if (launchingState != null && currentState.equals(State.COMPLETE) && + (watchUntilState.equals(State.RUNNING_PARTIAL) || watchUntilState.equals(State.RUNNING_ALL))) { + LOG.warn("Application stopped while launching. COMPLETE state reached while waiting for {}. Failing " + + "fast..", watchUntilState); + break; + } + if (!currentState.equals(watchUntilState)) { LOG.warn("Current state: {}. Desired state: {}. {}/{} instances.", currentState, watchUntilState, statusServiceDriver.appStatusBuilder.getLiveInstances(), statusServiceDriver.appStatusBuilder.getDesiredInstances()); + // user requested for RUNNING_PARTIAL state but we have already reached RUNNING_ALL state after + // launching (runningNodesThreshold has reached 1.0) + if (launchingState != null && watchUntilState.equals(State.RUNNING_PARTIAL) && + currentState.equals(State.RUNNING_ALL)) { + LOG.warn("Reached {} when waiting for {} after launching.", State.RUNNING_ALL, State.RUNNING_PARTIAL); + break; + } numAttempts--; continue; } + + // when RUNNING_PARTIAL state is reached, check for running nodes threshold condition as well + if (watchUntilState.equals(State.RUNNING_PARTIAL)) { + final int liveInstances = statusServiceDriver.appStatusBuilder.getLiveInstances(); + final int desiredInstances = statusServiceDriver.appStatusBuilder.getDesiredInstances(); + if (liveInstances > 0 && desiredInstances > 0) { + final float ratio = 
(float) liveInstances / (float) desiredInstances; + if (ratio < runningNodesThreshold) { + LOG.warn("Waiting until running nodes threshold is reached. Current: {} Desired: {}." + + " {}/{} instances.", new DecimalFormat("#.###").format(ratio), + new DecimalFormat("#.###").format(runningNodesThreshold), + statusServiceDriver.appStatusBuilder.getLiveInstances(), + statusServiceDriver.appStatusBuilder.getDesiredInstances()); + numAttempts--; + continue; + } + } + } } - // desired state attained. print and break out of loop - statusServiceDriver.outputJson(pw); - os.flush(); - pw.flush(); } break; } finally { @@ -990,6 +1028,10 @@ public static void main(String[] args) { } } } + // print current state before exiting + statusServiceDriver.outputJson(pw); + os.flush(); + pw.flush(); if (numAttempts == 0 && watchUntilState != null && currentState!= null && !currentState.equals(watchUntilState)) { LOG.info("Watch timeout {}s exhausted before desired state {} is attained.", TimeUnit.SECONDS.convert(watchTimeout, TimeUnit.MILLISECONDS), watchUntilState);