diff --git bin/ext/llapstatus.sh bin/ext/llapstatus.sh new file mode 100644 index 0000000..96edda2 --- /dev/null +++ bin/ext/llapstatus.sh @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +THISSERVICE=llapstatus +export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " + +llapstatus () { + CLASS=org.apache.hadoop.hive.llap.cli.LlapStatusServiceDriver; + if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then + echo "Missing Hive CLI Jar" + exit 3; + fi + + if $cygwin; then + HIVE_LIB=`cygpath -w "$HIVE_LIB"` + fi + + set -e; + + export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=llap-cli-log4j2.properties " + # hadoop 20 or newer - skip the aux_jars option. picked up from hiveconf + $HADOOP $CLASS $HIVE_OPTS "$@" + +} + +llapstatus_help () { + CLASS=org.apache.hadoop.hive.llap.cli.LlapStatusServiceDriver; + execHiveCmd $CLASS "--help" +} + diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 95c5c0e..fabb8ab 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -330,6 +330,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_SSL.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); } /** @@ -2640,6 +2641,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_DAEMON_QUEUE_NAME("hive.llap.daemon.queue.name", null, "Queue name within which the llap slider application will run." + " Used in LlapServiceDriver and package.py"), + LLAP_DAEMON_CONTAINER_ID("hive.llap.daemon.container.id", null, + "ContainerId of a running LlapDaemon. Used to publish to the registry"), LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED("hive.llap.daemon.shuffle.dir.watcher.enabled", false, "TODO doc", "llap.daemon.shuffle.dir-watcher.enabled"), LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS( diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java index 2bd860a..7e37e96 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java @@ -53,6 +53,12 @@ */ public int getShufflePort(); + + /** + * Address for services hosted on http + * @return + */ + public String getServicesAddress(); /** * Return the last known state (without refreshing) * diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 8cace8f..e2354cb 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -16,6 +16,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; import java.net.UnknownHostException; import java.util.Collections; import java.util.Comparator; @@ -52,6 +54,8 @@ private final int port; private final int shuffle; private final int mngPort; + private final int webPort; + private final String webScheme; private final String[] hosts; private final int memory; private final int vcores; @@ -66,6 +70,11 @@ public LlapFixedRegistryImpl(String hosts, Configuration conf) { this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); + + this.webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + boolean isSsl = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); + this.webScheme = isSsl ? "https" : "http"; + for (Map.Entry kv : conf) { if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP) || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) { @@ -106,6 +115,7 @@ public static String getWorkerIdentity(String host) { private final class FixedServiceInstance implements ServiceInstance { private final String host; + private final String serviceAddress; public FixedServiceInstance(String host) { if (resolveHosts) { @@ -123,6 +133,15 @@ public FixedServiceInstance(String host) { } } this.host = host; + final URL serviceURL; + try { + serviceURL = + new URL(LlapFixedRegistryImpl.this.webScheme, host, LlapFixedRegistryImpl.this.webPort, + ""); + this.serviceAddress = serviceURL.toString(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } } public String getWorkerIdentity() { @@ -151,6 +170,11 @@ public int getShufflePort() { } @Override + public String getServicesAddress() { + return serviceAddress; + } + + @Override public boolean isAlive() { return true; } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index ba38fb8..8bfc097 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -309,13 +309,19 @@ public void unregister() throws IOException { private final int rpcPort; private final int mngPort; private final int shufflePort; + private final String serviceAddress; public DynamicServiceInstance(ServiceRecord srv) throws IOException { this.srv = srv; + if (LOG.isDebugEnabled()) { + LOG.debug("Working with ServiceRecord: {}", srv); + } + final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE); final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP); final Endpoint mng = srv.getInternalEndpoint(IPC_MNG); + final Endpoint services = srv.getExternalEndpoint(IPC_SERVICES); this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), @@ -329,6 +335,8 @@ public DynamicServiceInstance(ServiceRecord srv) throws IOException { this.shufflePort = Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), AddressTypes.ADDRESS_PORT_FIELD)); + this.serviceAddress = + RegistryTypeUtils.getAddressField(services.addresses.get(0), AddressTypes.ADDRESS_URI); } @Override @@ -352,6 +360,11 @@ public int getShufflePort() { } @Override + public String getServicesAddress() { + return serviceAddress; + } + + @Override public boolean isAlive() { return alive; } @@ -377,7 +390,8 @@ public Resource getResource() { @Override public String toString() { return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + - " with resources=" + getResource() + "]"; + " with resources=" + getResource() + ", shufflePort=" + getShufflePort() + + ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]"; } @Override diff --git llap-server/pom.xml llap-server/pom.xml index c81bdb2..9de3443 100644 --- llap-server/pom.xml +++ llap-server/pom.xml @@ -159,6 +159,61 @@ + + org.apache.slider + slider-core + ${slider.version} + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + + + commons-digester + commons-digester + + + com.codahale.metrics + metrics-core + + + com.codahale.metrics + metrics-servlets + + + javax.xml.bind + jaxb-api + + + com.google.inject + guice + + + com.sun.jersey.contribs + jersey-guice + + + org.mortbay.jetty + jetty-sslengine + + + org.codehaus.jettison + jettison + + + asm + asm + + + diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java index cdc919e..16597c7 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java @@ -61,7 +61,7 @@ public static final String OPTION_SLIDER_DEFAULT_KEYTAB = "slider-default-keytab"; - public class LlapOptions { + public static class LlapOptions { private final int instances; private final String directory; private final String name; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java new file mode 100644 index 0000000..97cdc11 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusOptionsProcessor.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cli; + +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapStatusOptionsProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(LlapStatusOptionsProcessor.class); + + private static final String LLAPSTATUS_CONSTANT = "llapstatus"; + + enum OptionConstants { + + NAME("name", 'n', "LLAP cluster name"), + HIVECONF("hiveconf", null, "Use value for given property. Overridden by explicit parameters", "property=value", 2), + HELP("help", 'H', "Print help information"),; + + + private final String longOpt; + private final Character shortOpt; + private final String description; + private final String argName; + private final int numArgs; + + OptionConstants(String longOpt, char shortOpt, String description) { + this(longOpt, shortOpt, description, longOpt, 1); + + } + + OptionConstants(String longOpt, Character shortOpt, String description, String argName, int numArgs) { + this.longOpt = longOpt; + this.shortOpt = shortOpt; + this.description = description; + this.argName = argName; + this.numArgs = numArgs; + } + + public String getLongOpt() { + return longOpt; + } + + public Character getShortOpt() { + return shortOpt; + } + + public String getDescription() { + return description; + } + + public String getArgName() { + return argName; + } + + public int getNumArgs() { + return numArgs; + } + } + + + public static class LlapStatusOptions { + private final String name; + + LlapStatusOptions(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + + private final Options options = new Options(); + private org.apache.commons.cli.CommandLine commandLine; + + public LlapStatusOptionsProcessor() { + + for (OptionConstants optionConstant : OptionConstants.values()) { + + OptionBuilder optionBuilder = OptionBuilder.hasArgs(optionConstant.getNumArgs()) + .withArgName(optionConstant.getArgName()).withLongOpt(optionConstant.getLongOpt()) + .withDescription(optionConstant.getDescription()); + if (optionConstant.getShortOpt() == null) { + options.addOption(optionBuilder.create()); + } else { + options.addOption(optionBuilder.create(optionConstant.getShortOpt())); + } + } + } + + public LlapStatusOptions processOptions(String[] args) throws ParseException { + commandLine = new GnuParser().parse(options, args); + if (commandLine.hasOption(OptionConstants.HELP.getShortOpt()) || + false == commandLine.hasOption(OptionConstants.NAME.getLongOpt())) { + printUsage(); + return null; + } + + String name = commandLine.getOptionValue(OptionConstants.NAME.getLongOpt()); + return new LlapStatusOptions(name); + } + + + private void printUsage() { + new HelpFormatter().printHelp(LLAPSTATUS_CONSTANT, options); + } + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java new file mode 100644 index 0000000..b21b44c --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java @@ -0,0 +1,822 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.cli; + + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.cli.LlapStatusOptionsProcessor.LlapStatusOptions; +import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.ClusterDescriptionKeys; +import org.apache.slider.api.StatusKeys; +import org.apache.slider.client.SliderClient; +import org.apache.slider.core.exceptions.SliderException; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapStatusServiceDriver { + + private static final Logger LOG = LoggerFactory.getLogger(LlapStatusServiceDriver.class); + + private static final long FIND_YARN_APP_TIMEOUT = 20 * 1000l; // 20seconds to wait for app to be visible + + private static final String AM_KEY = "slider-appmaster"; + private static final String LLAP_KEY = "LLAP"; + + private final Configuration conf; + private final Clock clock = new SystemClock(); + private final AppStatusBuilder appStatusBuilder = new AppStatusBuilder(); + + public LlapStatusServiceDriver() { + SessionState ss = SessionState.get(); + conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class); + } + + + private int run(String[] args) { + + SliderClient sliderClient = null; + try { + LlapStatusOptionsProcessor optionsProcessor = new LlapStatusOptionsProcessor(); + LlapStatusOptions options; + try { + options = optionsProcessor.processOptions(args); + } catch (Exception e) { + LOG.info("Failed to parse arguments", e); + return ExitCode.INCORRECT_USAGE.getInt(); + } + + for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) { + conf.addResource(f); + } + conf.reloadConfiguration(); + + + try { + sliderClient = createSliderClient(); + } catch (LlapStatusCliException e) { + logError(e); + return e.getExitCode().getInt(); + } + + // Get the App report from YARN + ApplicationReport appReport = null; + try { + appReport = getAppReport(options, sliderClient, FIND_YARN_APP_TIMEOUT); + } catch (LlapStatusCliException e) { + logError(e); + return e.getExitCode().getInt(); + } + + // Process the report to decide whether to go to slider. + ExitCode ret; + try { + ret = processAppReport(appReport, appStatusBuilder); + } catch (LlapStatusCliException e) { + logError(e); + return e.getExitCode().getInt(); + } + + if (ret != ExitCode.SUCCESS) { + return ret.getInt(); + } else if (EnumSet.of(State.APP_NOT_FOUND, State.COMPLETE, State.LAUNCHING) + .contains(appStatusBuilder.getState())) { + return ExitCode.SUCCESS.getInt(); + } else { + // Get information from slider. + try { + ret = populateAppStatusFromSlider(options, sliderClient, appStatusBuilder); + } catch (LlapStatusCliException e) { + // In case of failure, send back whatever is constructed sop far - which wouldbe from the AppReport + logError(e); + return e.getExitCode().getInt(); + } + } + + if (ret !=ExitCode.SUCCESS ) { + return ret.getInt(); + } else { + try { + ret = populateAppStatusFromLlapRegistry(options, appStatusBuilder); + } catch (LlapStatusCliException e) { + logError(e); + return e.getExitCode().getInt(); + } + } + return ret.getInt(); + }finally { + if (LOG.isTraceEnabled()) { + LOG.trace("Final AppState: " + appStatusBuilder.toString()); + } + if (sliderClient != null) { + sliderClient.stop(); + } + } + } + + private void outputJson() throws LlapStatusCliException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_EMPTY); + try { + System.out + .println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(appStatusBuilder)); + } catch (IOException e) { + throw new LlapStatusCliException(ExitCode.LLAP_JSON_GENERATION_ERROR, "Failed to create JSON", + e); + } + } + + private SliderClient createSliderClient() throws LlapStatusCliException { + SliderClient sliderClient; + try { + sliderClient = new SliderClient() { + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + initHadoopBinding(); + } + }; + Configuration sliderClientConf = new Configuration(conf); + sliderClientConf = sliderClient.bindArgs(sliderClientConf, + new String[] { "help" }); + sliderClient.init(sliderClientConf); + sliderClient.start(); + return sliderClient; + } catch (Exception e) { + throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_CREATE_FAILED, + "Failed to create slider client", e); + } + } + + + private ApplicationReport getAppReport(LlapStatusOptions options, SliderClient sliderClient, + long timeoutMs) throws LlapStatusCliException { + + long startTime = clock.getTime(); + long timeoutTime = startTime + timeoutMs; + ApplicationReport appReport = null; + + // TODO Maybe add an option to wait for a certain amount of time for the app to + // move to running state. Potentially even wait for the containers to be launched. + while (clock.getTime() < timeoutTime && appReport == null) { + try { + appReport = sliderClient.getYarnAppListClient().findInstance(options.getName()); + if (appReport == null) { + long remainingTime = Math.min(timeoutTime - clock.getTime(), 500l); + if (remainingTime > 0) { + Thread.sleep(remainingTime); + } else { + break; + } + } + } catch (Exception e) { // No point separating IOException vs YarnException vs others + throw new LlapStatusCliException(ExitCode.YARN_ERROR, + "Failed to get Yarn AppReport", e); + } + } + return appReport; + } + + + /** + * Populates parts of the AppStatus + * + * @param appReport + * @param appStatusBuilder + * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible + * @throws LlapStatusCliException + */ + private ExitCode processAppReport(ApplicationReport appReport, + AppStatusBuilder appStatusBuilder) throws LlapStatusCliException { + if (appReport == null) { + appStatusBuilder.setState(State.APP_NOT_FOUND); + LOG.info("No Application Found"); + return ExitCode.SUCCESS; + } + + appStatusBuilder.setAmInfo( + new AmInfo().setAppName(appReport.getName()).setAppType(appReport.getApplicationType())); + appStatusBuilder.setAppStartTime(appReport.getStartTime()); + switch (appReport.getYarnApplicationState()) { + case NEW: + case NEW_SAVING: + case SUBMITTED: + appStatusBuilder.setState(State.LAUNCHING); + return ExitCode.SUCCESS; + case ACCEPTED: + appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString()); + appStatusBuilder.setState(State.LAUNCHING); + return ExitCode.SUCCESS; + case RUNNING: + appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString()); + // If the app state is running, get additional information from Slider itself. + return ExitCode.SUCCESS; + case FINISHED: + case FAILED: + case KILLED: + appStatusBuilder.maybeCreateAndGetAmInfo().setAppId(appReport.getApplicationId().toString()); + appStatusBuilder.setAppFinishTime(appReport.getFinishTime()); + appStatusBuilder.setState(State.COMPLETE); + return ExitCode.SUCCESS; + default: + throw new LlapStatusCliException(ExitCode.INTERNAL_ERROR, + "Unknown Yarn Application State: " + appReport.getYarnApplicationState()); + } + } + + + /** + * + * @param options + * @param sliderClient + * @param appStatusBuilder + * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible + * @throws LlapStatusCliException + */ + private ExitCode populateAppStatusFromSlider(LlapStatusOptions options, SliderClient sliderClient, AppStatusBuilder appStatusBuilder) throws + LlapStatusCliException { + + ClusterDescription clusterDescription; + try { + clusterDescription = sliderClient.getClusterDescription(options.getName()); + } catch (SliderException e) { + throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_OTHER, + "Failed to get cluster description from slider. SliderErrorCode=" + (e).getExitCode(), e); + } catch (Exception e) { + throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_OTHER, + "Failed to get cluster description from slider", e); + } + + if (clusterDescription == null) { + LOG.info("Slider ClusterDescription not available"); + return ExitCode.SLIDER_CLIENT_ERROR_OTHER; // ClusterDescription should always be present. + } else { + // Process the Cluster Status returned by slider. + appStatusBuilder.setOriginalConfigurationPath(clusterDescription.originConfigurationPath); + appStatusBuilder.setGeneratedConfigurationPath(clusterDescription.generatedConfigurationPath); + appStatusBuilder.setAppStartTime(clusterDescription.createTime); + + // Finish populating AMInfo + appStatusBuilder.maybeCreateAndGetAmInfo().setAmWebUrl(clusterDescription.getInfo(StatusKeys.INFO_AM_WEB_URL)); + appStatusBuilder.maybeCreateAndGetAmInfo().setHostname(clusterDescription.getInfo(StatusKeys.INFO_AM_HOSTNAME)); + appStatusBuilder.maybeCreateAndGetAmInfo().setContainerId(clusterDescription.getInfo(StatusKeys.INFO_AM_CONTAINER_ID)); + + + if (clusterDescription.statistics != null) { + Map llapStats = clusterDescription.statistics.get(LLAP_KEY); + if (llapStats != null) { + int desiredContainers = llapStats.get(StatusKeys.STATISTICS_CONTAINERS_DESIRED); + int liveContainers = llapStats.get(StatusKeys.STATISTICS_CONTAINERS_LIVE); + appStatusBuilder.setDesiredInstances(desiredContainers); + appStatusBuilder.setLiveInstances(liveContainers); + } else { + throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_OTHER, + "Failed to get statistics for LLAP"); // Error since LLAP should always exist. + } + // TODO Use some information from here such as containers.start.failed + // and containers.failed.recently to provide an estimate of whether this app is healthy or not. + } else { + throw new LlapStatusCliException(ExitCode.SLIDER_CLIENT_ERROR_OTHER, + "Failed to get statistics"); // Error since statistics should always exist. + } + + // Code to locate container status via slider. Not using this at the moment. + if (clusterDescription.status != null) { + Object liveObject = clusterDescription.status.get(ClusterDescriptionKeys.KEY_CLUSTER_LIVE); + if (liveObject != null) { + Map>> liveEntity = + (Map>>) liveObject; + Map> llapEntity = liveEntity.get(LLAP_KEY); + + if (llapEntity != null) { // Not a problem. Nothing has come up yet. + for (Map.Entry> containerEntry : llapEntity.entrySet()) { + String containerIdString = containerEntry.getKey(); + Map containerParams = containerEntry.getValue(); + + String host = (String) containerParams.get("host"); +// String nmUrl = (String) containerParams.get("hostUrl"); + + LlapInstance llapInstance = new LlapInstance(host, containerIdString); + + appStatusBuilder.addNewLlapInstance(llapInstance); + } + } + + } + } + + return ExitCode.SUCCESS; + + } + } + + + /** + * + * @param options + * @param appStatusBuilder + * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible + * @throws LlapStatusCliException + */ + private ExitCode populateAppStatusFromLlapRegistry(LlapStatusOptions options, AppStatusBuilder appStatusBuilder) throws + LlapStatusCliException { + Configuration llapRegistryConf= new Configuration(conf); + llapRegistryConf + .set(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName()); + LlapRegistryService llapRegistry; + try { + llapRegistry = LlapRegistryService.getClient(llapRegistryConf); + } catch (Exception e) { + throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, + "Failed to create llap registry client", e); + } + try { + Map serviceInstanceMap; + try { + serviceInstanceMap = llapRegistry.getInstances().getAll(); + } catch (IOException e) { + throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e); + } + + if (serviceInstanceMap == null || serviceInstanceMap.isEmpty()) { + LOG.info("No information found in the LLAP registry"); + appStatusBuilder.setLiveInstances(0); + appStatusBuilder.setState(State.LAUNCHING); + appStatusBuilder.clearLlapInstances(); + return ExitCode.SUCCESS; + } else { + + + // Tracks instances known by both slider and llap. + List validatedInstances = new LinkedList<>(); + List llapExtraInstances = new LinkedList<>(); + + for (Map.Entry serviceInstanceEntry : serviceInstanceMap + .entrySet()) { + + ServiceInstance serviceInstance = serviceInstanceEntry.getValue(); + String containerIdString = serviceInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + + + LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(containerIdString); + if (llapInstance != null) { + llapInstance.setMgmtPort(serviceInstance.getManagementPort()); + llapInstance.setRpcPort(serviceInstance.getRpcPort()); + llapInstance.setShufflePort(serviceInstance.getShufflePort()); + llapInstance.setWebUrl(serviceInstance.getServicesAddress()); + llapInstance.setStatusUrl(serviceInstance.getServicesAddress() + "/status"); + validatedInstances.add(llapInstance); + } else { + // This likely indicates that an instance has recently restarted + // (the old instance has not been unregistered), and the new instances has not registered yet. + llapExtraInstances.add(llapInstance); + // This instance will not be added back, since it's services are not up yet. + } + + } + + appStatusBuilder.setLiveInstances(validatedInstances.size()); + if (validatedInstances.size() >= appStatusBuilder.getDesiredInstances()) { + appStatusBuilder.setState(State.RUNNING_ALL); + if (validatedInstances.size() > appStatusBuilder.getDesiredInstances()) { + LOG.warn("Found more entries in LLAP registry, as compared to desired entries"); + } + } else { + appStatusBuilder.setState(State.RUNNING_PARTIAL); + } + + // At this point, everything that can be consumed from AppStatusBuilder has been consumed. + // Debug only + if (appStatusBuilder.allInstances().size() > 0) { + // Containers likely to come up soon. + LOG.debug("Potential instances starting up: {}", appStatusBuilder.allInstances()); + } + if (llapExtraInstances.size() > 0) { + // Old containers which are likely shutting down + LOG.debug("Instances likely to shutdown soon: {}", llapExtraInstances); + } + + appStatusBuilder.clearAndAddPreviouslyKnownInstances(validatedInstances); + + } + return ExitCode.SUCCESS; + } finally { + llapRegistry.stop(); + } + + } + + + static final class AppStatusBuilder { + + private AmInfo amInfo; + private State state = State.UNKNOWN; + private String originalConfigurationPath; + private String generatedConfigurationPath; + + private Integer desiredInstances; + private Integer liveInstances; + + private Long appStartTime; + private Long appFinishTime; + + private List llapInstances = new LinkedList<>(); + + private transient Map containerToInstanceMap = new HashMap<>(); + + public void setAmInfo(AmInfo amInfo) { + this.amInfo = amInfo; + } + + public AppStatusBuilder setState( + State state) { + this.state = state; + return this; + } + + public AppStatusBuilder setOriginalConfigurationPath(String originalConfigurationPath) { + this.originalConfigurationPath = originalConfigurationPath; + return this; + } + + public AppStatusBuilder setGeneratedConfigurationPath(String generatedConfigurationPath) { + this.generatedConfigurationPath = generatedConfigurationPath; + return this; + } + + public AppStatusBuilder setAppStartTime(long appStartTime) { + this.appStartTime = appStartTime; + return this; + } + + public AppStatusBuilder setAppFinishTime(long finishTime) { + this.appFinishTime = finishTime; + return this; + } + + public AppStatusBuilder setDesiredInstances(int desiredInstances) { + this.desiredInstances = desiredInstances; + return this; + } + + public AppStatusBuilder setLiveInstances(int liveInstances) { + this.liveInstances = liveInstances; + return this; + } + + public AppStatusBuilder addNewLlapInstance(LlapInstance llapInstance) { + this.llapInstances.add(llapInstance); + this.containerToInstanceMap.put(llapInstance.getContainerId(), llapInstance); + return this; + } + + public LlapInstance removeAndgetLlapInstanceForContainer(String containerIdString) { + return containerToInstanceMap.remove(containerIdString); + } + + public void clearLlapInstances() { + this.llapInstances.clear(); + this.containerToInstanceMap.clear(); + } + + public AppStatusBuilder clearAndAddPreviouslyKnownInstances(List llapInstances) { + clearLlapInstances(); + for (LlapInstance llapInstance : llapInstances) { + addNewLlapInstance(llapInstance); + } + return this; + } + + @JsonIgnore + public List allInstances() { + return this.llapInstances; + } + + public AmInfo getAmInfo() { + return amInfo; + } + + public State getState() { + return state; + } + + public String getOriginalConfigurationPath() { + return originalConfigurationPath; + } + + public String getGeneratedConfigurationPath() { + return generatedConfigurationPath; + } + + public Integer getDesiredInstances() { + return desiredInstances; + } + + public Integer getLiveInstances() { + return liveInstances; + } + + public Long getAppStartTime() { + return appStartTime; + } + + public Long getAppFinishTime() { + return appFinishTime; + } + + public List getLlapInstances() { + return llapInstances; + } + + @JsonIgnore + public AmInfo maybeCreateAndGetAmInfo() { + if (amInfo == null) { + amInfo = new AmInfo(); + } + return amInfo; + } + + @Override + public String toString() { + return "AppStatusBuilder{" + + "amInfo=" + amInfo + + ", state=" + state + + ", originalConfigurationPath='" + originalConfigurationPath + '\'' + + ", generatedConfigurationPath='" + generatedConfigurationPath + '\'' + + ", desiredInstances=" + desiredInstances + + ", liveInstances=" + liveInstances + + ", appStartTime=" + appStartTime + + ", appFinishTime=" + appFinishTime + + ", llapInstances=" + llapInstances + + ", containerToInstanceMap=" + containerToInstanceMap + + '}'; + } + } + + static class AmInfo { + private String appName; + private String appType; + private String appId; + private String containerId; + private String hostname; + private String amWebUrl; + + public AmInfo setAppName(String appName) { + this.appName = appName; + return this; + } + + public AmInfo setAppType(String appType) { + this.appType = appType; + return this; + } + + public AmInfo setAppId(String appId) { + this.appId = appId; + return this; + } + + public AmInfo setContainerId(String containerId) { + this.containerId = containerId; + return this; + } + + public AmInfo setHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public AmInfo setAmWebUrl(String amWebUrl) { + this.amWebUrl = amWebUrl; + return this; + } + + public String getAppName() { + return appName; + } + + public String getAppType() { + return appType; + } + + public String getAppId() { + return appId; + } + + public String getContainerId() { + return containerId; + } + + public String getHostname() { + return hostname; + } + + public String getAmWebUrl() { + return amWebUrl; + } + + @Override + public String toString() { + return "AmInfo{" + + "appName='" + appName + '\'' + + ", appType='" + appType + '\'' + + ", appId='" + appId + '\'' + + ", containerId='" + containerId + '\'' + + ", hostname='" + hostname + '\'' + + ", amWebUrl='" + amWebUrl + '\'' + + '}'; + } + } + + static class LlapInstance { + private final String hostname; + private final String containerId; + private String statusUrl; + private String webUrl; + private Integer rpcPort; + private Integer mgmtPort; + private Integer shufflePort; + + // TODO Add additional information such as #executors, container size, etc + + public LlapInstance(String hostname, String containerId) { + this.hostname = hostname; + this.containerId = containerId; + } + + public LlapInstance setWebUrl(String webUrl) { + this.webUrl = webUrl; + return this; + } + + public LlapInstance setStatusUrl(String statusUrl) { + this.statusUrl = statusUrl; + return this; + } + + public LlapInstance setRpcPort(int rpcPort) { + this.rpcPort = rpcPort; + return this; + } + + public LlapInstance setMgmtPort(int mgmtPort) { + this.mgmtPort = mgmtPort; + return this; + } + + public LlapInstance setShufflePort(int shufflePort) { + this.shufflePort = shufflePort; + return this; + } + + public String getHostname() { + return hostname; + } + + public String getStatusUrl() { + return statusUrl; + } + + public String getContainerId() { + return containerId; + } + + public String getWebUrl() { + return webUrl; + } + + public Integer getRpcPort() { + return rpcPort; + } + + public Integer getMgmtPort() { + return mgmtPort; + } + + public Integer getShufflePort() { + return shufflePort; + } + + @Override + public String toString() { + return "LlapInstance{" + + "hostname='" + hostname + '\'' + + ", containerId='" + containerId + '\'' + + ", statusUrl='" + statusUrl + '\'' + + ", webUrl='" + webUrl + '\'' + + ", rpcPort=" + rpcPort + + ", mgmtPort=" + mgmtPort + + ", shufflePort=" + shufflePort + + '}'; + } + } + + static class LlapStatusCliException extends Exception { + final ExitCode exitCode; + + + public LlapStatusCliException(ExitCode exitCode, String message) { + super(exitCode.getInt() +": " + message); + this.exitCode = exitCode; + } + + public LlapStatusCliException(ExitCode exitCode, String message, Throwable cause) { + super(message, cause); + this.exitCode = exitCode; + } + + public ExitCode getExitCode() { + return exitCode; + } + } + + enum State { + APP_NOT_FOUND, LAUNCHING, + RUNNING_PARTIAL, + RUNNING_ALL, COMPLETE, UNKNOWN + } + + enum ExitCode { + SUCCESS(0), + INCORRECT_USAGE(10), + YARN_ERROR(20), + SLIDER_CLIENT_ERROR_CREATE_FAILED(30), + SLIDER_CLIENT_ERROR_OTHER(31), + LLAP_REGISTRY_ERROR(40), + LLAP_JSON_GENERATION_ERROR(50), + // Error in the script itself - likely caused by an incompatible change, or new functionality / states added. + INTERNAL_ERROR(100); + + private final int exitCode; + + ExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public int getInt() { + return exitCode; + } + } + + + private static void logError(Throwable t) { + LOG.error("FAILED: " + t.getMessage(), t); + System.err.println("FAILED: " + t.getMessage()); + } + + + public static void main(String[] args) { + int ret; + try { + LlapStatusServiceDriver statusServiceDriver = new LlapStatusServiceDriver(); + ret = statusServiceDriver.run(args); + if (ret == ExitCode.SUCCESS.getInt()) { + statusServiceDriver.outputJson(); + } + + } catch (Throwable t) { + logError(t); + ret = ExitCode.INTERNAL_ERROR.getInt(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Completed processing - exiting with " + ret); + } + System.exit(ret); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 2fe59a2..e61d74b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -59,6 +59,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.logging.log4j.core.config.Configurator; import org.slf4j.Logger; @@ -353,6 +354,14 @@ public static void main(String[] args) throws Exception { // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml // Ideally, these properties should be part of LlapDameonConf rather than HiveConf LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration(); + + String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()); + if (containerIdStr != null && !containerIdStr.isEmpty()) { + daemonConf.set(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname, containerIdStr); + } else { + daemonConf.unset(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + } + int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); String localDirList = LlapUtil.getDaemonLocalDirList(daemonConf); diff --git pom.xml pom.xml index 2337e89..77cfaeb 100644 --- pom.xml +++ pom.xml @@ -170,6 +170,7 @@ 1.7.5 4.0.4 0.8.2 + 0.90.2-incubating 2.2.0 1.6.0 2.10