diff --git common/src/java/org/apache/hive/http/HttpServer.java common/src/java/org/apache/hive/http/HttpServer.java index b8836de..0aa9c89 100644 --- common/src/java/org/apache/hive/http/HttpServer.java +++ common/src/java/org/apache/hive/http/HttpServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Logger; @@ -210,7 +211,8 @@ public int getPort() { * @param response the servlet response. * @return TRUE/FALSE based on the logic described above. */ - static boolean isInstrumentationAccessAllowed( + @InterfaceAudience.LimitedPrivate("hive") + public static boolean isInstrumentationAccessAllowed( ServletContext servletContext, HttpServletRequest request, HttpServletResponse response) throws IOException { Configuration conf = diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index f94a837..4938c07 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -37,9 +37,11 @@ /** * Register the current instance - the implementation takes care of the endpoints to register. * + * @return self identifying name + * * @throws IOException */ - public void register() throws IOException; + public String register() throws IOException; /** * Remove the current registration cleanly (implementation defined cleanup) diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 8cace8f..3f667d0 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -89,8 +89,9 @@ public void stop() throws IOException { } @Override - public void register() throws IOException { - // nothing to register + public String register() throws IOException { + // nothing to register (return host-) + return getWorkerIdentity(InetAddress.getLocalHost().getCanonicalHostName()); } @Override diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 5917156..2b4516b 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -18,6 +18,7 @@ import java.util.Map; import com.google.common.base.Preconditions; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -34,6 +35,8 @@ private ServiceRegistry registry = null; private final boolean isDaemon; + private boolean isDynamic = false; + private String identity = "(pending)"; private static final Map yarnRegistries = new HashMap<>(); @@ -79,8 +82,10 @@ public void serviceInit(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); if (hosts.startsWith("@")) { registry = new LlapZookeeperRegistryImpl(hosts.substring(1), conf); + this.isDynamic=true; } else { registry = new LlapFixedRegistryImpl(hosts, conf); + this.isDynamic=false; } LOG.info("Using LLAP registry type " + registry); } @@ -110,7 +115,7 @@ public void serviceStop() throws Exception { private void registerWorker() throws IOException { if (this.registry != null) { - this.registry.register(); + this.identity = this.registry.register(); } } @@ -128,4 +133,14 @@ public void registerStateChangeListener(ServiceInstanceStateChangeListener liste throws IOException { this.registry.registerStateChangeListener(listener); } + + // is the registry dynamic (i.e refreshes?) + public boolean isDynamic() { + return isDynamic; + } + + // this is only useful for the daemons to know themselves + public String getWorkerIdentity() { + return identity; + } } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index c611d1a..287e75e 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -242,7 +242,7 @@ public Endpoint getMngEndpoint() { } @Override - public void register() throws IOException { + public String register() throws IOException { ServiceRecord srv = new ServiceRecord(); Endpoint rpcEndpoint = getRpcEndpoint(); srv.addInternalEndpoint(rpcEndpoint); @@ -295,6 +295,7 @@ public void register() throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv); } + return uniq.toString(); } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index c8734a5..38612c5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -211,7 +211,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) { this.webServices = null; } else { - this.webServices = new LlapWebServices(); + this.webServices = new LlapWebServices(this, registry); addIfService(webServices); } // Bring up the server only after all other components have started. diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index afb59c0..e945364 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -18,14 +18,31 @@ package org.apache.hadoop.hive.llap.daemon.services.impl; import java.io.IOException; +import java.io.PrintWriter; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; + +import javax.management.MalformedObjectNameException; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.http.HttpServer; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +50,25 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapWebServices.class); + // this is what allows the UI to do cross-domain reads of the contents + // only apply to idempotent GET ops (all others need crumbs) + static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods"; + static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; + + static final String REGISTRY_ATTRIBUTE="llap.registry"; + static final String PARENT_ATTRIBUTE="llap.parent"; + private int port; private HttpServer http; private boolean useSSL = false; private boolean useSPNEGO = false; + private final CompositeService parent; + private final LlapRegistryService registry; - public LlapWebServices() { + public LlapWebServices(CompositeService parent, LlapRegistryService registry) { super("LlapWebServices"); + this.registry = registry; + this.parent = parent; } @Override @@ -62,8 +91,13 @@ public void serviceInit(Configuration conf) { } } + builder.setContextAttribute(REGISTRY_ATTRIBUTE, registry); + builder.setContextAttribute(PARENT_ATTRIBUTE, parent); + try { this.http = builder.build(); + this.http.addServlet("status", "/status", LlapStatusServlet.class); + this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class); } catch (IOException e) { LOG.warn("LLAP web service failed to come up", e); } @@ -81,4 +115,142 @@ public void serviceStop() throws Exception { this.http.stop(); } } + + public static class LlapStatusServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + private static final String STATUS_ATTRIBUTE = "status"; + private static final String UPTIME_ATTRIBUTE = "uptime"; + private static final String BUILD_ATTRIBUTE = "build"; + + private static final String UNKNOWN_STATE = "UNKNOWN"; + + protected transient JsonFactory jsonFactory; + + protected RuntimeMXBean runtimeBean; + + @Override + public void init() throws ServletException { + jsonFactory = new JsonFactory(); + runtimeBean = ManagementFactory.getRuntimeMXBean(); + } + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) { + JsonGenerator jg = null; + PrintWriter writer = null; + final ServletContext context = getServletContext(); + final Object parent = context.getAttribute(PARENT_ATTRIBUTE); + + final long uptime = runtimeBean.getUptime(); + + try { + // admin check + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + return; + } + try { + writer = response.getWriter(); + + response.setContentType("application/json; charset=utf8"); + response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET"); + response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + jg = jsonFactory.createJsonGenerator(writer); + jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + jg.useDefaultPrettyPrinter(); + jg.writeStartObject(); + if (parent != null && parent instanceof CompositeService) { + jg.writeStringField(STATUS_ATTRIBUTE, ((CompositeService) parent).getServiceState() + .toString()); + } else { + jg.writeStringField(STATUS_ATTRIBUTE, UNKNOWN_STATE); + } + jg.writeNumberField(UPTIME_ATTRIBUTE, uptime); + jg.writeStringField(BUILD_ATTRIBUTE, HiveVersionInfo.getBuildVersion()); + jg.writeEndObject(); + } finally { + if (jg != null) { + jg.close(); + } + if (writer != null) { + writer.close(); + } + } + } catch (IOException e) { + LOG.error("Caught an exception while processing /status request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + } + + public static class LlapPeerRegistryServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + protected transient JsonFactory jsonFactory; + + @Override + public void init() throws ServletException { + jsonFactory = new JsonFactory(); + } + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) { + JsonGenerator jg = null; + PrintWriter writer = null; + final ServletContext context = getServletContext(); + final LlapRegistryService registry = (LlapRegistryService)context.getAttribute(REGISTRY_ATTRIBUTE); + + try { + // admin check + if (!HttpServer.isInstrumentationAccessAllowed(context, request, response)) { + return; + } + try { + writer = response.getWriter(); + + response.setContentType("application/json; charset=utf8"); + response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET"); + response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + jg = jsonFactory.createJsonGenerator(writer); + jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + jg.useDefaultPrettyPrinter(); + jg.writeStartObject(); + if (registry.isDynamic()) { + jg.writeBooleanField("dynamic", true); + } + jg.writeStringField("identity", registry.getWorkerIdentity()); + jg.writeArrayFieldStart("peers"); + for (ServiceInstance s : registry.getInstances().getAllInstancesOrdered()) { + jg.writeStartObject(); + jg.writeStringField("identity", s.getWorkerIdentity()); + jg.writeStringField("host", s.getHost()); + jg.writeNumberField("management-port", s.getManagementPort()); + jg.writeNumberField("rpc-port", s.getRpcPort()); + jg.writeNumberField("shuffle-port", s.getShufflePort()); + Resource r = s.getResource(); + if (r != null) { + jg.writeObjectFieldStart("resource"); + jg.writeNumberField("vcores", r.getVirtualCores()); + jg.writeNumberField("memory", r.getMemory()); + jg.writeEndObject(); + } + jg.writeStringField("host", s.getHost()); + jg.writeEndObject(); + } + jg.writeEndArray(); + jg.writeEndObject(); + } finally { + if (jg != null) { + jg.close(); + } + if (writer != null) { + writer.close(); + } + } + } catch (IOException e) { + LOG.error("Caught an exception while processing /status request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + } + }