diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index b3fbc3e..04c9109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -43,6 +43,11 @@ org.apache.hadoop + hadoop-annotations + + + + org.apache.hadoop hadoop-yarn-api @@ -53,12 +58,41 @@ org.apache.hadoop - hadoop-yarn-server-common + hadoop-yarn-server-applicationhistoryservice + + + + javax.servlet + servlet-api + + + + com.sun.jersey + jersey-core + + + + commons-logging + commons-logging org.apache.hadoop - hadoop-yarn-server-applicationhistoryservice + hadoop-common + test-jar + test + + + + junit + junit + test + + + + org.mockito + mockito-all + test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java deleted file mode 100644 index 955aad5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice; - -public class TimelineAggregator { - -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java new file mode 100644 index 0000000..bf72fb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelAggregatorService extends BaseAggregatorService { + private final String applicationId; + // TODO define key metadata such as flow metadata, user, and queue + + public AppLevelAggregatorService(String applicationId) { + super(AppLevelAggregatorService.class.getName() + " - " + applicationId); + this.applicationId = applicationId; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java new file mode 100644 index 0000000..994c66f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage. + * + * Classes that extend this can add their own lifecycle management or + * customization of request handling. + */ +@Private +@Unstable +public class BaseAggregatorService extends CompositeService { + private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class); + + public BaseAggregatorService(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + /** + * Handles entity writes. These writes are synchronous and are written to the + * backing storage without buffering/batching. If any entity already exists, + * it results in an update of the entity. + * + * This method should be reserved for selected critical entities and events. + * For normal voluminous writes one should use the async method + * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * + * @param entities entities to post + * @param callerUgi the caller UGI + * @return the response that contains the result of the post. + */ + public TimelinePutResponse postEntities(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + return null; + } + + /** + * Handles entity writes in an asynchronous manner. The method returns as soon + * as validation is done. No promises are made on how quickly it will be + * written to the backing storage or if it will always be written to the + * backing storage. Multiple writes to the same entities may be batched and + * appropriate values updated and result in fewer writes to the backing + * storage. + * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void postEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java new file mode 100644 index 0000000..8463b19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregator.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; + +/** + * The main server class for the per-node timeline service aggregator. It is + * essentially a container service that manages per-app aggregator services and + * routes requests to the appropriate per-app services. It is also a REST end + * point for timeline service writes. + */ +@Private +@Unstable +@Path("/ws/v2/timeline") +public class PerNodeAggregator extends CompositeService { + private static final Log LOG = LogFactory.getLog(PerNodeAggregator.class); + + // access to this map is synchronized with the map itself + private final Map services = + Collections.synchronizedMap( + new HashMap()); + + public PerNodeAggregator() { + super(PerNodeAggregator.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + /** + * Accepts writes to the aggregator, and returns a response. It expects an + * application as a context. + */ + @POST + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public TimelinePutResponse postEntities( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + TimelineEntities entities) { + init(res); + UserGroupInformation callerUgi = getUser(req); + if (callerUgi == null) { + String msg = "The owner of the posted timeline entities is not set"; + LOG.error(msg); + throw new ForbiddenException(msg); + } + + // TODO how to express async posts and handle them + try { + AppLevelAggregatorService service = getAggregatorService(req); + if (service == null) { + LOG.error("Application not found"); + throw new NotFoundException(); // different exception? + } + return service.postEntities(entities, callerUgi); + } catch (Exception e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + // TODO handle lifecycle of ALA services via special requests: RPC or REST? + /** + * Creates and adds an app level aggregator service that corresponds to the + * application id. The service is also initialized, started, and added to the + * the parent service. + * + * If the service already exists, an IllegalArgumemtnException is thrown + * + * @param applicationId the application id + */ + public void addApplication(String applicationId) { + synchronized (services) { + AppLevelAggregatorService service = services.get(applicationId); + if (service == null) { + try { + service = new AppLevelAggregatorService(applicationId); + // initialize, start, and add it to the parent service so it can be + // cleaned up when the parent shuts down + service.init(getConfig()); + service.start(); + addService(service); + services.put(applicationId, service); + LOG.info("the application aggregator service for " + applicationId + + " was added"); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + String msg = "the application aggregator service for " + applicationId + + " already exists!"; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + } + + /** + * Removes the app level aggregator service that corresponds to the + * application id. The service is also stopped. + * + * If the service does not exist, an IllegalArgumentException is thrown + * + * @param applicationId the application id for which the service should be + * removed + */ + public void removeApplication(String applicationId) { + synchronized (services) { + AppLevelAggregatorService service = services.remove(applicationId); + if (service == null) { + String msg = "the application aggregator service for " + applicationId + + " does not exist!"; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } else { + // stop the service to do clean up + removeService(service); + service.stop(); + LOG.info("the application aggregator service for " + applicationId + + " was removed"); + } + } + } + + private AppLevelAggregatorService + getAggregatorService(HttpServletRequest req) { + // TODO retrieve a one-time token or the application id from the request + // (most likely from the URI) + String applicationId = getApplicationId(req); + return services.get(applicationId); + } + + private String getApplicationId(HttpServletRequest req) { + // TODO implement + return null; + } + + private void init(HttpServletResponse response) { + response.setContentType(null); + } + + private UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUgi = null; + if (remoteUser != null) { + callerUgi = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUgi; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java deleted file mode 100644 index 7e0b775..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice; - -public class TestTimelineAggregator { - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java new file mode 100644 index 0000000..c0af8c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + + +public class TestAppLevelAggregatorService { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java new file mode 100644 index 0000000..55953cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +public class TestBaseAggregatorService { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java new file mode 100644 index 0000000..e47f652 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregator.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestPerNodeAggregator { + @Test + public void testAddApplication() throws Exception { + String appId = "app_1234567890_001"; + // should succeed + PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId); + aggregator.close(); + } + + @Test + public void testAddDuplicateApplication() throws Exception { + String appId = "app_1234567890_001"; + PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId); + + // try to add it again, and it should fail + try { + aggregator.addApplication(appId); + fail("adding the same application twice should have failed"); + } catch (IllegalArgumentException e) { // proper exception + } finally { + aggregator.close(); + } + } + + @Test + public void testRemoveApplication() throws Exception { + String appId = "app_1234567890_001"; + PerNodeAggregator aggregator = createAggregatorAndAddApplication(appId); + + // should succeed + aggregator.removeApplication(appId); + aggregator.close(); + } + + @Test(expected=IllegalArgumentException.class) + public void testRemoveNonExistentApplication() throws Exception { + PerNodeAggregator aggregator = new PerNodeAggregator(); + try { + aggregator.removeApplication("app_1234567890_001"); + } finally { + aggregator.close(); + } + } + + private PerNodeAggregator createAggregatorAndAddApplication(String appId) { + PerNodeAggregator aggregator = spy(new PerNodeAggregator()); + doReturn(new Configuration()).when(aggregator).getConfig(); + aggregator.addApplication(appId); + return aggregator; + } +}