diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep new file mode 100644 index 0000000..e69de29 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index b1efa5f..26a33b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -53,6 +53,11 @@ hadoop-yarn-api + org.apache.hadoop + hadoop-yarn-server-timelineservice + ${project.version} + + javax.xml.bind jaxb-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index ca2f239..eae8889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -22,7 +22,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -30,6 +29,9 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; @@ -112,6 +114,12 @@ public void setup() { bind(NMWebServices.class); bind(GenericExceptionHandler.class); bind(JAXBContextResolver.class); + // host the timeline service aggregator web service temporarily + // (see YARN-3087) + bind(PerNodeAggregatorWebService.class); + // bind to the global singleton instance + bind(AppLevelServiceManager.class). + toProvider(AppLevelServiceManagerProvider.class); bind(ResourceView.class).toInstance(this.resourceView); bind(ApplicationACLsManager.class).toInstance(this.aclsManager); bind(LocalDirsHandlerService.class).toInstance(dirsHandler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index b3fbc3e..3154ca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -43,6 +43,11 @@ org.apache.hadoop + hadoop-annotations + + + + org.apache.hadoop hadoop-yarn-api @@ -53,12 +58,57 @@ org.apache.hadoop - hadoop-yarn-server-common + hadoop-yarn-server-applicationhistoryservice + + + + com.google.guava + guava + com.google.inject + guice + + + + javax.servlet + servlet-api + + + + javax.xml.bind + jaxb-api + + + + com.sun.jersey + jersey-core + + + + commons-logging + commons-logging + + + + org.apache.hadoop - hadoop-yarn-server-applicationhistoryservice + hadoop-common + test-jar + test + + + + junit + junit + test + + + + org.mockito + mockito-all + test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java deleted file mode 100644 index 955aad5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice; - -public class TimelineAggregator { - -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java new file mode 100644 index 0000000..bf72fb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelAggregatorService extends BaseAggregatorService { + private final String applicationId; + // TODO define key metadata such as flow metadata, user, and queue + + public AppLevelAggregatorService(String applicationId) { + super(AppLevelAggregatorService.class.getName() + " - " + applicationId); + this.applicationId = applicationId; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java new file mode 100644 index 0000000..05d321f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * Class that manages adding and removing app level aggregator services and + * their lifecycle. It provides thread safety access to the app level services. + * + * It is a singleton, and instances should be obtained via + * {@link #getInstance()}. + */ +@Private +@Unstable +public class AppLevelServiceManager extends CompositeService { + private static final Log LOG = + LogFactory.getLog(AppLevelServiceManager.class); + private static final AppLevelServiceManager INSTANCE = + new AppLevelServiceManager(); + + // access to this map is synchronized with the map itself + private final Map services = + Collections.synchronizedMap( + new HashMap()); + + static AppLevelServiceManager getInstance() { + return INSTANCE; + } + + AppLevelServiceManager() { + super(AppLevelServiceManager.class.getName()); + } + + /** + * Creates and adds an app level aggregator service for the specified + * application id. The service is also initialized and started. If the service + * already exists, no new service is created. + * + * @throws YarnRuntimeException if there was any exception in initializing and + * starting the app level service + * @return whether it was added successfully + */ + public boolean addService(String appId) { + synchronized (services) { + AppLevelAggregatorService service = services.get(appId); + if (service == null) { + try { + service = new AppLevelAggregatorService(appId); + // initialize, start, and add it to the parent service so it can be + // cleaned up when the parent shuts down + service.init(getConfig()); + service.start(); + services.put(appId, service); + LOG.info("the application aggregator service for " + appId + + " was added"); + return true; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + String msg = "the application aggregator service for " + appId + + " already exists!"; + LOG.error(msg); + return false; + } + } + } + + /** + * Removes the app level aggregator service for the specified application id. + * The service is also stopped as a result. If the service does not exist, no + * change is made. + * + * @return whether it was removed successfully + */ + public boolean removeService(String appId) { + synchronized (services) { + AppLevelAggregatorService service = services.remove(appId); + if (service == null) { + String msg = "the application aggregator service for " + appId + + " does not exist!"; + LOG.error(msg); + return false; + } else { + // stop the service to do clean up + service.stop(); + LOG.info("the application aggregator service for " + appId + + " was removed"); + return true; + } + } + } + + /** + * Returns the app level aggregator service for the specified application id. + * + * @return the app level aggregator service or null if it does not exist + */ + public AppLevelAggregatorService getService(String appId) { + return services.get(appId); + } + + /** + * Returns whether the app level aggregator service for the specified + * application id exists. + */ + public boolean hasService(String appId) { + return services.containsKey(appId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java new file mode 100644 index 0000000..8768575 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import com.google.inject.Provider; + +/** + * A guice provider that provides a global singleton instance of + * AppLevelServiceManager. + */ +public class AppLevelServiceManagerProvider + implements Provider { + @Override + public AppLevelServiceManager get() { + return AppLevelServiceManager.getInstance(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java new file mode 100644 index 0000000..994c66f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage. + * + * Classes that extend this can add their own lifecycle management or + * customization of request handling. + */ +@Private +@Unstable +public class BaseAggregatorService extends CompositeService { + private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class); + + public BaseAggregatorService(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + /** + * Handles entity writes. These writes are synchronous and are written to the + * backing storage without buffering/batching. If any entity already exists, + * it results in an update of the entity. + * + * This method should be reserved for selected critical entities and events. + * For normal voluminous writes one should use the async method + * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * + * @param entities entities to post + * @param callerUgi the caller UGI + * @return the response that contains the result of the post. + */ + public TimelinePutResponse postEntities(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + return null; + } + + /** + * Handles entity writes in an asynchronous manner. The method returns as soon + * as validation is done. No promises are made on how quickly it will be + * written to the backing storage or if it will always be written to the + * backing storage. Multiple writes to the same entities may be batched and + * appropriate values updated and result in fewer writes to the backing + * storage. + * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void postEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java new file mode 100644 index 0000000..6371e82 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerContext; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The top-level server for the per-node timeline aggregator service. Currently + * it is defined as an auxiliary service to accommodate running within another + * daemon (e.g. node manager). + */ +@Private +@Unstable +public class PerNodeAggregatorServer extends AuxiliaryService { + private static final Log LOG = + LogFactory.getLog(PerNodeAggregatorServer.class); + private static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private final AppLevelServiceManager serviceManager; + private WebApp webApp; + + public PerNodeAggregatorServer() { + // use the same singleton + this(AppLevelServiceManager.getInstance()); + } + + @VisibleForTesting + PerNodeAggregatorServer(AppLevelServiceManager serviceManager) { + super("timeline_aggregator"); + this.serviceManager = serviceManager; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serviceManager.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + serviceManager.start(); + startWebApp(); + } + + @Override + protected void serviceStop() throws Exception { + if (webApp != null) { + webApp.stop(); + } + // stop the service manager + serviceManager.stop(); + super.serviceStop(); + } + + private void startWebApp() { + Configuration conf = getConfig(); + // use the same ports as the old ATS for now; we could create new properties + // for the new timeline service if needed + String bindAddress = WebAppUtils.getWebAppBindURL(conf, + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); + LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); + try { + webApp = + WebApps + .$for("timeline", null, null, "ws") + .with(conf).at(bindAddress).start( + new TimelineServiceWebApp()); + } catch (Exception e) { + String msg = "The per-node aggregator webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + + private static class TimelineServiceWebApp + extends WebApp implements YarnWebParams { + @Override + public void setup() { + bind(PerNodeAggregatorWebService.class); + // bind to the global singleton + bind(AppLevelServiceManager.class). + toProvider(AppLevelServiceManagerProvider.class); + } + } + + // these methods can be used as the basis for future service methods if the + // per-node aggregator runs separate from the node manager + /** + * Creates and adds an app level aggregator service for the specified + * application id. The service is also initialized and started. If the service + * already exists, no new service is created. + * + * @return whether it was added successfully + */ + public boolean addApplication(ApplicationId appId) { + String appIdString = appId.toString(); + return serviceManager.addService(appIdString); + } + + /** + * Removes the app level aggregator service for the specified application id. + * The service is also stopped as a result. If the service does not exist, no + * change is made. + * + * @return whether it was removed successfully + */ + public boolean removeApplication(ApplicationId appId) { + String appIdString = appId.toString(); + return serviceManager.removeService(appIdString); + } + + /** + * Creates and adds an app level aggregator service for the specified + * application id. The service is also initialized and started. If the service + * already exists, no new service is created. + */ + @Override + public void initializeContainer(ContainerInitializationContext context) { + // intercept the event of the AM container being created and initialize the + // app level aggregator service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + addApplication(appId); + } + } + + /** + * Removes the app level aggregator service for the specified application id. + * The service is also stopped as a result. If the service does not exist, no + * change is made. + */ + @Override + public void stopContainer(ContainerTerminationContext context) { + // intercept the event of the AM container being stopped and remove the app + // level aggregator service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + removeApplication(appId); + } + } + + private boolean isApplicationMaster(ContainerContext context) { + // TODO this is based on a (shaky) assumption that the container id (the + // last field of the full container id) for an AM is always 1 + // we want to make this much more reliable + ContainerId containerId = context.getContainerId(); + return containerId.getContainerId() == 1L; + } + + @VisibleForTesting + boolean hasApplication(String appId) { + return serviceManager.hasService(appId); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + + @Override + public ByteBuffer getMetaData() { + // TODO currently it is not used; we can return a more meaningful data when + // we connect it with an AM + return ByteBuffer.allocate(0); + } + + @VisibleForTesting + static PerNodeAggregatorServer launchServer(String[] args) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args, + LOG); + PerNodeAggregatorServer server = null; + try { + server = new PerNodeAggregatorServer(); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server), + SHUTDOWN_HOOK_PRIORITY); + YarnConfiguration conf = new YarnConfiguration(); + server.init(conf); + server.start(); + } catch (Throwable t) { + LOG.fatal("Error starting PerNodeAggregatorServer", t); + ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); + } + return server; + } + + private static class ShutdownHook implements Runnable { + private final PerNodeAggregatorServer server; + + public ShutdownHook(PerNodeAggregatorServer server) { + this.server = server; + } + + public void run() { + server.stop(); + } + } + + public static void main(String[] args) { + launchServer(args); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java new file mode 100644 index 0000000..2d96699 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * The main per-node REST end point for timeline service writes. It is + * essentially a container service that routes requests to the appropriate + * per-app services. + */ +@Private +@Unstable +@Singleton +@Path("/ws/v2/timeline") +public class PerNodeAggregatorWebService { + private static final Log LOG = + LogFactory.getLog(PerNodeAggregatorWebService.class); + + private final AppLevelServiceManager serviceManager; + + @Inject + public PerNodeAggregatorWebService(AppLevelServiceManager serviceManager) { + this.serviceManager = serviceManager; + } + + @XmlRootElement(name = "about") + @XmlAccessorType(XmlAccessType.NONE) + @Public + @Unstable + public static class AboutInfo { + + private String about; + + public AboutInfo() { + + } + + public AboutInfo(String about) { + this.about = about; + } + + @XmlElement(name = "About") + public String getAbout() { + return about; + } + + public void setAbout(String about) { + this.about = about; + } + + } + + /** + * Return the description of the timeline web services. + */ + @GET + @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public AboutInfo about( + @Context HttpServletRequest req, + @Context HttpServletResponse res) { + init(res); + return new AboutInfo("Timeline API"); + } + + /** + * Accepts writes to the aggregator, and returns a response. It simply routes + * the request to the app level aggregator. It expects an application as a + * context. + */ + @POST + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public TimelinePutResponse postEntities( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + TimelineEntities entities) { + init(res); + UserGroupInformation callerUgi = getUser(req); + if (callerUgi == null) { + String msg = "The owner of the posted timeline entities is not set"; + LOG.error(msg); + throw new ForbiddenException(msg); + } + + // TODO how to express async posts and handle them + try { + AppLevelAggregatorService service = getAggregatorService(req); + if (service == null) { + LOG.error("Application not found"); + throw new NotFoundException(); // different exception? + } + return service.postEntities(entities, callerUgi); + } catch (Exception e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private AppLevelAggregatorService + getAggregatorService(HttpServletRequest req) { + String appIdString = getApplicationId(req); + return serviceManager.getService(appIdString); + } + + private String getApplicationId(HttpServletRequest req) { + // TODO the application id from the request + // (most likely from the URI) + return null; + } + + private void init(HttpServletResponse response) { + response.setContentType(null); + } + + private UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUgi = null; + if (remoteUser != null) { + callerUgi = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUgi; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java deleted file mode 100644 index 7e0b775..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice; - -public class TestTimelineAggregator { - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java new file mode 100644 index 0000000..c0af8c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + + +public class TestAppLevelAggregatorService { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java new file mode 100644 index 0000000..3f981c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestAppLevelServiceManager { + + @Test(timeout=60000) + public void testMultithreadedAdd() throws Exception { + final AppLevelServiceManager serviceManager = + spy(new AppLevelServiceManager()); + doReturn(new Configuration()).when(serviceManager).getConfig(); + + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable task = new Callable() { + public Boolean call() { + return serviceManager.addService(appId); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertTrue(serviceManager.hasService(String.valueOf(i))); + } + } + + @Test + public void testMultithreadedAddAndRemove() throws Exception { + final AppLevelServiceManager serviceManager = + spy(new AppLevelServiceManager()); + doReturn(new Configuration()).when(serviceManager).getConfig(); + + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable task = new Callable() { + public Boolean call() { + return serviceManager.addService(appId) && + serviceManager.removeService(appId); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertFalse(serviceManager.hasService(String.valueOf(i))); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java new file mode 100644 index 0000000..55953cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +public class TestBaseAggregatorService { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java new file mode 100644 index 0000000..902047d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.junit.Test; + +public class TestPerNodeAggregatorServer { + private ApplicationAttemptId appAttemptId; + + public TestPerNodeAggregatorServer() { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + } + + @Test + public void testAddApplication() throws Exception { + PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); + // aggregator should have a single app + assertTrue(aggregator.hasApplication( + appAttemptId.getApplicationId().toString())); + aggregator.close(); + } + + @Test + public void testAddApplicationNonAMContainer() throws Exception { + PerNodeAggregatorServer aggregator = createAggregator(); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + aggregator.initializeContainer(context); + // aggregator should not have that app + assertFalse(aggregator.hasApplication( + appAttemptId.getApplicationId().toString())); + } + + @Test + public void testRemoveApplication() throws Exception { + PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); + // aggregator should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(aggregator.hasApplication(appIdStr)); + + ContainerId containerId = getAMContainerId(); + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + aggregator.stopContainer(context); + // aggregator should not have that app + assertFalse(aggregator.hasApplication(appIdStr)); + aggregator.close(); + } + + @Test + public void testRemoveApplicationNonAMContainer() throws Exception { + PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); + // aggregator should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(aggregator.hasApplication(appIdStr)); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + aggregator.stopContainer(context); + // aggregator should still have that app + assertTrue(aggregator.hasApplication(appIdStr)); + aggregator.close(); + } + + @Test(timeout = 60000) + public void testLaunch() throws Exception { + ExitUtil.disableSystemExit(); + PerNodeAggregatorServer server = null; + try { + server = + PerNodeAggregatorServer.launchServer(new String[0]); + } catch (ExitUtil.ExitException e) { + assertEquals(0, e.status); + ExitUtil.resetFirstExitException(); + fail(); + } finally { + if (server != null) { + server.stop(); + } + } + } + + private PerNodeAggregatorServer createAggregatorAndAddApplication() { + PerNodeAggregatorServer aggregator = createAggregator(); + // create an AM container + ContainerId containerId = getAMContainerId(); + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + aggregator.initializeContainer(context); + return aggregator; + } + + private PerNodeAggregatorServer createAggregator() { + AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager()); + doReturn(new Configuration()).when(serviceManager).getConfig(); + PerNodeAggregatorServer aggregator = + spy(new PerNodeAggregatorServer(serviceManager)); + return aggregator; + } + + private ContainerId getAMContainerId() { + return getContainerId(1L); + } + + private ContainerId getContainerId(long id) { + return ContainerId.newContainerId(appAttemptId, id); + } +}