entry : second.entrySet()) {
+ T1 key = entry.getKey();
+ if (!first.containsKey(key)) {
+ first.put(key, entry.getValue());
+ }
+ }
+ return first;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Error.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Error.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Error.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Error.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementConstraint.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementPolicy.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementScope.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/PlacementType.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
similarity index 98%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
index 6e031219ba9..c94734bd063 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
@@ -42,7 +42,7 @@
**/
@InterfaceAudience.Public
@InterfaceStability.Unstable
-@ApiModel(description = "An Service resource has the following attributes.")
+@ApiModel(description = "A Service resource has the following attributes.")
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -413,12 +413,13 @@ public boolean equals(java.lang.Object o) {
return false;
}
Service service = (Service) o;
- return Objects.equals(this.name, service.name);
+ return Objects.equals(this.name, service.name) && Objects.equals(this
+ .version, service.version);
}
@Override
public int hashCode() {
- return Objects.hash(name);
+ return Objects.hash(name, version);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
similarity index 100%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceStatus.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Services.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Services.java
new file mode 100644
index 00000000000..ca9a048b66e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/api/records/Services.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.api.records;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.swagger.annotations.ApiModel;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A list of Services.
+ **/
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "A list of Services.")
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class Services {
+ List services = new ArrayList<>();
+
+ public Services serviceList(Service... serviceList) {
+ for (Service service : serviceList) {
+ this.services.add(service);
+ }
+ return this;
+ }
+
+ public List getServices() {
+ return services;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 84b3915a0c2..113e6a7c668 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -137,4 +138,8 @@
* @return the NM {@code DeletionService}.
*/
DeletionService getDeletionService();
+
+ void setAuxServices(AuxServices auxServices);
+
+ AuxServices getAuxServices();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 6eda4a80b77..e9f4e500894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -48,10 +48,12 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -684,6 +686,8 @@ protected void reregisterCollectors() {
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
+ private AuxServices auxServices;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -934,6 +938,16 @@ public void setNMLogAggregationStatusTracker(
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
return nmLogAggregationStatusTracker;
}
+
+ @Override
+ public void setAuxServices(AuxServices auxServices) {
+ this.auxServices = auxServices;
+ }
+
+ @Override
+ public AuxServices getAuxServices() {
+ return this.auxServices;
+ }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
index 77c4dd9a6cb..11468f66989 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
@@ -18,15 +18,34 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,54 +88,83 @@
public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
new FsPermission((short) 0700);
+ public static final String CLASS_NAME = "class.name";
+ public static final String SYSTEM_CLASSES = "system.classes";
+
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
private static final Logger LOG =
LoggerFactory.getLogger(AuxServices.class);
private static final String DEL_SUFFIX = "_DEL_";
- protected final Map serviceMap;
- protected final Map serviceMetaData;
+ protected final Map serviceMap;
+ protected final Map serviceRecordMap;
+ protected final Map serviceMetaData;
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
private final LocalDirsHandlerService dirsHandler;
private final DeletionService delService;
private final UserGroupInformation userUGI;
+ private final FsPermission storeDirPerms = new FsPermission((short)0700);
+ private Path stateStoreRoot = null;
+ private FileSystem stateStoreFs = null;
+
+ private Path manifest;
+ private FileSystem manifestFS;
+ private Timer manifestReloadTimer;
+ private TimerTask manifestReloadTask;
+ private long manifestReloadInterval;
+ private long manifestModifyTS = -1;
+
+ private final ObjectMapper mapper;
+
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
- public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
+ AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
Context nmContext, DeletionService deletionService) {
super(AuxServices.class.getName());
serviceMap =
- Collections.synchronizedMap(new HashMap());
+ Collections.synchronizedMap(new HashMap());
+ serviceRecordMap =
+ Collections.synchronizedMap(new HashMap());
serviceMetaData =
Collections.synchronizedMap(new HashMap());
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
this.dirsHandler = nmContext.getLocalDirsHandler();
this.delService = deletionService;
this.userUGI = getRemoteUgi();
+ this.mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// Obtain services from configuration in init()
}
protected final synchronized void addService(String name,
- AuxiliaryService service) {
- LOG.info("Adding auxiliary service " +
- service.getName() + ", \"" + name + "\"");
+ AuxiliaryService service, org.apache.hadoop.yarn.service.api.records
+ .Service serviceRecord) {
+ LOG.info("Adding auxiliary service " + serviceRecord.getName() +
+ " version " + serviceRecord.getVersion());
serviceMap.put(name, service);
+ serviceRecordMap.put(name, serviceRecord);
}
Collection getServices() {
return Collections.unmodifiableCollection(serviceMap.values());
}
+ public Collection
+ getServiceRecords() {
+ return Collections.unmodifiableCollection(serviceRecordMap.values());
+ }
+
/**
* @return the meta data for all registered services, that have been started.
* If a service has not been started no metadata will be available. The key
* is the name of the service as defined in the configuration.
*/
public Map getMetaData() {
- Map metaClone = new HashMap(
- serviceMetaData.size());
+ Map metaClone = new HashMap<>(serviceMetaData.size());
synchronized (serviceMetaData) {
for (Entry entry : serviceMetaData.entrySet()) {
metaClone.put(entry.getKey(), entry.getValue().duplicate());
@@ -125,11 +173,371 @@ protected final synchronized void addService(String name,
return metaClone;
}
+ private AuxiliaryService createAuxServiceFromConfiguration(
+ org.apache.hadoop.yarn.service.api.records.Service service) {
+ Configuration c = new Configuration(false);
+ c.set(CLASS_NAME, getClassName(service));
+ Class extends AuxiliaryService> sClass = c.getClass(CLASS_NAME,
+ null, AuxiliaryService.class);
+
+ if (sClass == null) {
+ throw new YarnRuntimeException("No class defined for auxiliary " +
+ "service" + service.getName());
+ }
+ return ReflectionUtils.newInstance(sClass, null);
+ }
+
+ private AuxiliaryService createAuxServiceFromLocalClasspath(
+ org.apache.hadoop.yarn.service.api.records.Service service, String
+ appLocalClassPath, Configuration conf) throws IOException,
+ ClassNotFoundException {
+ Preconditions.checkArgument(appLocalClassPath != null &&
+ !appLocalClassPath.isEmpty(),
+ "local classpath was null in createAuxServiceFromLocalClasspath");
+ final String sName = service.getName();
+ final String className = getClassName(service);
+
+ if (service.getConfiguration() != null && service.getConfiguration()
+ .getFiles().size() > 0) {
+ throw new YarnRuntimeException("The aux service:" + sName
+ + " has configured local classpath:" + appLocalClassPath
+ + " and config files:" + service.getConfiguration().getFiles()
+ + ". Only one of them should be configured.");
+ }
+
+ return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, className,
+ appLocalClassPath, getSystemClasses(service, className));
+ }
+
+ private AuxiliaryService createAuxService(org.apache.hadoop.yarn.service
+ .api.records.Service service, Configuration conf, boolean
+ fromConfiguration) throws IOException,
+ ClassNotFoundException {
+ final String sName = service.getName();
+ final String className = getClassName(service);
+ if (className == null || className.isEmpty()) {
+ throw new YarnRuntimeException("Class name not provided for auxiliary " +
+ "service " + sName);
+ }
+ if (fromConfiguration) {
+ final String appLocalClassPath = conf.get(String.format(
+ YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
+ if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
+ return createAuxServiceFromLocalClasspath(service, appLocalClassPath,
+ conf);
+ }
+ }
+ org.apache.hadoop.yarn.service.api.records.Configuration serviceConf =
+ service.getConfiguration();
+ List destFiles = new ArrayList<>();
+ if (serviceConf != null) {
+ List files = serviceConf.getFiles();
+ if (files != null) {
+ for (ConfigFile file : files) {
+ destFiles.add(maybeDownloadJars(sName, className, file.getSrcFile(),
+ file.getType(), conf));
+ }
+ }
+ }
+
+ if (destFiles.size() > 0) {
+ LOG.info("The aux service:" + sName
+ + " is using the custom classloader with classpath " + destFiles);
+ return AuxiliaryServiceWithCustomClassLoader.getInstance(conf,
+ className, StringUtils.join(File.pathSeparatorChar, destFiles),
+ getSystemClasses(service, className));
+ } else {
+ return createAuxServiceFromConfiguration(service);
+ }
+ }
+
+ private Path maybeDownloadJars(String sName, String className, String
+ appRemoteClassPath, ConfigFile.TypeEnum type, Configuration conf) throws
+ IOException {
+ // load AuxiliaryService from remote classpath
+ FileContext localLFS = getLocalFileContext(conf);
+ // create NM aux-service dir in NM localdir if it does not exist.
+ Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
+ + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
+ if (!localLFS.util().exists(nmAuxDir)) {
+ try {
+ localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
+ } catch (IOException ex) {
+ throw new YarnRuntimeException("Fail to create dir:"
+ + nmAuxDir.toString(), ex);
+ }
+ }
+ Path src = new Path(appRemoteClassPath);
+ FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
+ FileStatus scFileStatus = remoteLFS.getFileStatus(src);
+ if (!scFileStatus.getOwner().equals(
+ this.userUGI.getShortUserName())) {
+ throw new YarnRuntimeException("The remote jarfile owner:"
+ + scFileStatus.getOwner() + " is not the same as the NM user:"
+ + this.userUGI.getShortUserName() + ".");
+ }
+ if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
+ throw new YarnRuntimeException("The remote jarfile should not "
+ + "be writable by group or others. "
+ + "The current Permission is "
+ + scFileStatus.getPermission().toShort());
+ }
+ Path downloadDest = new Path(nmAuxDir,
+ className + "_" + scFileStatus.getModificationTime());
+ // check whether we need to re-download the jar
+ // from remote directory
+ Path targetDirPath = new Path(downloadDest,
+ scFileStatus.getPath().getName());
+ FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
+ for (FileStatus sub : allSubDirs) {
+ if (sub.getPath().getName().equals(downloadDest.getName())) {
+ return new Path(targetDirPath + Path.SEPARATOR + "*");
+ } else {
+ if (sub.getPath().getName().contains(className) &&
+ !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
+ Path delPath = new Path(sub.getPath().getParent(),
+ sub.getPath().getName() + DEL_SUFFIX);
+ localLFS.rename(sub.getPath(), delPath);
+ LOG.info("delete old aux service jar dir:"
+ + delPath.toString());
+ FileDeletionTask deletionTask = new FileDeletionTask(
+ this.delService, null, delPath, null);
+ this.delService.delete(deletionTask);
+ }
+ }
+ }
+ LocalResourceType srcType;
+ if (type == ConfigFile.TypeEnum.STATIC) {
+ srcType = LocalResourceType.FILE;
+ } else if (type == ConfigFile.TypeEnum.ARCHIVE) {
+ srcType = LocalResourceType.ARCHIVE;
+ } else {
+ throw new YarnRuntimeException(
+ "Cannot unpack file of type " + type + " from remote-file-path:" +
+ src + "for aux-service:" + ".\n");
+ }
+ LocalResource scRsrc = LocalResource.newInstance(
+ URL.fromURI(src.toUri()),
+ srcType, LocalResourceVisibility.PRIVATE,
+ scFileStatus.getLen(), scFileStatus.getModificationTime());
+ FSDownload download = new FSDownload(localLFS, null, conf,
+ downloadDest, scRsrc, null);
+ try {
+ // don't need to convert downloaded path into a dir
+ // since it's already a jar path.
+ return download.call();
+ } catch (Exception ex) {
+ throw new YarnRuntimeException(
+ "Exception happend while downloading files "
+ + "for aux-service:" + sName + " and remote-file-path:"
+ + src + ".\n" + ex.getMessage());
+ }
+ }
+
+ private void setStateStoreDir(String sName, AuxiliaryService s) throws
+ IOException {
+ if (stateStoreRoot != null) {
+ Path storePath = new Path(stateStoreRoot, sName);
+ stateStoreFs.mkdirs(storePath, storeDirPerms);
+ s.setRecoveryPath(storePath);
+ }
+ }
+
+ private void maybeRemoveAuxService(String sName) {
+ AuxiliaryService s = serviceMap.remove(sName);
+ serviceRecordMap.remove(sName);
+ serviceMetaData.remove(sName);
+ if (s != null) {
+ stopAuxService(s);
+ }
+ }
+
+ private AuxiliaryService initAuxService(org.apache.hadoop.yarn.service.api
+ .records.Service service, Configuration conf, boolean
+ fromConfiguration) throws IOException {
+ final String sName = service.getName();
+ AuxiliaryService s;
+ try {
+ Preconditions
+ .checkArgument(
+ validateAuxServiceName(sName),
+ "The auxiliary service name: " + sName + " is invalid. " +
+ "The valid service name should only contain a-zA-Z0-9_ " +
+ "and cannot start with numbers.");
+ maybeRemoveAuxService(sName);
+ s = createAuxService(service, conf, fromConfiguration);
+ if (s == null) {
+ throw new YarnRuntimeException("No auxiliary service class loaded for" +
+ " " + sName);
+ }
+ // TODO better use s.getName()?
+ if (!sName.equals(s.getName())) {
+ LOG.warn("The Auxiliary Service named '" + sName + "' in the "
+ + "configuration is for " + s.getClass() + " which has "
+ + "a name of '" + s.getName() + "'. Because these are "
+ + "not the same tools trying to send ServiceData and read "
+ + "Service Meta Data may have issues unless the refer to "
+ + "the name in the config.");
+ }
+ s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
+ addService(sName, s, service);
+ setStateStoreDir(sName, s);
+ Configuration customConf = new Configuration(conf);
+ if (service.getConfiguration() != null) {
+ for (Entry entry : service.getConfiguration()
+ .getProperties().entrySet()) {
+ customConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ s.init(customConf);
+
+ LOG.info("Initialized auxiliary service " + sName);
+ } catch (RuntimeException e) {
+ LOG.error("Failed to initialize " + sName, e);
+ throw e;
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(e);
+ }
+ return s;
+ }
+
+ /**
+ * Reloads auxiliary services manifest. Must be called after service init.
+ *
+ * @throws IOException if manifest can't be loaded
+ */
+ private void reloadManifest() throws IOException {
+ loadManifest(getConfig(), true);
+ }
+
+ private synchronized void loadManifest(Configuration conf, boolean
+ startServices) throws IOException {
+ if (manifest == null) {
+ return;
+ }
+ if (!manifestFS.exists(manifest)) {
+ LOG.info("Manifest file " + manifest + " doesn't exist");
+ return;
+ }
+ FileStatus status;
+ try {
+ status = manifestFS.getFileStatus(manifest);
+ } catch (FileNotFoundException e) {
+ LOG.info("Manifest file " + manifest + " doesn't exist");
+ return;
+ }
+ if (status.getModificationTime() == manifestModifyTS) {
+ return;
+ }
+ manifestModifyTS = status.getModificationTime();
+ LOG.info("Reading auxiliary services manifest " + manifest);
+ Set loadedAuxServices = new HashSet<>();
+ boolean foundChanges = false;
+ try (FSDataInputStream in = manifestFS.open(manifest)) {
+ Services services = mapper.readValue((InputStream) in, Services.class);
+ if (services.getServices() != null) {
+ for (org.apache.hadoop.yarn.service.api.records.Service service :
+ services.getServices()) {
+ org.apache.hadoop.yarn.service.api.records.Service existingService =
+ serviceRecordMap.get(service.getName());
+ loadedAuxServices.add(service.getName());
+ if (existingService != null && existingService.equals(service)) {
+ LOG.info("Auxiliary service already loaded: " + service.getName());
+ continue;
+ }
+ foundChanges = true;
+ try {
+ AuxiliaryService s = initAuxService(service, conf, false);
+ if (startServices) {
+ startAuxService(service.getName(), s, service);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to load auxiliary service " + service.getName());
+ }
+ }
+ }
+ }
+
+ synchronized (serviceMap) {
+ Iterator> it = serviceMap.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ Entry entry = it.next();
+ if (!loadedAuxServices.contains(entry.getKey())) {
+ foundChanges = true;
+ stopAuxService(entry.getValue());
+ it.remove();
+ }
+ }
+ }
+
+ if (!foundChanges) {
+ LOG.info("No auxiliary services changes detected in manifest");
+ }
+ }
+
+ private static String getClassName(org.apache.hadoop.yarn.service.api.records
+ .Service service) {
+ org.apache.hadoop.yarn.service.api.records.Configuration serviceConf =
+ service.getConfiguration();
+ if (serviceConf == null) {
+ return null;
+ }
+ return serviceConf.getProperty(CLASS_NAME);
+ }
+
+ private static String[] getSystemClasses(org.apache.hadoop.yarn.service.api
+ .records.Service service, String className) {
+ org.apache.hadoop.yarn.service.api.records.Configuration serviceConf =
+ service.getConfiguration();
+ if (serviceConf == null) {
+ return new String[]{className};
+ }
+ return StringUtils.split(serviceConf.getProperty(SYSTEM_CLASSES,
+ className));
+ }
+
+ private static org.apache.hadoop.yarn.service.api.records.Service
+ createServiceFromConfiguration(String sName, Configuration conf) {
+ String className = conf.get(String.format(
+ YarnConfiguration.NM_AUX_SERVICE_FMT, sName));
+ String remoteClassPath = conf.get(String.format(
+ YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
+ String[] systemClasses = conf.getTrimmedStrings(String.format(
+ YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, sName));
+
+ org.apache.hadoop.yarn.service.api.records.Configuration serviceConf =
+ new org.apache.hadoop.yarn.service.api.records.Configuration();
+ if (className != null) {
+ serviceConf.setProperty(CLASS_NAME, className);
+ }
+ if (systemClasses != null) {
+ serviceConf.setProperty(SYSTEM_CLASSES, StringUtils.join(",",
+ systemClasses));
+ }
+ if (remoteClassPath != null) {
+ ConfigFile.TypeEnum type;
+ String lcClassPath = StringUtils.toLowerCase(remoteClassPath);
+ if (lcClassPath.endsWith(".jar")) {
+ type = ConfigFile.TypeEnum.STATIC;
+ } else if (lcClassPath.endsWith(".zip") ||
+ lcClassPath.endsWith(".tar.gz") || lcClassPath.endsWith(".tgz") ||
+ lcClassPath.endsWith(".tar")) {
+ type = ConfigFile.TypeEnum.ARCHIVE;
+ } else {
+ throw new YarnRuntimeException("Cannot unpack file from " +
+ "remote-file-path:" + remoteClassPath + "for aux-service:" +
+ sName + ".\n");
+ }
+ ConfigFile file = new ConfigFile().srcFile(remoteClassPath).type(type);
+ serviceConf.getFiles().add(file);
+ }
+ return new org.apache.hadoop.yarn.service.api.records.Service().name(sName)
+ .configuration(serviceConf);
+ }
+
@Override
public void serviceInit(Configuration conf) throws Exception {
- final FsPermission storeDirPerms = new FsPermission((short)0700);
- Path stateStoreRoot = null;
- FileSystem stateStoreFs = null;
boolean recoveryEnabled = conf.getBoolean(
YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
@@ -138,171 +546,46 @@ public void serviceInit(Configuration conf) throws Exception {
STATE_STORE_ROOT_NAME);
stateStoreFs = FileSystem.getLocal(conf);
}
- Collection auxNames = conf.getStringCollection(
- YarnConfiguration.NM_AUX_SERVICES);
- for (final String sName : auxNames) {
- try {
- Preconditions
- .checkArgument(
- validateAuxServiceName(sName),
- "The ServiceName: " + sName + " set in " +
- YarnConfiguration.NM_AUX_SERVICES +" is invalid." +
- "The valid service name should only contain a-zA-Z0-9_ " +
- "and can not start with numbers");
- String classKey = String.format(
- YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
- String className = conf.get(classKey);
- final String appLocalClassPath = conf.get(String.format(
- YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
- final String appRemoteClassPath = conf.get(String.format(
- YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
- AuxiliaryService s = null;
- boolean useCustomerClassLoader = ((appLocalClassPath != null
- && !appLocalClassPath.isEmpty()) ||
- (appRemoteClassPath != null && !appRemoteClassPath.isEmpty()))
- && className != null && !className.isEmpty();
- if (useCustomerClassLoader) {
- // load AuxiliaryService from local class path
- if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) {
- s = AuxiliaryServiceWithCustomClassLoader.getInstance(
- conf, className, appLocalClassPath);
- } else {
- // load AuxiliaryService from remote class path
- if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
- throw new YarnRuntimeException("The aux serivce:" + sName
- + " has configured local classpath:" + appLocalClassPath
- + " and remote classpath:" + appRemoteClassPath
- + ". Only one of them should be configured.");
- }
- FileContext localLFS = getLocalFileContext(conf);
- // create NM aux-service dir in NM localdir if it does not exist.
- Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
- + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
- if (!localLFS.util().exists(nmAuxDir)) {
- try {
- localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
- } catch (IOException ex) {
- throw new YarnRuntimeException("Fail to create dir:"
- + nmAuxDir.toString(), ex);
- }
- }
- Path src = new Path(appRemoteClassPath);
- FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
- FileStatus scFileStatus = remoteLFS.getFileStatus(src);
- if (!scFileStatus.getOwner().equals(
- this.userUGI.getShortUserName())) {
- throw new YarnRuntimeException("The remote jarfile owner:"
- + scFileStatus.getOwner() + " is not the same as the NM user:"
- + this.userUGI.getShortUserName() + ".");
- }
- if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
- throw new YarnRuntimeException("The remote jarfile should not "
- + "be writable by group or others. "
- + "The current Permission is "
- + scFileStatus.getPermission().toShort());
- }
- Path dest = null;
- Path downloadDest = new Path(nmAuxDir,
- className + "_" + scFileStatus.getModificationTime());
- // check whether we need to re-download the jar
- // from remote directory
- Path targetDirPath = new Path(downloadDest,
- scFileStatus.getPath().getName());
- FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
- boolean reDownload = true;
- for (FileStatus sub : allSubDirs) {
- if (sub.getPath().getName().equals(downloadDest.getName())) {
- reDownload = false;
- dest = new Path(targetDirPath + Path.SEPARATOR + "*");
- break;
- } else {
- if (sub.getPath().getName().contains(className) &&
- !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
- Path delPath = new Path(sub.getPath().getParent(),
- sub.getPath().getName() + DEL_SUFFIX);
- localLFS.rename(sub.getPath(), delPath);
- LOG.info("delete old aux service jar dir:"
- + delPath.toString());
- FileDeletionTask deletionTask = new FileDeletionTask(
- this.delService, null, delPath, null);
- this.delService.delete(deletionTask);
- }
- }
- }
- if (reDownload) {
- LocalResourceType srcType = null;
- String lowerDst = StringUtils.toLowerCase(src.toString());
- if (lowerDst.endsWith(".jar")) {
- srcType = LocalResourceType.FILE;
- } else if (lowerDst.endsWith(".zip") ||
- lowerDst.endsWith(".tar.gz") || lowerDst.endsWith(".tgz")
- || lowerDst.endsWith(".tar")) {
- srcType = LocalResourceType.ARCHIVE;
- } else {
- throw new YarnRuntimeException(
- "Can not unpack file from remote-file-path:" + src
- + "for aux-service:" + ".\n");
- }
- LocalResource scRsrc = LocalResource.newInstance(
- URL.fromURI(src.toUri()),
- srcType, LocalResourceVisibility.PRIVATE,
- scFileStatus.getLen(), scFileStatus.getModificationTime());
- FSDownload download = new FSDownload(localLFS, null, conf,
- downloadDest, scRsrc, null);
- try {
- Path downloaded = download.call();
- // don't need to convert downloaded path into a dir
- // since its already a jar path.
- dest = downloaded;
- } catch (Exception ex) {
- throw new YarnRuntimeException(
- "Exception happend while downloading files "
- + "for aux-service:" + sName + " and remote-file-path:"
- + src + ".\n" + ex.getMessage());
- }
- }
- s = AuxiliaryServiceWithCustomClassLoader.getInstance(
- new Configuration(conf), className, dest.toString());
- }
- LOG.info("The aux service:" + sName
- + " are using the custom classloader");
- } else {
- Class extends AuxiliaryService> sClass = conf.getClass(
- classKey, null, AuxiliaryService.class);
-
- if (sClass == null) {
- throw new RuntimeException("No class defined for " + sName);
- }
- s = ReflectionUtils.newInstance(sClass, new Configuration(conf));
- }
- if (s == null) {
- throw new RuntimeException("No object created for " + sName);
- }
- // TODO better use s.getName()?
- if(!sName.equals(s.getName())) {
- LOG.warn("The Auxiliary Service named '"+sName+"' in the "
- +"configuration is for "+s.getClass()+" which has "
- +"a name of '"+s.getName()+"'. Because these are "
- +"not the same tools trying to send ServiceData and read "
- +"Service Meta Data may have issues unless the refer to "
- +"the name in the config.");
- }
- s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
- addService(sName, s);
- if (recoveryEnabled) {
- Path storePath = new Path(stateStoreRoot, sName);
- stateStoreFs.mkdirs(storePath, storeDirPerms);
- s.setRecoveryPath(storePath);
- }
- s.init(new Configuration(conf));
- } catch (RuntimeException e) {
- LOG.error("Failed to initialize " + sName, e);
- throw e;
+ String manifestStr = conf.get(YarnConfiguration.NM_AUX_SERVICES_MANIFEST);
+ if (manifestStr == null) {
+ Collection auxNames = conf.getStringCollection(
+ YarnConfiguration.NM_AUX_SERVICES);
+ for (final String sName : auxNames) {
+ org.apache.hadoop.yarn.service.api.records.Service service =
+ createServiceFromConfiguration(sName, conf);
+ initAuxService(service, conf, true);
}
+ } else {
+ manifest = new Path(manifestStr);
+ manifestFS = FileSystem.get(new URI(manifestStr), conf);
+ loadManifest(conf, false);
}
+ manifestReloadInterval = conf.getLong(
+ YarnConfiguration.NM_AUX_SERVICES_MANIFEST_RELOAD_MS,
+ YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS);
+ manifestReloadTask = new ManifestReloadTask();
+
super.serviceInit(conf);
}
+ private void startAuxService(String name, AuxiliaryService service, org
+ .apache.hadoop.yarn.service.api.records.Service serviceRecord) {
+ service.start();
+ service.registerServiceListener(this);
+ ByteBuffer meta = service.getMetaData();
+ if (meta != null) {
+ serviceMetaData.put(name, meta);
+ }
+ serviceRecord.setLaunchTime(new Date());
+ }
+
+ private void stopAuxService(Service service) {
+ if (service.getServiceState() == Service.STATE.STARTED) {
+ service.unregisterServiceListener(this);
+ service.stop();
+ }
+ }
+
@Override
public void serviceStart() throws Exception {
// TODO fork(?) services running as configured user
@@ -310,12 +593,13 @@ public void serviceStart() throws Exception {
for (Map.Entry entry : serviceMap.entrySet()) {
AuxiliaryService service = entry.getValue();
String name = entry.getKey();
- service.start();
- service.registerServiceListener(this);
- ByteBuffer meta = service.getMetaData();
- if(meta != null) {
- serviceMetaData.put(name, meta);
- }
+ startAuxService(name, service, serviceRecordMap.get(name));
+ }
+ if (manifest != null && manifestReloadInterval != -1) {
+ manifestReloadTimer = new Timer("AuxServicesManifestRelaod-Timer",
+ true);
+ manifestReloadTimer.scheduleAtFixedRate(manifestReloadTask,
+ manifestReloadInterval, manifestReloadInterval);
}
super.serviceStart();
}
@@ -325,14 +609,15 @@ public void serviceStop() throws Exception {
try {
synchronized (serviceMap) {
for (Service service : serviceMap.values()) {
- if (service.getServiceState() == Service.STATE.STARTED) {
- service.unregisterServiceListener(this);
- service.stop();
- }
+ stopAuxService(service);
}
serviceMap.clear();
+ serviceRecordMap.clear();
serviceMetaData.clear();
}
+ if (manifestReloadTimer != null) {
+ manifestReloadTimer.cancel();
+ }
} finally {
super.serviceStop();
}
@@ -340,9 +625,9 @@ public void serviceStop() throws Exception {
@Override
public void stateChanged(Service service) {
- LOG.error("Service " + service.getName() + " changed state: " +
+ // services changing state is expected on reload
+ LOG.info("Service " + service.getName() + " changed state: " +
service.getServiceState());
- stop();
}
@Override
@@ -448,4 +733,41 @@ private UserGroupInformation getRemoteUgi() {
}
return remoteUgi;
}
+
+ protected static org.apache.hadoop.yarn.service.api.records.Service
+ newAuxService(String name, String className) {
+ org.apache.hadoop.yarn.service.api.records.Configuration serviceConf =
+ new org.apache.hadoop.yarn.service.api.records.Configuration();
+ serviceConf.setProperty(CLASS_NAME, className);
+ return new org.apache.hadoop.yarn.service.api.records.Service()
+ .name(name)
+ .configuration(serviceConf);
+ }
+
+ protected static void setClasspath(org.apache.hadoop.yarn.service.api
+ .records.Service service, String classpath) {
+ service.getConfiguration().getFiles().add(
+ new ConfigFile().srcFile(classpath).type(ConfigFile.TypeEnum.STATIC));
+ }
+
+ protected static void setSystemClasses(org.apache.hadoop.yarn.service.api
+ .records.Service service, String systemClasses) {
+ service.getConfiguration().setProperty(SYSTEM_CLASSES, systemClasses);
+ }
+
+ /**
+ * Class which is used by the {@link Timer} class to periodically execute the
+ * manifest reload.
+ */
+ private final class ManifestReloadTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ reloadManifest();
+ } catch (Throwable t) {
+ // Prevent uncaught exceptions from killing this thread
+ LOG.warn("Error while reloading manifest: ", t);
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java
index c764fbdc065..857c1b4b3b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java
@@ -159,11 +159,8 @@ public void setRecoveryPath(Path recoveryPath) {
}
public static AuxiliaryServiceWithCustomClassLoader getInstance(
- Configuration conf, String className, String appClassPath)
- throws IOException, ClassNotFoundException {
- String[] systemClasses = conf.getTrimmedStrings(String.format(
- YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
- className));
+ Configuration conf, String className, String appClassPath, String[]
+ systemClasses) throws IOException, ClassNotFoundException {
ClassLoader customClassLoader = createAuxServiceClassLoader(
appClassPath, systemClasses);
Class> clazz = Class.forName(className, true,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8a12c3c5105..42ea59c8313 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -258,6 +258,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
this.context, this.deletionService);
auxiliaryServices.registerServiceListener(this);
+ context.setAuxServices(auxiliaryServices);
addService(auxiliaryServices);
// initialize the metrics publisher if the timeline service v.2 is enabled
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index ca08897eeec..dd774a36b64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -24,6 +24,7 @@
import java.nio.charset.Charset;
import java.security.Principal;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
@@ -31,7 +32,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AuxiliaryServiceInfo;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AuxiliaryServicesInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
+import org.apache.hadoop.yarn.service.api.records.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -556,6 +560,25 @@ public Object getNMResourceInfo(
return new NMResourceInfo();
}
+ @GET
+ @Path("/auxiliaryservices")
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+ MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ public AuxiliaryServicesInfo getAuxiliaryServices(@javax.ws.rs.core.Context
+ HttpServletRequest hsr) {
+ init();
+ AuxiliaryServicesInfo auxiliaryServices = new AuxiliaryServicesInfo();
+ if (!hasAdminAccess(hsr)) {
+ return auxiliaryServices;
+ }
+ Collection loadedServices = nmContext.getAuxServices()
+ .getServiceRecords();
+ if (loadedServices != null) {
+ auxiliaryServices.addAll(loadedServices);
+ }
+ return auxiliaryServices;
+ }
+
@PUT
@Path("/yarn/sysfs/{user}/{appId}")
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
@@ -625,6 +648,21 @@ protected Boolean hasAccess(String user, ApplicationId appId,
return true;
}
+ protected Boolean hasAdminAccess(HttpServletRequest hsr) {
+ // Check for the authorization.
+ UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+
+ if (callerUGI == null) {
+ return false;
+ }
+
+ if (!this.nmContext.getApplicationACLsManager().isAdmin(callerUGI)) {
+ return false;
+ }
+
+ return true;
+ }
+
private UserGroupInformation getCallerUserGroupInformation(
HttpServletRequest hsr, boolean usePrincipal) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java
new file mode 100644
index 00000000000..60c94aca183
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+
+@XmlRootElement(name = "service")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AuxiliaryServiceInfo {
+ protected String name;
+ protected String version;
+ protected Date startTime;
+
+ public AuxiliaryServiceInfo() {
+ // JAXB needs this
+ }
+
+ public AuxiliaryServiceInfo(String name, String version, Date startTime) {
+ this.name = name;
+ this.version = version;
+ this.startTime = startTime;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java
new file mode 100644
index 00000000000..2a71fc7bd65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.webapp.dao;
+
+import org.apache.hadoop.yarn.service.api.records.Service;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Collection;
+
+@XmlRootElement(name = "services")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AuxiliaryServicesInfo {
+ protected ArrayList services = new
+ ArrayList<>();
+
+ public AuxiliaryServicesInfo() {
+ // JAXB needs this
+ }
+
+ public void add(Service s) {
+ services.add(new AuxiliaryServiceInfo(s.getName(), s.getVersion(), s
+ .getLaunchTime()));
+ }
+
+ public void addAll(Collection serviceList) {
+ for (Service service : serviceList) {
+ add(service);
+ }
+ }
+
+ public ArrayList getServices() {
+ return services;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 2794857c6e7..2ecbf316eba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -827,5 +828,14 @@ public DeletionService getDeletionService() {
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
return null;
}
+
+ @Override
+ public void setAuxServices(AuxServices auxServices) {
+ }
+
+ @Override
+ public AuxServices getAuxServices() {
+ return null;
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
index ca0b32a7521..55d6936e041 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
@@ -31,6 +31,15 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.hadoop.yarn.service.api.records.Services;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +96,7 @@
import org.junit.Assert;
import org.junit.Test;
+@RunWith(value = Parameterized.class)
public class TestAuxServices {
private static final Logger LOG =
LoggerFactory.getLogger(TestAuxServices.class);
@@ -97,8 +107,39 @@
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
mock(AuxiliaryLocalPathHandler.class);
private final static Context MOCK_CONTEXT = mock(Context.class);
- private final static DeletionService MOCK_DEL_SERVICE = mock(
+ private final DeletionService MOCK_DEL_SERVICE = mock(
DeletionService.class);
+ private final Boolean useManifest;
+ File rootDir = GenericTestUtils.getTestDir(getClass()
+ .getSimpleName());
+ File manifest = new File(rootDir, "manifest.txt");
+ ObjectMapper mapper = new ObjectMapper();
+
+ @Parameterized.Parameters
+ public static Collection getParams() {
+ return Arrays.asList(false, true);
+ }
+
+ @Before
+ public void setup() {
+ if (!rootDir.exists()) {
+ rootDir.mkdirs();
+ }
+ }
+
+ @After
+ public void cleanup() {
+ if (useManifest) {
+ manifest.delete();
+ }
+ rootDir.delete();
+ }
+
+ public TestAuxServices(Boolean useManifest) {
+ this.useManifest = useManifest;
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ mapper.configure(SerializationFeature.WRITE_NULL_MAP_VALUES, false);
+ }
static class LightService extends AuxiliaryService implements Service
{
@@ -204,15 +245,27 @@ public ByteBuffer getMetaData() {
}
}
+ private void writeManifestFile(Services services, Configuration
+ conf) throws IOException {
+ conf.set(YarnConfiguration.NM_AUX_SERVICES_MANIFEST, manifest
+ .getAbsolutePath());
+ mapper.writeValue(manifest, services);
+ }
+
@SuppressWarnings("resource")
@Test
public void testRemoteAuxServiceClassPath() throws Exception {
Configuration conf = new YarnConfiguration();
FileSystem fs = FileSystem.get(conf);
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] {"ServiceC"});
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
- "ServiceC"), ServiceC.class, Service.class);
+ org.apache.hadoop.yarn.service.api.records.Service serviceC = AuxServices
+ .newAuxService("ServiceC", ServiceC.class.getName());
+ Services services = new Services().serviceList(serviceC);
+ if (!useManifest) {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[]{"ServiceC"});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ "ServiceC"), ServiceC.class, Service.class);
+ }
Context mockContext2 = mock(Context.class);
LocalDirsHandlerService mockDirsHandler = mock(
@@ -223,11 +276,6 @@ public void testRemoteAuxServiceClassPath() throws Exception {
rootAuxServiceDirPath);
when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
- File rootDir = GenericTestUtils.getTestDir(getClass()
- .getSimpleName());
- if (!rootDir.exists()) {
- rootDir.mkdirs();
- }
AuxServices aux = null;
File testJar = null;
try {
@@ -243,9 +291,14 @@ public void testRemoteAuxServiceClassPath() throws Exception {
perms.add(PosixFilePermission.GROUP_WRITE);
Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()),
perms);
- conf.set(String.format(
- YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
- testJar.getAbsolutePath());
+ if (useManifest) {
+ AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
+ writeManifestFile(services, conf);
+ } else {
+ conf.set(String.format(
+ YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
+ testJar.getAbsolutePath());
+ }
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -260,9 +313,14 @@ public void testRemoteAuxServiceClassPath() throws Exception {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName());
- conf.set(String.format(
- YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
- testJar.getAbsolutePath());
+ if (useManifest) {
+ AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
+ writeManifestFile(services, conf);
+ } else {
+ conf.set(String.format(
+ YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
+ testJar.getAbsolutePath());
+ }
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -301,10 +359,6 @@ public void testRemoteAuxServiceClassPath() throws Exception {
FileTime fileTime = FileTime.fromMillis(time);
Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()),
fileTime);
- conf.set(
- String.format(YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH,
- "ServiceC"),
- testJar.getAbsolutePath());
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -316,7 +370,6 @@ public void testRemoteAuxServiceClassPath() throws Exception {
} finally {
if (testJar != null) {
testJar.delete();
- rootDir.delete();
}
if (fs.exists(new Path(root))) {
fs.delete(new Path(root), true);
@@ -333,10 +386,17 @@ public void testRemoteAuxServiceClassPath() throws Exception {
public void testCustomizedAuxServiceClassPath() throws Exception {
// verify that we can load AuxService Class from default Class path
Configuration conf = new YarnConfiguration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] {"ServiceC"});
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
- "ServiceC"), ServiceC.class, Service.class);
+ org.apache.hadoop.yarn.service.api.records.Service serviceC = AuxServices
+ .newAuxService("ServiceC", ServiceC.class.getName());
+ Services services = new Services().serviceList(serviceC);
+ if (useManifest) {
+ writeManifestFile(services, conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[]{"ServiceC"});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ "ServiceC"), ServiceC.class, Service.class);
+ }
@SuppressWarnings("resource")
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
@@ -358,31 +418,41 @@ public void testCustomizedAuxServiceClassPath() throws Exception {
// create a new jar file, and configure it as customized class path
// for this AuxService, and make sure that we could load the class
// from this configured customized class path
- File rootDir = GenericTestUtils.getTestDir(getClass()
- .getSimpleName());
- if (!rootDir.exists()) {
- rootDir.mkdirs();
- }
File testJar = null;
try {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
- "test-runjar.jar", 2048, ServiceC.class.getName());
+ "test-runjar.jar", 2048, ServiceC.class.getName(), LightService
+ .class.getName());
conf = new YarnConfiguration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] {"ServiceC"});
- conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"),
- ServiceC.class.getName());
- conf.set(String.format(
- YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"),
- testJar.getAbsolutePath());
// remove "-org.apache.hadoop." from system classes
String systemClasses = "-org.apache.hadoop." + "," +
- ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
- conf.set(String.format(
- YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
- "ServiceC"), systemClasses);
+ ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
+ if (useManifest) {
+ AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
+ AuxServices.setSystemClasses(serviceC, systemClasses);
+ writeManifestFile(services, conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[]{"ServiceC"});
+ conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"),
+ ServiceC.class.getName());
+ conf.set(String.format(
+ YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"),
+ testJar.getAbsolutePath());
+ conf.set(String.format(
+ YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
+ "ServiceC"), systemClasses);
+ }
+ Context mockContext2 = mock(Context.class);
+ LocalDirsHandlerService mockDirsHandler = mock(
+ LocalDirsHandlerService.class);
+ String root = "target/LocalDir";
+ Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
+ when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
+ rootAuxServiceDirPath);
+ when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
- MOCK_CONTEXT, MOCK_DEL_SERVICE);
+ mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
@@ -405,21 +475,31 @@ public void testCustomizedAuxServiceClassPath() throws Exception {
} finally {
if (testJar != null) {
testJar.delete();
- rootDir.delete();
}
}
}
@Test
- public void testAuxEventDispatch() {
+ public void testAuxEventDispatch() throws IOException {
Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
- ServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
- ServiceB.class, Service.class);
- conf.setInt("A.expected.init", 1);
- conf.setInt("B.expected.stop", 1);
+ if (useManifest) {
+ org.apache.hadoop.yarn.service.api.records.Service serviceA =
+ AuxServices.newAuxService("Asrv", ServiceA.class.getName());
+ serviceA.getConfiguration().setProperty("A.expected.init", "1");
+ org.apache.hadoop.yarn.service.api.records.Service serviceB =
+ AuxServices.newAuxService("Bsrv", ServiceB.class.getName());
+ serviceB.getConfiguration().setProperty("B.expected.stop", "1");
+ writeManifestFile(new Services().serviceList(serviceA, serviceB), conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{"Asrv",
+ "Bsrv"});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
+ ServiceA.class, Service.class);
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
+ ServiceB.class, Service.class);
+ conf.setInt("A.expected.init", 1);
+ conf.setInt("B.expected.stop", 1);
+ }
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -477,14 +557,35 @@ public void testAuxEventDispatch() {
}
}
- @Test
- public void testAuxServices() {
+ private Configuration getABConf() throws
+ IOException {
+ return getABConf("Asrv", "Bsrv", ServiceA.class, ServiceB.class);
+ }
+
+ private Configuration getABConf(String aName, String bName,
+ Class extends Service> aClass, Class extends Service> bClass) throws
+ IOException {
Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
- ServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
- ServiceB.class, Service.class);
+ if (useManifest) {
+ org.apache.hadoop.yarn.service.api.records.Service serviceA =
+ AuxServices.newAuxService(aName, aClass.getName());
+ org.apache.hadoop.yarn.service.api.records.Service serviceB =
+ AuxServices.newAuxService(bName, bClass.getName());
+ writeManifestFile(new Services().serviceList(serviceA, serviceB), conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{aName,
+ bName});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, aName),
+ aClass, Service.class);
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, bName),
+ bClass, Service.class);
+ }
+ return conf;
+ }
+
+ @Test
+ public void testAuxServices() throws IOException {
+ Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -510,15 +611,9 @@ public void testAuxServices() {
}
}
-
@Test
- public void testAuxServicesMeta() {
- Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
- ServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
- ServiceB.class, Service.class);
+ public void testAuxServicesMeta() throws IOException {
+ Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -547,16 +642,10 @@ public void testAuxServicesMeta() {
}
}
-
-
@Test
- public void testAuxUnexpectedStop() {
- Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
- ServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
- ServiceB.class, Service.class);
+ public void testAuxUnexpectedStop() throws IOException {
+ // AuxServices no longer expected to stop when services stop
+ Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
@@ -564,21 +653,17 @@ public void testAuxUnexpectedStop() {
Service s = aux.getServices().iterator().next();
s.stop();
- assertEquals("Auxiliary service stopped, but AuxService unaffected.",
- STOPPED, aux.getServiceState());
- assertTrue(aux.getServices().isEmpty());
+ assertEquals("Auxiliary service stop caused AuxServices stop",
+ STARTED, aux.getServiceState());
+ assertEquals(2, aux.getServices().size());
}
@Test
- public void testValidAuxServiceName() {
+ public void testValidAuxServiceName() throws IOException {
+ Configuration conf = getABConf("Asrv1", "Bsrv_2", ServiceA.class,
+ ServiceB.class);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
- Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
- ServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv_2"),
- ServiceB.class, Service.class);
try {
aux.init(conf);
} catch (Exception ex) {
@@ -588,30 +673,33 @@ public void testValidAuxServiceName() {
//Test bad auxService Name
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
- ServiceA.class, Service.class);
+ if (useManifest) {
+ org.apache.hadoop.yarn.service.api.records.Service serviceA =
+ AuxServices.newAuxService("1Asrv1", ServiceA.class.getName());
+ writeManifestFile(new Services().serviceList(serviceA), conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]
+ {"1Asrv1"});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ "1Asrv1"), ServiceA.class, Service.class);
+ }
try {
aux1.init(conf);
Assert.fail("Should receive the exception.");
} catch (Exception ex) {
- assertTrue(ex.getMessage().contains("The ServiceName: 1Asrv1 set in " +
- "yarn.nodemanager.aux-services is invalid.The valid service name " +
- "should only contain a-zA-Z0-9_ and can not start with numbers"));
+ assertTrue("Wrong message: " + ex.getMessage(),
+ ex.getMessage().contains("The auxiliary service name: 1Asrv1 is " +
+ "invalid. The valid service name should only contain a-zA-Z0-9_" +
+ " and cannot start with numbers."));
}
}
@Test
public void testAuxServiceRecoverySetup() throws IOException {
- Configuration conf = new YarnConfiguration();
+ Configuration conf = getABConf("Asrv", "Bsrv", RecoverableServiceA.class,
+ RecoverableServiceB.class);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.NM_RECOVERY_DIR, TEST_DIR.toString());
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] { "Asrv", "Bsrv" });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
- RecoverableServiceA.class, Service.class);
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
- RecoverableServiceB.class, Service.class);
try {
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
@@ -708,20 +796,33 @@ public ByteBuffer getMetaData() {
}
@Test
- public void testAuxServicesConfChange() {
+ public void testAuxServicesConfChange() throws IOException {
Configuration conf = new Configuration();
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[]{"ConfChangeAuxService"});
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
- "ConfChangeAuxService"), ConfChangeAuxService.class, Service.class);
+ if (useManifest) {
+ org.apache.hadoop.yarn.service.api.records.Service service =
+ AuxServices.newAuxService("ConfChangeAuxService",
+ ConfChangeAuxService.class.getName());
+ service.getConfiguration().setProperty("dummyConfig", "testValue");
+ writeManifestFile(new Services().serviceList(service), conf);
+ } else {
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[]{"ConfChangeAuxService"});
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ "ConfChangeAuxService"), ConfChangeAuxService.class, Service.class);
+ conf.set("dummyConfig", "testValue");
+ }
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT,
MOCK_DEL_SERVICE);
- conf.set("dummyConfig", "testValue");
aux.init(conf);
aux.start();
for (AuxiliaryService s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
- assertEquals(conf.get("dummyConfig"), "testValue");
+ if (useManifest) {
+ assertNull(conf.get("dummyConfig"));
+ } else {
+ assertEquals("testValue", conf.get("dummyConfig"));
+ }
+ assertEquals("changedTestValue", s.getConfig().get("dummyConfig"));
}
aux.stop();