diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index b3fbc3e..04c9109 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -43,6 +43,11 @@
org.apache.hadoop
+ hadoop-annotations
+
+
+
+ org.apache.hadoop
hadoop-yarn-api
@@ -53,12 +58,41 @@
org.apache.hadoop
- hadoop-yarn-server-common
+ hadoop-yarn-server-applicationhistoryservice
+
+
+
+ javax.servlet
+ servlet-api
+
+
+
+ com.sun.jersey
+ jersey-core
+
+
+
+ commons-logging
+ commons-logging
org.apache.hadoop
- hadoop-yarn-server-applicationhistoryservice
+ hadoop-common
+ test-jar
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+
+ org.mockito
+ mockito-all
+ test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
deleted file mode 100644
index 955aad5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TimelineAggregator {
-
-}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
new file mode 100644
index 0000000..bf72fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelAggregatorService extends BaseAggregatorService {
+ private final String applicationId;
+ // TODO define key metadata such as flow metadata, user, and queue
+
+ public AppLevelAggregatorService(String applicationId) {
+ super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
new file mode 100644
index 0000000..994c66f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public class BaseAggregatorService extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
+
+ public BaseAggregatorService(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ /**
+ * Handles entity writes. These writes are synchronous and are written to the
+ * backing storage without buffering/batching. If any entity already exists,
+ * it results in an update of the entity.
+ *
+ * This method should be reserved for selected critical entities and events.
+ * For normal voluminous writes one should use the async method
+ * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI
+ * @return the response that contains the result of the post.
+ */
+ public TimelinePutResponse postEntities(TimelineEntities entities,
+ UserGroupInformation callerUgi) {
+ // TODO implement
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+ }
+ return null;
+ }
+
+ /**
+ * Handles entity writes in an asynchronous manner. The method returns as soon
+ * as validation is done. No promises are made on how quickly it will be
+ * written to the backing storage or if it will always be written to the
+ * backing storage. Multiple writes to the same entities may be batched and
+ * appropriate values updated and result in fewer writes to the backing
+ * storage.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI
+ */
+ public void postEntitiesAsync(TimelineEntities entities,
+ UserGroupInformation callerUgi) {
+ // TODO implement
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java
new file mode 100644
index 0000000..8463b19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+/**
+ * The main server class for the per-node timeline service aggregator. It is
+ * essentially a container service that manages per-app aggregator services and
+ * routes requests to the appropriate per-app services. It is also a REST end
+ * point for timeline service writes.
+ */
+@Private
+@Unstable
+@Path("/ws/v2/timeline")
+public class PerNodeAggregator extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(PerNodeAggregator.class);
+
+ // access to this map is synchronized with the map itself
+ private final Map services =
+ Collections.synchronizedMap(
+ new HashMap());
+
+ public PerNodeAggregator() {
+ super(PerNodeAggregator.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ /**
+ * Accepts writes to the aggregator, and returns a response. It expects an
+ * application as a context.
+ */
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ TimelineEntities entities) {
+ init(res);
+ UserGroupInformation callerUgi = getUser(req);
+ if (callerUgi == null) {
+ String msg = "The owner of the posted timeline entities is not set";
+ LOG.error(msg);
+ throw new ForbiddenException(msg);
+ }
+
+ // TODO how to express async posts and handle them
+ try {
+ AppLevelAggregatorService service = getAggregatorService(req);
+ if (service == null) {
+ LOG.error("Application not found");
+ throw new NotFoundException(); // different exception?
+ }
+ return service.postEntities(entities, callerUgi);
+ } catch (Exception e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ // TODO handle lifecycle of ALA services via special requests: RPC or REST?
+ /**
+ * Creates and adds an app level aggregator service that corresponds to the
+ * application id. The service is also initialized, started, and added to the
+ * the parent service.
+ *
+ * If the service already exists, an IllegalArgumemtnException is thrown
+ *
+ * @param applicationId the application id
+ */
+ public void addApplication(String applicationId) {
+ synchronized (services) {
+ AppLevelAggregatorService service = services.get(applicationId);
+ if (service == null) {
+ try {
+ service = new AppLevelAggregatorService(applicationId);
+ // initialize, start, and add it to the parent service so it can be
+ // cleaned up when the parent shuts down
+ service.init(getConfig());
+ service.start();
+ addService(service);
+ services.put(applicationId, service);
+ LOG.info("the application aggregator service for " + applicationId +
+ " was added");
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ String msg = "the application aggregator service for " + applicationId +
+ " already exists!";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+ }
+
+ /**
+ * Removes the app level aggregator service that corresponds to the
+ * application id. The service is also stopped.
+ *
+ * If the service does not exist, an IllegalArgumentException is thrown
+ *
+ * @param applicationId the application id for which the service should be
+ * removed
+ */
+ public void removeApplication(String applicationId) {
+ synchronized (services) {
+ AppLevelAggregatorService service = services.remove(applicationId);
+ if (service == null) {
+ String msg = "the application aggregator service for " + applicationId +
+ " does not exist!";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ } else {
+ // stop the service to do clean up
+ removeService(service);
+ service.stop();
+ LOG.info("the application aggregator service for " + applicationId +
+ " was removed");
+ }
+ }
+ }
+
+ private AppLevelAggregatorService
+ getAggregatorService(HttpServletRequest req) {
+ // TODO retrieve a one-time token or the application id from the request
+ // (most likely from the URI)
+ String applicationId = getApplicationId(req);
+ return services.get(applicationId);
+ }
+
+ private String getApplicationId(HttpServletRequest req) {
+ // TODO implement
+ return null;
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ private UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUgi = null;
+ if (remoteUser != null) {
+ callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUgi;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
deleted file mode 100644
index 7e0b775..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TestTimelineAggregator {
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
new file mode 100644
index 0000000..c0af8c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+
+public class TestAppLevelAggregatorService {
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
new file mode 100644
index 0000000..55953cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+public class TestBaseAggregatorService {
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java
new file mode 100644
index 0000000..e47f652
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestPerNodeAggregator {
+ @Test
+ public void testAddApplication() throws Exception {
+ String appId = "app_1234567890_001";
+ // should succeed
+ PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId);
+ aggregator.close();
+ }
+
+ @Test
+ public void testAddDuplicateApplication() throws Exception {
+ String appId = "app_1234567890_001";
+ PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId);
+
+ // try to add it again, and it should fail
+ try {
+ aggregator.addApplication(appId);
+ fail("adding the same application twice should have failed");
+ } catch (IllegalArgumentException e) { // proper exception
+ } finally {
+ aggregator.close();
+ }
+ }
+
+ @Test
+ public void testRemoveApplication() throws Exception {
+ String appId = "app_1234567890_001";
+ PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId);
+
+ // should succeed
+ aggregator.removeApplication(appId);
+ aggregator.close();
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testRemoveNonExistentApplication() throws Exception {
+ PerNodeAggregator aggregator = new PerNodeAggregator();
+ try {
+ aggregator.removeApplication("app_1234567890_001");
+ } finally {
+ aggregator.close();
+ }
+ }
+
+ private PerNodeAggregator createAggregatorAndAddApplication(String appId) {
+ PerNodeAggregator aggregator = spy(new PerNodeAggregator());
+ doReturn(new Configuration()).when(aggregator).getConfig();
+ aggregator.addApplication(appId);
+ return aggregator;
+ }
+}