diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index df1061d..448b60f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -35,7 +35,7 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 BUG FIXES -Trunk - Unreleased +Trunk - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java index 82ecdbd..4739d8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java @@ -47,7 +47,7 @@ public TimelineWriteResponse() { /** * Get a list of {@link TimelineWriteError} instances - * + * * @return a list of {@link TimelineWriteError} instances */ @XmlElement(name = "errors") @@ -57,7 +57,7 @@ public TimelineWriteResponse() { /** * Add a single {@link TimelineWriteError} instance into the existing list - * + * * @param error * a single {@link TimelineWriteError} instance */ @@ -67,7 +67,7 @@ public void addError(TimelineWriteError error) { /** * Add a list of {@link TimelineWriteError} instances into the existing list - * + * * @param errors * a list of {@link TimelineWriteError} instances */ @@ -77,7 +77,7 @@ public void addErrors(List errors) { /** * Set the list to the given list of {@link TimelineWriteError} instances - * + * * @param errors * a list of {@link TimelineWriteError} instances */ @@ -107,7 +107,7 @@ public void setErrors(List errors) { /** * Get the entity Id - * + * * @return the entity Id */ @XmlElement(name = "entity") @@ -117,7 +117,7 @@ public String getEntityId() { /** * Set the entity Id - * + * * @param entityId * the entity Id */ @@ -127,7 +127,7 @@ public void setEntityId(String entityId) { /** * Get the entity type - * + * * @return the entity type */ @XmlElement(name = "entitytype") @@ -137,7 +137,7 @@ public String getEntityType() { /** * Set the entity type - * + * * @param entityType * the entity type */ @@ -147,7 +147,7 @@ public void setEntityType(String entityType) { /** * Get the error code - * + * * @return an error code */ @XmlElement(name = "errorcode") @@ -157,7 +157,7 @@ public int getErrorCode() { /** * Set the error code to the given error code - * + * * @param errorCode * an error code */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index db49166..8f81f52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -207,7 +207,7 @@ private int appMasterRpcPort = -1; // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - + private boolean newTimelineService = false; // App Master configuration @@ -362,7 +362,7 @@ public boolean init(String[] args) throws ParseException, IOException { "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("debug", false, "Dump out debug information"); - opts.addOption("timeline_service_version", true, + opts.addOption("timeline_service_version", true, "Version for timeline service"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -503,7 +503,7 @@ public boolean init(String[] args) throws ParseException, IOException { if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { if (cliParser.hasOption("timeline_service_version")) { - String timelineServiceVersion = + String timelineServiceVersion = cliParser.getOptionValue("timeline_service_version", "v1"); if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { newTimelineService = false; @@ -577,8 +577,8 @@ public void run() throws YarnException, IOException { if(timelineClient != null) { if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, + publishApplicationAttemptEventOnNewTimelineService(timelineClient, + appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } else { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), @@ -653,8 +653,8 @@ public void run() throws YarnException, IOException { if(timelineClient != null) { if (newTimelineService) { - publishApplicationAttemptEventOnNewTimelineService(timelineClient, - appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, + publishApplicationAttemptEventOnNewTimelineService(timelineClient, + appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } else { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), @@ -1190,18 +1190,18 @@ public TimelinePutResponse run() throws Exception { e instanceof UndeclaredThrowableException ? e.getCause() : e); } } - + private static void publishContainerStartEventOnNewTimelineService( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setId(DSEvent.DS_CONTAINER_START.toString()); @@ -1227,13 +1227,13 @@ public TimelinePutResponse run() throws Exception { private static void publishContainerEndEventOnNewTimelineService( final TimelineClient timelineClient, ContainerStatus container, String domainId, UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setId(DSEvent.DS_CONTAINER_END.toString()); @@ -1259,13 +1259,13 @@ public TimelinePutResponse run() throws Exception { private static void publishApplicationAttemptEventOnNewTimelineService( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(appAttemptId); entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setId(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); @@ -1287,5 +1287,5 @@ public TimelinePutResponse run() throws Exception { e instanceof UndeclaredThrowableException ? e.getCause() : e); } } - + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 02a627e..934515e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -185,7 +185,7 @@ // Command line options private Options opts; - + private String timelineServiceVersion; private static final String shellCommandPath = "shellCommands"; @@ -357,11 +357,11 @@ public boolean init(String[] args) throws ParseException { throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." + " Specified virtual cores=" + amVCores); } - + if (cliParser.hasOption("timeline_service_version")) { - timelineServiceVersion = + timelineServiceVersion = cliParser.getOptionValue("timeline_service_version", "v1"); - if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || + if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || timelineServiceVersion.trim().equalsIgnoreCase("v2"))) { throw new IllegalArgumentException( "timeline_service_version is not set properly, should be 'v1' or 'v2'"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index ef69e4b..88cbc5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.junit.After; import org.junit.Assert; @@ -98,7 +98,7 @@ protected void setupInternal(int numNodeManager) throws Exception { // enable aux-service based timeline aggregators conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME - + ".class", PerNodeTimelineAggregatorsAuxService.class.getName()); + + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); } conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 32ee5d8..ff3ff2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.timelineservice; @@ -6,7 +23,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -14,12 +31,12 @@ import static org.junit.Assert.fail; public class TestTimelineServiceClientIntegration { - private static PerNodeTimelineAggregatorsAuxService auxService; + private static PerNodeTimelineCollectorsAuxService auxService; @BeforeClass public static void setupClass() throws Exception { try { - auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); + auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0]); auxService.addApplication(ApplicationId.newInstance(0, 1)); } catch (ExitUtil.ExitException e) { fail(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 26790f1..51b2472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -91,6 +91,11 @@ commons-logging + + commons-io + commons-io + + org.apache.hadoop diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java deleted file mode 100644 index 95ec9f8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; - -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage for a given YARN application. - * - * App-related lifecycle management is handled by this service. - */ -@Private -@Unstable -public class AppLevelTimelineAggregator extends TimelineAggregator { - private final String applicationId; - // TODO define key metadata such as flow metadata, user, and queue - - public AppLevelTimelineAggregator(String applicationId) { - super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId); - this.applicationId = applicationId; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java deleted file mode 100644 index cdc4e35..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.nio.ByteBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; -import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; -import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.hadoop.yarn.server.api.ContainerContext; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - -import com.google.common.annotations.VisibleForTesting; - -/** - * The top-level server for the per-node timeline aggregator collection. Currently - * it is defined as an auxiliary service to accommodate running within another - * daemon (e.g. node manager). - */ -@Private -@Unstable -public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService { - private static final Log LOG = - LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class); - private static final int SHUTDOWN_HOOK_PRIORITY = 30; - - private final TimelineAggregatorsCollection aggregatorCollection; - - public PerNodeTimelineAggregatorsAuxService() { - // use the same singleton - this(TimelineAggregatorsCollection.getInstance()); - } - - @VisibleForTesting PerNodeTimelineAggregatorsAuxService( - TimelineAggregatorsCollection aggregatorCollection) { - super("timeline_aggregator"); - this.aggregatorCollection = aggregatorCollection; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - aggregatorCollection.init(conf); - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - aggregatorCollection.start(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - aggregatorCollection.stop(); - super.serviceStop(); - } - - // these methods can be used as the basis for future service methods if the - // per-node aggregator runs separate from the node manager - /** - * Creates and adds an app level aggregator for the specified application id. - * The aggregator is also initialized and started. If the service already - * exists, no new service is created. - * - * @return whether it was added successfully - */ - public boolean addApplication(ApplicationId appId) { - String appIdString = appId.toString(); - AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appIdString); - return (aggregatorCollection.putIfAbsent(appIdString, aggregator) - == aggregator); - } - - /** - * Removes the app level aggregator for the specified application id. The - * aggregator is also stopped as a result. If the aggregator does not exist, no - * change is made. - * - * @return whether it was removed successfully - */ - public boolean removeApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return aggregatorCollection.remove(appIdString); - } - - /** - * Creates and adds an app level aggregator for the specified application id. - * The aggregator is also initialized and started. If the aggregator already - * exists, no new aggregator is created. - */ - @Override - public void initializeContainer(ContainerInitializationContext context) { - // intercept the event of the AM container being created and initialize the - // app level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - addApplication(appId); - } - } - - /** - * Removes the app level aggregator for the specified application id. The - * aggregator is also stopped as a result. If the aggregator does not exist, no - * change is made. - */ - @Override - public void stopContainer(ContainerTerminationContext context) { - // intercept the event of the AM container being stopped and remove the app - // level aggregator service - if (isApplicationMaster(context)) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - removeApplication(appId); - } - } - - private boolean isApplicationMaster(ContainerContext context) { - // TODO this is based on a (shaky) assumption that the container id (the - // last field of the full container id) for an AM is always 1 - // we want to make this much more reliable - ContainerId containerId = context.getContainerId(); - return containerId.getContainerId() == 1L; - } - - @VisibleForTesting - boolean hasApplication(String appId) { - return aggregatorCollection.containsKey(appId); - } - - @Override - public void initializeApplication(ApplicationInitializationContext context) { - } - - @Override - public void stopApplication(ApplicationTerminationContext context) { - } - - @Override - public ByteBuffer getMetaData() { - // TODO currently it is not used; we can return a more meaningful data when - // we connect it with an AM - return ByteBuffer.allocate(0); - } - - @VisibleForTesting - public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) { - Thread - .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args, - LOG); - PerNodeTimelineAggregatorsAuxService auxService = null; - try { - auxService = new PerNodeTimelineAggregatorsAuxService(); - ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), - SHUTDOWN_HOOK_PRIORITY); - YarnConfiguration conf = new YarnConfiguration(); - auxService.init(conf); - auxService.start(); - } catch (Throwable t) { - LOG.fatal("Error starting PerNodeAggregatorServer", t); - ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); - } - return auxService; - } - - private static class ShutdownHook implements Runnable { - private final PerNodeTimelineAggregatorsAuxService auxService; - - public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) { - this.auxService = auxService; - } - - public void run() { - auxService.stop(); - } - } - - public static void main(String[] args) { - launchServer(args); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java deleted file mode 100644 index dbd0895..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; -import org.apache.hadoop.util.ReflectionUtils; -/** - * Service that handles writes to the timeline service and writes them to the - * backing storage. - * - * Classes that extend this can add their own lifecycle management or - * customization of request handling. - */ -@Private -@Unstable -public abstract class TimelineAggregator extends CompositeService { - private static final Log LOG = LogFactory.getLog(TimelineAggregator.class); - - private TimelineWriter writer; - - public TimelineAggregator(String name) { - super(name); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - writer = ReflectionUtils.newInstance(conf.getClass( - YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, - FileSystemTimelineWriterImpl.class, - TimelineWriter.class), conf); - writer.init(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - writer.stop(); - } - - public TimelineWriter getWriter() { - return writer; - } - - /** - * Handles entity writes. These writes are synchronous and are written to the - * backing storage without buffering/batching. If any entity already exists, - * it results in an update of the entity. - * - * This method should be reserved for selected critical entities and events. - * For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. - * - * @param entities entities to post - * @param callerUgi the caller UGI - * @return the response that contains the result of the post. - */ - public TimelineWriteResponse postEntities(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" - + callerUgi + ")"); - } - - return writer.write(entities); - } - - /** - * Handles entity writes in an asynchronous manner. The method returns as soon - * as validation is done. No promises are made on how quickly it will be - * written to the backing storage or if it will always be written to the - * backing storage. Multiple writes to the same entities may be batched and - * appropriate values updated and result in fewer writes to the backing - * storage. - * - * @param entities entities to post - * @param callerUgi the caller UGI - */ - public void postEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement - if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + - callerUgi + ")"); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java deleted file mode 100644 index 7d42f94..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.webapp.ForbiddenException; -import org.apache.hadoop.yarn.webapp.NotFoundException; - -import com.google.inject.Singleton; - -/** - * The main per-node REST end point for timeline service writes. It is - * essentially a container service that routes requests to the appropriate - * per-app services. - */ -@Private -@Unstable -@Singleton -@Path("/ws/v2/timeline") -public class TimelineAggregatorWebService { - private static final Log LOG = - LogFactory.getLog(TimelineAggregatorWebService.class); - - private @Context ServletContext context; - - @XmlRootElement(name = "about") - @XmlAccessorType(XmlAccessType.NONE) - @Public - @Unstable - public static class AboutInfo { - - private String about; - - public AboutInfo() { - - } - - public AboutInfo(String about) { - this.about = about; - } - - @XmlElement(name = "About") - public String getAbout() { - return about; - } - - public void setAbout(String about) { - this.about = about; - } - - } - - /** - * Return the description of the timeline web services. - */ - @GET - @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public AboutInfo about( - @Context HttpServletRequest req, - @Context HttpServletResponse res) { - init(res); - return new AboutInfo("Timeline API"); - } - - /** - * Accepts writes to the aggregator, and returns a response. It simply routes - * the request to the app level aggregator. It expects an application as a - * context. - */ - @PUT - @Path("/entities") - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public Response putEntities( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("async") String async, - @QueryParam("appid") String appId, - TimelineEntities entities) { - init(res); - UserGroupInformation callerUgi = getUser(req); - if (callerUgi == null) { - String msg = "The owner of the posted timeline entities is not set"; - LOG.error(msg); - throw new ForbiddenException(msg); - } - - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - - try { - appId = parseApplicationId(appId); - if (appId == null) { - return Response.status(Response.Status.BAD_REQUEST).build(); - } - TimelineAggregator service = getAggregatorService(req, appId); - if (service == null) { - LOG.error("Application not found"); - throw new NotFoundException(); // different exception? - } - service.postEntities(entities, callerUgi); - return Response.ok().build(); - } catch (Exception e) { - LOG.error("Error putting entities", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } - } - - private String parseApplicationId(String appId) { - // Make sure the appId is not null and is valid - ApplicationId appID; - try { - if (appId != null) { - return ConverterUtils.toApplicationId(appId.trim()).toString(); - } else { - return null; - } - } catch (Exception e) { - return null; - } - } - - private TimelineAggregator - getAggregatorService(HttpServletRequest req, String appIdToParse) { - String appIdString = parseApplicationId(appIdToParse); - final TimelineAggregatorsCollection aggregatorCollection = - (TimelineAggregatorsCollection) context.getAttribute( - TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY); - return aggregatorCollection.get(appIdString); - } - - private void init(HttpServletResponse response) { - response.setContentType(null); - } - - private UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUgi = null; - if (remoteUser != null) { - callerUgi = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUgi; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java deleted file mode 100644 index 73b6d52..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import java.net.URI; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; - -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -/** - * Class that manages adding and removing aggregators and their lifecycle. It - * provides thread safety access to the aggregators inside. - * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. - */ -@Private -@Unstable -public class TimelineAggregatorsCollection extends CompositeService { - private static final Log LOG = - LogFactory.getLog(TimelineAggregatorsCollection.class); - private static final TimelineAggregatorsCollection INSTANCE = - new TimelineAggregatorsCollection(); - - // access to this map is synchronized with the map itself - private final Map aggregators = - Collections.synchronizedMap( - new HashMap()); - - // REST server for this aggregator collection - private HttpServer2 timelineRestServer; - - static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection"; - - static TimelineAggregatorsCollection getInstance() { - return INSTANCE; - } - - TimelineAggregatorsCollection() { - super(TimelineAggregatorsCollection.class.getName()); - } - - @Override - protected void serviceStart() throws Exception { - startWebApp(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (timelineRestServer != null) { - timelineRestServer.stop(); - } - super.serviceStop(); - } - - /** - * Put the aggregator into the collection if an aggregator mapped by id does - * not exist. - * - * @throws YarnRuntimeException if there was any exception in initializing and - * starting the app level service - * @return the aggregator associated with id after the potential put. - */ - public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) { - synchronized (aggregators) { - TimelineAggregator aggregatorInTable = aggregators.get(id); - if (aggregatorInTable == null) { - try { - // initialize, start, and add it to the collection so it can be - // cleaned up when the parent shuts down - aggregator.init(getConfig()); - aggregator.start(); - aggregators.put(id, aggregator); - LOG.info("the aggregator for " + id + " was added"); - return aggregator; - } catch (Exception e) { - throw new YarnRuntimeException(e); - } - } else { - String msg = "the aggregator for " + id + " already exists!"; - LOG.error(msg); - return aggregatorInTable; - } - } - } - - /** - * Removes the aggregator for the specified id. The aggregator is also stopped - * as a result. If the aggregator does not exist, no change is made. - * - * @return whether it was removed successfully - */ - public boolean remove(String id) { - synchronized (aggregators) { - TimelineAggregator aggregator = aggregators.remove(id); - if (aggregator == null) { - String msg = "the aggregator for " + id + " does not exist!"; - LOG.error(msg); - return false; - } else { - // stop the service to do clean up - aggregator.stop(); - LOG.info("the aggregator service for " + id + " was removed"); - return true; - } - } - } - - /** - * Returns the aggregator for the specified id. - * - * @return the aggregator or null if it does not exist - */ - public TimelineAggregator get(String id) { - return aggregators.get(id); - } - - /** - * Returns whether the aggregator for the specified id exists in this - * collection. - */ - public boolean containsKey(String id) { - return aggregators.containsKey(id); - } - - /** - * Launch the REST web server for this aggregator collection - */ - private void startWebApp() { - Configuration conf = getConfig(); - // use the same ports as the old ATS for now; we could create new properties - // for the new timeline service if needed - String bindAddress = WebAppUtils.getWebAppBindURL(conf, - YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, - WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); - LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); - try { - Configuration confForInfoServer = new Configuration(conf); - confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); - HttpServer2.Builder builder = new HttpServer2.Builder() - .setName("timeline") - .setConf(conf) - .addEndpoint(URI.create("http://" + bindAddress)); - timelineRestServer = builder.build(); - // TODO: replace this by an authentication filter in future. - HashMap options = new HashMap<>(); - String username = conf.get(HADOOP_HTTP_STATIC_USER, - DEFAULT_HADOOP_HTTP_STATIC_USER); - options.put(HADOOP_HTTP_STATIC_USER, username); - HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), - "static_user_filter_timeline", - StaticUserWebFilter.StaticUserFilter.class.getName(), - options, new String[] {"/*"}); - - timelineRestServer.addJerseyResourcePackage( - TimelineAggregatorWebService.class.getPackage().getName() + ";" - + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), - "/*"); - timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY, - TimelineAggregatorsCollection.getInstance()); - timelineRestServer.start(); - } catch (Exception e) { - String msg = "The per-node aggregator webapp failed to start."; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java new file mode 100644 index 0000000..7d59876 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelTimelineCollector extends TimelineCollector { + private final String applicationId; + // TODO define key metadata such as flow metadata, user, and queue + + public AppLevelTimelineCollector(String applicationId) { + super(AppLevelTimelineCollector.class.getName() + " - " + applicationId); + this.applicationId = applicationId; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java new file mode 100644 index 0000000..ca5ab59 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerContext; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The top-level server for the per-node timeline collectors collection. + * Currently it is defined as an auxiliary service to accommodate running within + * another daemon (e.g. node manager). + */ +@Private +@Unstable +public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { + private static final Log LOG = + LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class); + private static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private final TimelineCollectorsManager collectorsManager; + + public PerNodeTimelineCollectorsAuxService() { + // use the same singleton + this(TimelineCollectorsManager.getInstance()); + } + + @VisibleForTesting PerNodeTimelineCollectorsAuxService( + TimelineCollectorsManager collectorsManager) { + super("timeline_collector"); + this.collectorsManager = collectorsManager; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + collectorsManager.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + collectorsManager.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + collectorsManager.stop(); + super.serviceStop(); + } + + // these methods can be used as the basis for future service methods if the + // per-node collector runs separate from the node manager + /** + * Creates and adds an app level collector for the specified application id. + * The collector is also initialized and started. If the service already + * exists, no new service is created. + * + * @return whether it was added successfully + */ + public boolean addApplication(ApplicationId appId) { + String appIdString = appId.toString(); + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appIdString); + return (collectorsManager.putIfAbsent(appIdString, collector) + == collector); + } + + /** + * Removes the app level collector for the specified application id. The + * collector is also stopped as a result. If the collector does not exist, no + * change is made. + * + * @return whether it was removed successfully + */ + public boolean removeApplication(ApplicationId appId) { + String appIdString = appId.toString(); + return collectorsManager.remove(appIdString); + } + + /** + * Creates and adds an app level collector for the specified application id. + * The collector is also initialized and started. If the collector already + * exists, no new collector is created. + */ + @Override + public void initializeContainer(ContainerInitializationContext context) { + // intercept the event of the AM container being created and initialize the + // app level collector service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + addApplication(appId); + } + } + + /** + * Removes the app level collector for the specified application id. The + * collector is also stopped as a result. If the collector does not exist, no + * change is made. + */ + @Override + public void stopContainer(ContainerTerminationContext context) { + // intercept the event of the AM container being stopped and remove the app + // level collector service + if (isApplicationMaster(context)) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + removeApplication(appId); + } + } + + private boolean isApplicationMaster(ContainerContext context) { + // TODO this is based on a (shaky) assumption that the container id (the + // last field of the full container id) for an AM is always 1 + // we want to make this much more reliable + ContainerId containerId = context.getContainerId(); + return containerId.getContainerId() == 1L; + } + + @VisibleForTesting + boolean hasApplication(String appId) { + return collectorsManager.containsKey(appId); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + + @Override + public ByteBuffer getMetaData() { + // TODO currently it is not used; we can return a more meaningful data when + // we connect it with an AM + return ByteBuffer.allocate(0); + } + + @VisibleForTesting + public static PerNodeTimelineCollectorsAuxService + launchServer(String[] args) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage( + PerNodeTimelineCollectorsAuxService.class, args, LOG); + PerNodeTimelineCollectorsAuxService auxService = null; + try { + auxService = new PerNodeTimelineCollectorsAuxService(); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), + SHUTDOWN_HOOK_PRIORITY); + YarnConfiguration conf = new YarnConfiguration(); + auxService.init(conf); + auxService.start(); + } catch (Throwable t) { + LOG.fatal("Error starting PerNodeCollectorServer", t); + ExitUtil.terminate(-1, "Error starting PerNodeCollectorServer"); + } + return auxService; + } + + private static class ShutdownHook implements Runnable { + private final PerNodeTimelineCollectorsAuxService auxService; + + public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) { + this.auxService = auxService; + } + + public void run() { + auxService.stop(); + } + } + + public static void main(String[] args) { + launchServer(args); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java new file mode 100644 index 0000000..6e20e69 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage. + * + * Classes that extend this can add their own lifecycle management or + * customization of request handling. + */ +@Private +@Unstable +public abstract class TimelineCollector extends CompositeService { + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + + private TimelineWriter writer; + + public TimelineCollector(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + writer = ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, + TimelineWriter.class), conf); + writer.init(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + writer.stop(); + } + + public TimelineWriter getWriter() { + return writer; + } + + /** + * Handles entity writes. These writes are synchronous and are written to the + * backing storage without buffering/batching. If any entity already exists, + * it results in an update of the entity. + * + * This method should be reserved for selected critical entities and events. + * For normal voluminous writes one should use the async method + * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * + * @param entities entities to post + * @param callerUgi the caller UGI + * @return the response that contains the result of the post. + */ + public TimelineWriteResponse postEntities(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + + return writer.write(entities); + } + + /** + * Handles entity writes in an asynchronous manner. The method returns as soon + * as validation is done. No promises are made on how quickly it will be + * written to the backing storage or if it will always be written to the + * backing storage. Multiple writes to the same entities may be batched and + * appropriate values updated and result in fewer writes to the backing + * storage. + * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void postEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java new file mode 100644 index 0000000..889271e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; + +import com.google.inject.Singleton; + +/** + * The main per-node REST end point for timeline service writes. It is + * essentially a container service that routes requests to the appropriate + * per-app services. + */ +@Private +@Unstable +@Singleton +@Path("/ws/v2/timeline") +public class TimelineCollectorWebService { + private static final Log LOG = + LogFactory.getLog(TimelineCollectorWebService.class); + + private @Context ServletContext context; + + @XmlRootElement(name = "about") + @XmlAccessorType(XmlAccessType.NONE) + @Public + @Unstable + public static class AboutInfo { + + private String about; + + public AboutInfo() { + + } + + public AboutInfo(String about) { + this.about = about; + } + + @XmlElement(name = "About") + public String getAbout() { + return about; + } + + public void setAbout(String about) { + this.about = about; + } + + } + + /** + * Return the description of the timeline web services. + */ + @GET + @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public AboutInfo about( + @Context HttpServletRequest req, + @Context HttpServletResponse res) { + init(res); + return new AboutInfo("Timeline API"); + } + + /** + * Accepts writes to the aggregator, and returns a response. It simply routes + * the request to the app level aggregator. It expects an application as a + * context. + */ + @PUT + @Path("/entities") + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public Response putEntities( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + TimelineEntities entities) { + init(res); + UserGroupInformation callerUgi = getUser(req); + if (callerUgi == null) { + String msg = "The owner of the posted timeline entities is not set"; + LOG.error(msg); + throw new ForbiddenException(msg); + } + + // TODO how to express async posts and handle them + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + + try { + appId = parseApplicationId(appId); + if (appId == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + TimelineCollector service = getAggregatorService(req, appId); + if (service == null) { + LOG.error("Application not found"); + throw new NotFoundException(); // different exception? + } + service.postEntities(entities, callerUgi); + return Response.ok().build(); + } catch (Exception e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private String parseApplicationId(String appId) { + // Make sure the appId is not null and is valid + try { + if (appId != null) { + return ConverterUtils.toApplicationId(appId.trim()).toString(); + } else { + return null; + } + } catch (Exception e) { + return null; + } + } + + private TimelineCollector + getAggregatorService(HttpServletRequest req, String appIdToParse) { + String appIdString = parseApplicationId(appIdToParse); + final TimelineCollectorsManager collectorsManager = + (TimelineCollectorsManager) context.getAttribute( + TimelineCollectorsManager.COLLECTOR_MANAGER_ATTR_KEY); + return collectorsManager.get(appIdString); + } + + private void init(HttpServletResponse response) { + response.setContentType(null); + } + + private UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUgi = null; + if (remoteUser != null) { + callerUgi = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUgi; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorsManager.java new file mode 100644 index 0000000..4d98ba6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorsManager.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +/** + * Class that manages adding and removing collectors and their lifecycle. It + * provides thread safety access to the collectors inside. + * + * It is a singleton, and instances should be obtained via + * {@link #getInstance()}. + */ +@Private +@Unstable +public class TimelineCollectorsManager extends CompositeService { + private static final Log LOG = + LogFactory.getLog(TimelineCollectorsManager.class); + private static final TimelineCollectorsManager INSTANCE = + new TimelineCollectorsManager(); + + // access to this map is synchronized with the map itself + private final Map collectors = + Collections.synchronizedMap( + new HashMap()); + + // REST server for this collector collection + private HttpServer2 timelineRestServer; + + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; + + static TimelineCollectorsManager getInstance() { + return INSTANCE; + } + + TimelineCollectorsManager() { + super(TimelineCollectorsManager.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + startWebApp(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (timelineRestServer != null) { + timelineRestServer.stop(); + } + super.serviceStop(); + } + + /** + * Put the collector into the collection if a collector mapped by id does not + * exist. + * + * @throws YarnRuntimeException if there was any exception in initializing and + * starting the app level service + * @return the collector associated with id after the potential put. + */ + public TimelineCollector putIfAbsent(String id, TimelineCollector collector) { + synchronized (collectors) { + TimelineCollector collectorInTable = collectors.get(id); + if (collectorInTable == null) { + try { + // initialize, start, and add it to the collection so it can be + // cleaned up when the parent shuts down + collector.init(getConfig()); + collector.start(); + collectors.put(id, collector); + LOG.info("the collector for " + id + " was added"); + return collector; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + String msg = "the collector for " + id + " already exists!"; + LOG.error(msg); + return collectorInTable; + } + } + } + + /** + * Removes the collector for the specified id. The collector is also stopped + * as a result. If the collector does not exist, no change is made. + * + * @return whether it was removed successfully + */ + public boolean remove(String id) { + synchronized (collectors) { + TimelineCollector collector = collectors.remove(id); + if (collector == null) { + String msg = "the collector for " + id + " does not exist!"; + LOG.error(msg); + return false; + } else { + // stop the service to do clean up + collector.stop(); + LOG.info("the collector service for " + id + " was removed"); + return true; + } + } + } + + /** + * Returns the collector for the specified id. + * + * @return the collector or null if it does not exist + */ + public TimelineCollector get(String id) { + return collectors.get(id); + } + + /** + * Returns whether the collector for the specified id exists in this + * collection. + */ + public boolean containsKey(String id) { + return collectors.containsKey(id); + } + + /** + * Launch the REST web server for this collector collection + */ + private void startWebApp() { + Configuration conf = getConfig(); + // use the same ports as the old ATS for now; we could create new properties + // for the new timeline service if needed + String bindAddress = WebAppUtils.getWebAppBindURL(conf, + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); + LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); + try { + Configuration confForInfoServer = new Configuration(conf); + confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create("http://" + bindAddress)); + timelineRestServer = builder.build(); + // TODO: replace this by an authentication filter in future. + HashMap options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + timelineRestServer.addJerseyResourcePackage( + TimelineCollectorWebService.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, + TimelineCollectorsManager.getInstance()); + timelineRestServer.start(); + } catch (Exception e) { + String msg = "The per-node collector webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 4a57e97..f5603f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -65,7 +65,7 @@ * Stores the entire information in {@link TimelineEntity} to the * timeline store. Any errors occurring for individual write request objects * will be reported in the response. - * + * * @param data * a {@link TimelineEntity} object * @return {@link TimelineWriteResponse} object. @@ -116,10 +116,10 @@ private void write(TimelineEntity entity, * Aggregates the entity information to the timeline store based on which * track this entity is to be rolled up to The tracks along which aggregations * are to be done are given by {@link TimelineAggregationTrack} - * + * * Any errors occurring for individual write request objects will be reported * in the response. - * + * * @param data * a {@link TimelineEntity} object * a {@link TimelineAggregationTrack} enum value diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 71ad7ab..2fae625 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -22,10 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.service.Service; /** * This interface is for storing application timeline information. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java deleted file mode 100644 index 8f95814..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - - -public class TestAppLevelTimelineAggregator { -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java deleted file mode 100644 index 1c89ead..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; -import org.junit.Test; - -public class TestPerNodeTimelineAggregatorsAuxService { - private ApplicationAttemptId appAttemptId; - - public TestPerNodeTimelineAggregatorsAuxService() { - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - } - - @Test - public void testAddApplication() throws Exception { - PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); - // auxService should have a single app - assertTrue(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); - auxService.close(); - } - - @Test - public void testAddApplicationNonAMContainer() throws Exception { - PerNodeTimelineAggregatorsAuxService auxService = createAggregator(); - - ContainerId containerId = getContainerId(2L); // not an AM - ContainerInitializationContext context = - mock(ContainerInitializationContext.class); - when(context.getContainerId()).thenReturn(containerId); - auxService.initializeContainer(context); - // auxService should not have that app - assertFalse(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); - } - - @Test - public void testRemoveApplication() throws Exception { - PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); - // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); - - ContainerId containerId = getAMContainerId(); - ContainerTerminationContext context = - mock(ContainerTerminationContext.class); - when(context.getContainerId()).thenReturn(containerId); - auxService.stopContainer(context); - // auxService should not have that app - assertFalse(auxService.hasApplication(appIdStr)); - auxService.close(); - } - - @Test - public void testRemoveApplicationNonAMContainer() throws Exception { - PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); - // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); - - ContainerId containerId = getContainerId(2L); // not an AM - ContainerTerminationContext context = - mock(ContainerTerminationContext.class); - when(context.getContainerId()).thenReturn(containerId); - auxService.stopContainer(context); - // auxService should still have that app - assertTrue(auxService.hasApplication(appIdStr)); - auxService.close(); - } - - @Test(timeout = 60000) - public void testLaunch() throws Exception { - ExitUtil.disableSystemExit(); - PerNodeTimelineAggregatorsAuxService auxService = null; - try { - auxService = - PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); - } catch (ExitUtil.ExitException e) { - assertEquals(0, e.status); - ExitUtil.resetFirstExitException(); - fail(); - } finally { - if (auxService != null) { - auxService.stop(); - } - } - } - - private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() { - PerNodeTimelineAggregatorsAuxService auxService = createAggregator(); - // create an AM container - ContainerId containerId = getAMContainerId(); - ContainerInitializationContext context = - mock(ContainerInitializationContext.class); - when(context.getContainerId()).thenReturn(containerId); - auxService.initializeContainer(context); - return auxService; - } - - private PerNodeTimelineAggregatorsAuxService createAggregator() { - TimelineAggregatorsCollection - aggregatorsCollection = spy(new TimelineAggregatorsCollection()); - doReturn(new Configuration()).when(aggregatorsCollection).getConfig(); - PerNodeTimelineAggregatorsAuxService auxService = - spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection)); - return auxService; - } - - private ContainerId getAMContainerId() { - return getContainerId(1L); - } - - private ContainerId getContainerId(long id) { - return ContainerId.newContainerId(appAttemptId, id); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java deleted file mode 100644 index cec1d71..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider; -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -public class TestTimelineAggregatorsCollection { - - @Test(timeout=60000) - public void testMultithreadedAdd() throws Exception { - final TimelineAggregatorsCollection aggregatorCollection = - spy(new TimelineAggregatorsCollection()); - doReturn(new Configuration()).when(aggregatorCollection).getConfig(); - - final int NUM_APPS = 5; - List> tasks = new ArrayList>(); - for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); - Callable task = new Callable() { - public Boolean call() { - AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appId); - return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List> futures = executor.invokeAll(tasks); - for (Future future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - assertTrue(aggregatorCollection.containsKey(String.valueOf(i))); - } - } - - @Test - public void testMultithreadedAddAndRemove() throws Exception { - final TimelineAggregatorsCollection aggregatorCollection = - spy(new TimelineAggregatorsCollection()); - doReturn(new Configuration()).when(aggregatorCollection).getConfig(); - - final int NUM_APPS = 5; - List> tasks = new ArrayList>(); - for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); - Callable task = new Callable() { - public Boolean call() { - AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appId); - boolean successPut = - (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); - return successPut && aggregatorCollection.remove(appId); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List> futures = executor.invokeAll(tasks); - for (Future future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - assertFalse(aggregatorCollection.containsKey(String.valueOf(i))); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java new file mode 100644 index 0000000..74c81a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + + +public class TestAppLevelTimelineCollector { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java new file mode 100644 index 0000000..bbbba68 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.junit.Test; + +public class TestPerNodeTimelineCollectorsAuxService { + private ApplicationAttemptId appAttemptId; + + public TestPerNodeTimelineCollectorsAuxService() { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + } + + @Test + public void testAddApplication() throws Exception { + PerNodeTimelineCollectorsAuxService auxService = + createCollectorAndAddApplication(); + // auxService should have a single app + assertTrue(auxService.hasApplication( + appAttemptId.getApplicationId().toString())); + auxService.close(); + } + + @Test + public void testAddApplicationNonAMContainer() throws Exception { + PerNodeTimelineCollectorsAuxService auxService = createCollector(); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.initializeContainer(context); + // auxService should not have that app + assertFalse(auxService.hasApplication( + appAttemptId.getApplicationId().toString())); + } + + @Test + public void testRemoveApplication() throws Exception { + PerNodeTimelineCollectorsAuxService auxService = + createCollectorAndAddApplication(); + // auxService should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(auxService.hasApplication(appIdStr)); + + ContainerId containerId = getAMContainerId(); + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.stopContainer(context); + // auxService should not have that app + assertFalse(auxService.hasApplication(appIdStr)); + auxService.close(); + } + + @Test + public void testRemoveApplicationNonAMContainer() throws Exception { + PerNodeTimelineCollectorsAuxService auxService = + createCollectorAndAddApplication(); + // auxService should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(auxService.hasApplication(appIdStr)); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.stopContainer(context); + // auxService should still have that app + assertTrue(auxService.hasApplication(appIdStr)); + auxService.close(); + } + + @Test(timeout = 60000) + public void testLaunch() throws Exception { + ExitUtil.disableSystemExit(); + PerNodeTimelineCollectorsAuxService auxService = null; + try { + auxService = + PerNodeTimelineCollectorsAuxService.launchServer(new String[0]); + } catch (ExitUtil.ExitException e) { + assertEquals(0, e.status); + ExitUtil.resetFirstExitException(); + fail(); + } finally { + if (auxService != null) { + auxService.stop(); + } + } + } + + private PerNodeTimelineCollectorsAuxService + createCollectorAndAddApplication() { + PerNodeTimelineCollectorsAuxService auxService = createCollector(); + // create an AM container + ContainerId containerId = getAMContainerId(); + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.initializeContainer(context); + return auxService; + } + + private PerNodeTimelineCollectorsAuxService createCollector() { + TimelineCollectorsManager + collectorsManager = spy(new TimelineCollectorsManager()); + doReturn(new Configuration()).when(collectorsManager).getConfig(); + PerNodeTimelineCollectorsAuxService auxService = + spy(new PerNodeTimelineCollectorsAuxService(collectorsManager)); + return auxService; + } + + private ContainerId getAMContainerId() { + return getContainerId(1L); + } + + private ContainerId getContainerId(long id) { + return ContainerId.newContainerId(appAttemptId, id); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorsManager.java new file mode 100644 index 0000000..5dfac28 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorsManager.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestTimelineCollectorsManager { + + @Test(timeout=60000) + public void testMultithreadedAdd() throws Exception { + final TimelineCollectorsManager collectorsManager = + spy(new TimelineCollectorsManager()); + doReturn(new Configuration()).when(collectorsManager).getConfig(); + + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable task = new Callable() { + public Boolean call() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + return + (collectorsManager.putIfAbsent(appId, collector) == collector); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertTrue(collectorsManager.containsKey(String.valueOf(i))); + } + } + + @Test + public void testMultithreadedAddAndRemove() throws Exception { + final TimelineCollectorsManager collectorsManager = + spy(new TimelineCollectorsManager()); + doReturn(new Configuration()).when(collectorsManager).getConfig(); + + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable task = new Callable() { + public Boolean call() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + boolean successPut = + (collectorsManager.putIfAbsent(appId, collector) == collector); + return successPut && collectorsManager.remove(appId); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertFalse(collectorsManager.containsKey(String.valueOf(i))); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index f720454..ea462a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -27,12 +27,12 @@ import java.nio.file.Paths; import java.util.List; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Test; -import org.apache.commons.io.FileUtils; public class TestFileSystemTimelineWriterImpl { @@ -42,8 +42,8 @@ */ @Test public void testWriteEntityToFile() throws Exception { - String name = "unit_test_BaseAggregator_testWriteEntityToFile_" - + Long.toString(System.currentTimeMillis()); +// String name = "unit_test_BaseAggregator_testWriteEntityToFile_" +// + Long.toString(System.currentTimeMillis()); TimelineEntities te = new TimelineEntities(); TimelineEntity entity = new TimelineEntity(); @@ -55,25 +55,27 @@ public void testWriteEntityToFile() throws Exception { entity.setModifiedTime(1425016502000L); te.addEntity(entity); - FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl(); - fsi.serviceInit(new Configuration()); - fsi.write(te); + try (FileSystemTimelineWriterImpl fsi = + new FileSystemTimelineWriterImpl()) { + fsi.serviceInit(new Configuration()); + fsi.write(te); - String fileName = fsi.getOutputRoot() + "/" + type + "/" + id - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - Path path = Paths.get(fileName); - File f = new File(fileName); - assertTrue(f.exists() && !f.isDirectory()); - List data = Files.readAllLines(path, StandardCharsets.UTF_8); - // ensure there's only one entity + 1 new line - assertTrue(data.size() == 2); - String d = data.get(0); - // confirm the contents same as what was written - assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + String fileName = fsi.getOutputRoot() + "/" + type + "/" + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path = Paths.get(fileName); + File f = new File(fileName); + assertTrue(f.exists() && !f.isDirectory()); + List data = Files.readAllLines(path, StandardCharsets.UTF_8); + // ensure there's only one entity + 1 new line + assertTrue(data.size() == 2); + String d = data.get(0); + // confirm the contents same as what was written + assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); - // delete the directory - File outputDir = new File(fsi.getOutputRoot()); - FileUtils.deleteDirectory(outputDir); - assertTrue(!(f.exists())); + // delete the directory + File outputDir = new File(fsi.getOutputRoot()); + FileUtils.deleteDirectory(outputDir); + assertTrue(!(f.exists())); + } } }