diff --git llap-server/pom.xml llap-server/pom.xml index 7b17f17..22ed693 100644 --- llap-server/pom.xml +++ llap-server/pom.xml @@ -165,6 +165,12 @@ test + org.apache.hadoop + hadoop-yarn-registry + ${hadoop-23.version} + true + + org.apache.tez tez-runtime-internals ${tez.version} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java index 2f405d9..0554c32 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java @@ -29,7 +29,7 @@ public LlapDaemonConfiguration() { } - private static final String LLAP_DAEMON_PREFIX = "llap.daemon."; + public static final String LLAP_DAEMON_PREFIX = "llap.daemon."; private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml"; @@ -40,6 +40,7 @@ public LlapDaemonConfiguration() { public static final String LLAP_DAEMON_WORK_DIRS = LLAP_DAEMON_PREFIX + "work.dirs"; public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port"; + public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551; // Section for configs used in AM and executors diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 2bf2ed9..0ff255c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import 
org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
@@ -49,6 +50,7 @@
   private final Configuration shuffleHandlerConf;
   private final LlapDaemonProtocolServerImpl server;
   private final ContainerRunnerImpl containerRunner;
+  private final LlapRegistryService registry;
   private final AtomicLong numSubmissions = new AtomicLong(0);
   private JvmPauseMonitor pauseMonitor;
   private final ObjectName llapDaemonInfoBean;
@@ -128,6 +130,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
 
     this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address,
         executorMemoryBytes, metrics);
+
+    this.registry = new LlapRegistryService();
   }
 
   private void printAsciiArt() {
@@ -148,6 +152,7 @@ private void printAsciiArt() {
   public void serviceInit(Configuration conf) {
     server.init(conf);
     containerRunner.init(conf);
+    registry.init(conf);
     LlapIoProxy.setDaemon(true);
     LlapIoProxy.initializeLlapIo(conf);
   }
@@ -157,6 +162,8 @@ public void serviceStart() throws Exception {
     ShuffleHandler.initializeAndStart(shuffleHandlerConf);
     server.start();
     containerRunner.start();
+    registry.start();
+    registry.registerWorker();
   }
 
   public void serviceStop() throws Exception {
@@ -164,6 +171,8 @@ public void serviceStop() throws Exception {
     shutdown();
     containerRunner.stop();
     server.stop();
+    registry.unregisterWorker();
+    registry.stop();
     ShuffleHandler.shutdown();
   }
 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
new file mode 100644
index 0000000..33cd7eb
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
@@ -0,0 +1,169 @@
+package org.apache.hadoop.hive.llap.daemon.registry.impl;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Registers an LLAP daemon in the YARN registry and lists the registered daemons.
+ *
+ * The registry is used only when the service hosts setting is "@" followed by an
+ * instance name. Otherwise start, stop and worker registration do nothing and
+ * {@link #getWorkers()} fails.
+ */
+public class LlapRegistryService extends AbstractService {
+
+  private static final Logger LOG = Logger.getLogger(LlapRegistryService.class);
+
+  public final static String SERVICE_CLASS = "org-apache-hive";
+
+  // These four fields stay null when the registry is disabled.
+  private RegistryOperationsService client;
+  private String instanceName;
+  private Configuration conf;
+  private ServiceRecordMarshal encoder;
+
+  private static final String hostname;
+
+  static {
+    String localhost = "localhost";
+    try {
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException uhe) {
+      // Keep the "localhost" default; the registered endpoints will then carry that name.
+      LOG.warn("Cannot resolve the local host name, registering as " + localhost, uhe);
+    }
+    hostname = localhost;
+  }
+
+  public LlapRegistryService() {
+    super("LlapRegistryService");
+  }
+
+  /**
+   * Creates the registry client when the service hosts setting names a registry
+   * instance ("@name"); otherwise leaves the registry disabled.
+   */
+  @Override
+  public void serviceInit(Configuration conf) {
+    String registryId = conf.getTrimmed(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    // The setting may be absent, which means "no registry" rather than an error.
+    if (registryId != null && registryId.startsWith("@")) {
+      LOG.info("Llap Registry is enabled with registryid: " + registryId);
+      // Add yarn-site to a private copy instead of changing the caller's configuration.
+      this.conf = new Configuration(conf);
+      this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+      // registry reference
+      instanceName = registryId.substring(1);
+      client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(this.conf);
+      encoder = new RegistryUtils.ServiceRecordMarshal();
+    } else {
+      LOG.info("Llap Registry is disabled");
+    }
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    if (client != null) {
+      client.start();
+    }
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (client != null) {
+      client.stop();
+    }
+  }
+
+  /** Builds the "llap" endpoint from the local host name and the configured RPC port. */
+  public Endpoint getRpcEndpoint() {
+    final int rpcPort =
+        conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+            LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+
+    return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
+  }
+
+  /** Builds the "shuffle" endpoint from the local host name and the shuffle port. */
+  public Endpoint getShuffleEndpoint() {
+    final int shufflePort =
+        conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+            LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+    // HTTP today, but might not be
+    return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
+        shufflePort);
+  }
+
+  /** Returns the path prefix passed to the sequential node creation in registerWorker(). */
+  private String getPath() {
+    return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+        SERVICE_CLASS, instanceName, "workers"), "worker-");
+  }
+
+  /**
+   * Publishes this daemon's endpoints and llap.daemon.* settings as an ephemeral,
+   * sequential registry node. Does nothing when the registry is disabled.
+   */
+  public void registerWorker() throws IOException {
+    if (this.client != null) {
+      String path = getPath();
+      ServiceRecord srv = new ServiceRecord();
+      srv.addInternalEndpoint(getRpcEndpoint());
+      srv.addInternalEndpoint(getShuffleEndpoint());
+
+      for (Map.Entry<String, String> kv : this.conf) {
+        if (kv.getKey().startsWith(LlapDaemonConfiguration.LLAP_DAEMON_PREFIX)) {
+          // TODO: read this somewhere useful, like the allocator
+          srv.set(kv.getKey(), kv.getValue());
+        }
+      }
+
+      client.mknode(RegistryPathUtils.parentOf(path), true);
+
+      // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
+      client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
+          client.getClientAcls());
+    }
+  }
+
+  public void unregisterWorker() {
+    if (this.client != null) {
+      // with ephemeral nodes, there's nothing to do here
+      // because the create didn't return paths
+    }
+  }
+
+  /**
+   * Lists the service records found under this instance's workers path.
+   *
+   * @throws NullPointerException if the registry is disabled
+   */
+  public Map<String, ServiceRecord> getWorkers() throws IOException {
+    Preconditions.checkNotNull(this.client, "Yarn registry client is not initialized");
+    return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(getPath()));
+  }
+}
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 0af5ea4..fc8df6b 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -19,6 +19,7 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,12 +29,17 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
 import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -46,7 +52,9 @@
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 
 
@@ -78,8 +86,7 @@
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
-  // TODO: replace with service registry
-  private final YarnClient yc = YarnClient.createYarnClient();
+  private final LlapRegistryService registry = new LlapRegistryService();
 
   public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
       String clientHostname, int clientPort, String trackingUrl,
@@ -109,17 +116,23 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
 
-    String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
-    if (hosts == null || hosts.length == 0) {
-      hosts = new String[]{"localhost"};
-      serviceHosts.add("localhost");
-      serviceHostSet.add("localhost");
-    } else if (!hosts[0].equals("*")) {
-      for (String host : hosts) {
-        serviceHosts.add(host);
-        serviceHostSet.add(host);
+    String instanceId = conf.getTrimmed(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+
+    // "@name" selects a registry instance whose workers are looked up in serviceStart();
+    // any other value is a static host list and must still be recorded here.
+    if (instanceId == null || !instanceId.startsWith("@")) {
+      String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+      if (hosts == null || hosts.length == 0) {
+        serviceHosts.add("localhost");
+        serviceHostSet.add("localhost");
+      } else if (!hosts[0].equals("*")) {
+        for (String host : hosts) {
+          serviceHosts.add(host);
+          serviceHostSet.add(host);
+        }
       }
     }
+
     this.containerPort = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
         LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
 
@@ -145,27 +158,30 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
 
   @Override
   public void serviceInit(Configuration conf) {
-    yc.init(conf);
+    registry.init(conf);
   }
 
   @Override
   public void serviceStart() {
-    yc.start();
+    registry.start();
     if (serviceHosts.size() > 0) {
       return;
     }
 
-    LOG.info("Evaluating host usage criteria for service nodes");
+    LOG.info("Reading YARN registry for service records");
     try {
-      List<NodeReport> nodes = yc.getNodeReports(NodeState.RUNNING);
-      for (NodeReport nd : nodes) {
-        Resource used = nd.getUsed();
-        LOG.info("Examining node: " + nd);
-        if (nd.getNodeState() == NodeState.RUNNING
-            && used.getMemory() >= memoryPerInstance) {
-          // TODO: fix this with YARN registry
-          serviceHosts.add(nd.getNodeId().getHost());
-          serviceHostSet.add(nd.getNodeId().getHost());
+      Map<String, ServiceRecord> workers = registry.getWorkers();
+      for (ServiceRecord srv : workers.values()) {
+        Endpoint rpc = srv.getInternalEndpoint("llap");
+        if (rpc != null) {
+          LOG.info("Examining endpoint: " + rpc);
+          final String host =
+              RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+                  AddressTypes.ADDRESS_HOSTNAME_FIELD);
+          serviceHosts.add(host);
+          serviceHostSet.add(host);
+        } else {
+          LOG.info("The SRV record was " + srv);
         }
       }
       LOG.info("Re-inited with configuration: " +
@@ -174,10 +190,8 @@ public void serviceStart() {
           ", executorsPerInstance=" + executorsPerInstance +
           ", resourcePerInstanceInferred=" + resourcePerExecutor +
           ", hosts="+ serviceHosts.toString());
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (YarnException e) {
-      e.printStackTrace();
+    } catch (IOException ioe) {
+      throw new TezUncheckedException(ioe);
     }
   }
 
diff --git llap-server/src/main/resources/llap.py llap-server/src/main/resources/llap.py
index 65c0e9a..fa3bb48 100644
--- 
llap-server/src/main/resources/llap.py +++ llap-server/src/main/resources/llap.py @@ -44,6 +44,7 @@ def start(self, env): # location containing llap-daemon-site.xml, tez and yarn configuration xmls as well. os.environ['LLAP_DAEMON_CONF_DIR'] = format("{app_root}/conf/") os.environ['LLAP_DAEMON_LOG_DIR'] = format("{app_log_dir}/") + os.environ['LLAP_DAEMON_LOGGER'] = format("{app_log_level}") os.environ['LLAP_DAEMON_HEAPSIZE'] = format("{memory_val}") os.environ['LLAP_DAEMON_PID_DIR'] = dirname(format("{pid_file}")) os.environ['LLAP_DAEMON_LD_PATH'] = format('{library_path}') diff --git llap-server/src/main/resources/params.py llap-server/src/main/resources/params.py index 8706d04..8aa8e3c 100644 --- llap-server/src/main/resources/params.py +++ llap-server/src/main/resources/params.py @@ -29,6 +29,7 @@ additional_cp = config['configurations']['global']['additional_cp'] app_log_dir = config['configurations']['global']['app_log_dir'] +app_log_level = config['configurations']['global']['app_log_level'] port = config['configurations']['global']['listen_port'] memory_val = config['configurations']['global']['memory_val'] diff --git llap-server/src/main/resources/templates.py llap-server/src/main/resources/templates.py index 382ef17..e3599c0 100644 --- llap-server/src/main/resources/templates.py +++ llap-server/src/main/resources/templates.py @@ -73,6 +73,7 @@ "java_home": "%(java_home)s", "site.global.app_user": "yarn", "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/", + "site.global.app_log_level": "INFO,RFA", "site.global.additional_cp": "%(hadoop_home)s", "site.global.library_path": "%(hadoop_home)s/lib/native", "site.global.memory_val": "%(heap)d", diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 1f9ff94..2fc76cf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -179,7 +179,7 @@ public void open(HiveConf conf, String[] additionalFiles) commonLocalResources.put(utils.getBaseName(lr), lr); } - if("llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE))) { + if ("llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE))) { // add configs for llap-daemon-site.xml + localize llap jars // they cannot be referred to directly as it would be a circular depedency conf.addResource("llap-daemon-site.xml"); @@ -193,6 +193,17 @@ public void open(HiveConf conf, String[] additionalFiles) } catch (ClassNotFoundException ce) { throw new IOException("Cannot find LlapInputFormat in the classpath", ce); } + + try { + final File registryJar = + new File(Utilities.jarFinderGetJar(Class + .forName("org.apache.hadoop.registry.client.api.RegistryOperations"))); + final LocalResource registryLr = + createJarLocalResource(registryJar.toURI().toURL().toExternalForm()); + commonLocalResources.put(utils.getBaseName(registryLr), registryLr); + } catch (ClassNotFoundException ce) { + throw new IOException("Cannot find Hadoop Registry in the classpath", ce); + } } // Create environment for AM. @@ -402,7 +413,6 @@ private LocalResource createJarLocalResource(String localJarPath) return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); } - private String getSha(Path localFile) throws IOException, IllegalArgumentException { InputStream is = null; try {