diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index 3f4e93f..a64795d 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -294,6 +294,12 @@
org.apache.hadoop
+ hadoop-yarn-server-timeline-pluginstorage
+ ${project.version}
+
+
+
+ org.apache.hadoop
hadoop-mapreduce-client-jobclient
${project.version}
test-jar
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 23c2969..428ff81 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2452,6 +2452,42 @@ public static String getClusterId(Configuration conf) {
return clusterId;
}
+ // helper methods for timeline service configuration
+ /**
+ * Returns whether the timeline service is enabled via configuration.
+ *
+ * @param conf the configuration
+ * @return whether the timeline service is enabled.
+ */
+ public static boolean timelineServiceEnabled(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+ }
+
+ /**
+ * Returns the timeline service version. It does not check whether the
+ * timeline service itself is enabled.
+ *
+ * @param conf the configuration
+ * @return the timeline service version as a float.
+ */
+ public static float getTimelineServiceVersion(Configuration conf) {
+ return conf.getFloat(TIMELINE_SERVICE_VERSION,
+ DEFAULT_TIMELINE_SERVICE_VERSION);
+ }
+
+ /**
+ * Returns whether the timeline service v.1.5 is enabled via configuration.
+ *
+ * @param conf the configuration
+ * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a
+ * version greater than equal to 1.5.
+ */
+ public static boolean timelineServiceV1_5Enabled(Configuration conf) {
+ return timelineServiceEnabled(conf) &&
+ Math.abs(getTimelineServiceVersion(conf) - 1.5) < 0.00001;
+ }
+
/* For debugging. mp configurations to system output as XML format. */
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 09a56ea..efce237 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -121,6 +121,21 @@
mockito-all
test
+
+ org.apache.hadoop
+ hadoop-yarn-server-timeline-pluginstorage
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+ test-jar
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 95dbddc..caf896a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -187,6 +188,12 @@
// Configuration
private Configuration conf;
+ @VisibleForTesting
+ @Private
+ Configuration getConf() {
+ return conf;
+ }
+
// Handle to communicate with the Resource Manager
@SuppressWarnings("rawtypes")
private AMRMClientAsync amRMClient;
@@ -277,6 +284,9 @@
// Timeline Client
@VisibleForTesting
TimelineClient timelineClient;
+ static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
+ static final String APPID_TIMELINE_FILTER_NAME = "appId";
+ static final String USER_TIMELINE_FILTER_NAME = "user";
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";
@@ -660,7 +670,7 @@ public Void run() throws Exception {
@VisibleForTesting
NMCallbackHandler createNMCallbackHandler() {
- return new NMCallbackHandler(this);
+ return new NMCallbackHandler(this, conf);
}
@VisibleForTesting
@@ -782,7 +792,7 @@ public void onContainersCompleted(List completedContainers) {
}
if(timelineClient != null) {
publishContainerEndEvent(
- timelineClient, containerStatus, domainId, appSubmitterUgi);
+ timelineClient, containerStatus, domainId, appSubmitterUgi, conf);
}
}
@@ -867,9 +877,12 @@ public void onError(Throwable e) {
private ConcurrentMap containers =
new ConcurrentHashMap();
private final ApplicationMaster applicationMaster;
+ private Configuration conf;
- public NMCallbackHandler(ApplicationMaster applicationMaster) {
+ public NMCallbackHandler(ApplicationMaster applicationMaster,
+ Configuration config) {
this.applicationMaster = applicationMaster;
+ this.conf = config;
}
public void addContainer(ContainerId containerId, Container container) {
@@ -906,7 +919,7 @@ public void onContainerStarted(ContainerId containerId,
if(applicationMaster.timelineClient != null) {
ApplicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+ applicationMaster.domainId, applicationMaster.appSubmitterUgi, conf);
}
}
@@ -1120,15 +1133,17 @@ private String readContent(String filePath) throws IOException {
org.apache.commons.io.IOUtils.closeQuietly(ds);
}
}
-
+
private static void publishContainerStartEvent(
- final TimelineClient timelineClient, Container container, String domainId,
- UserGroupInformation ugi) {
+ final TimelineClient timelineClient, final Container container,
+ String domainId, UserGroupInformation ugi, final Configuration config) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
+ entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
+ .getApplicationAttemptId().getApplicationId().toString());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1141,7 +1156,8 @@ private static void publishContainerStartEvent(
@Override
public TimelinePutResponse run() throws Exception {
return processTimelineResponseErrors(
- timelineClient.putEntities(entity));
+ putContainerEntity(timelineClient, config,
+ container.getId().getApplicationAttemptId(), entity));
}
});
} catch (Exception e) {
@@ -1153,12 +1169,14 @@ public TimelinePutResponse run() throws Exception {
private static void publishContainerEndEvent(
final TimelineClient timelineClient, ContainerStatus container,
- String domainId, UserGroupInformation ugi) {
+ String domainId, UserGroupInformation ugi, Configuration config) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
+ entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getContainerId()
+ .getApplicationAttemptId().getApplicationId().toString());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1166,14 +1184,30 @@ private static void publishContainerEndEvent(
event.addEventInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
try {
- TimelinePutResponse response = timelineClient.putEntities(entity);
- processTimelineResponseErrors(response);
+ processTimelineResponseErrors(
+ putContainerEntity(timelineClient, config,
+ container.getContainerId().getApplicationAttemptId(),
+ entity));
} catch (YarnException | IOException e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(), e);
}
}
+ private static TimelinePutResponse putContainerEntity(
+ TimelineClient timelineClient, Configuration config,
+ ApplicationAttemptId currAttemptId, TimelineEntity entity)
+ throws YarnException, IOException {
+ if (YarnConfiguration.timelineServiceV1_5Enabled(config)) {
+ TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+ currAttemptId.getApplicationId(),
+ CONTAINER_ENTITY_GROUP_ID);
+ return timelineClient.putEntities(currAttemptId, groupId, entity);
+ } else {
+ return timelineClient.putEntities(entity);
+ }
+ }
+
private static void publishApplicationAttemptEvent(
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
@@ -1181,7 +1215,7 @@ private static void publishApplicationAttemptEvent(
entity.setEntityId(appAttemptId);
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
new file mode 100644
index 0000000..592d3e5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Timeline v1.5 reader plugin for YARN distributed shell. It tranlsates an
+ * incoming getEntity request to a set of related timeline entity groups, via
+ * the information provided in the primary filter or entity id field.
+ */
+public class DistributedShellTimelinePlugin extends TimelineEntityGroupPlugin {
+
+ @Override
+ public Set getTimelineEntityGroupId(String entityType,
+ NameValuePair primaryFilter, Collection secondaryFilters) {
+ if (entityType.equals(ApplicationMaster.DSEntity.DS_CONTAINER.toString())) {
+ if (primaryFilter == null) {
+ return null;
+ }
+ return toEntitiGroupId(primaryFilter.getValue().toString());
+ }
+ return null;
+ }
+
+ @Override
+ public Set getTimelineEntityGroupId(String entityId,
+ String entityType) {
+ if (entityType.equals(ApplicationMaster.DSEntity.DS_CONTAINER.toString())) {
+ ContainerId containerId = ConverterUtils.toContainerId(entityId);
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ return toEntitiGroupId(appId.toString());
+ }
+ return null;
+ }
+
+ @Override
+ public Set getTimelineEntityGroupId(String entityType,
+ SortedSet entityIds, Set eventTypes) {
+ return null;
+ }
+
+ private Set toEntitiGroupId(String strAppId) {
+ ApplicationId appId = ConverterUtils.toApplicationId(strAppId);
+ TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+ appId, ApplicationMaster.CONTAINER_ENTITY_GROUP_ID);
+ Set result = new HashSet<>();
+ result.add(groupId);
+ return result;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
index 8e561c6..e8b4800 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class ContainerLaunchFailAppMaster extends ApplicationMaster {
@@ -36,15 +37,15 @@ public ContainerLaunchFailAppMaster() {
@Override
NMCallbackHandler createNMCallbackHandler() {
- return new FailContainerLaunchNMCallbackHandler(this);
+ return new FailContainerLaunchNMCallbackHandler(this, this.getConf());
}
class FailContainerLaunchNMCallbackHandler
extends ApplicationMaster.NMCallbackHandler {
public FailContainerLaunchNMCallbackHandler(
- ApplicationMaster applicationMaster) {
- super(applicationMaster);
+ ApplicationMaster applicationMaster, Configuration config) {
+ super(applicationMaster, config);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 3197875..d48c256 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -36,12 +36,19 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -50,29 +57,54 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
public class TestDistributedShell {
private static final Log LOG =
LogFactory.getLog(TestDistributedShell.class);
- protected MiniYARNCluster yarnCluster = null;
+ protected MiniYARNCluster yarnCluster = null;
+ protected MiniDFSCluster hdfsCluster = null;
+ private FileSystem fs = null;
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
+ private static final String TEST_NAME_SUBSTRING_FOR_V1_5 = "V1_5";
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private boolean isTestWithTimelineV1_5() {
+ return testName.getMethodName().contains(TEST_NAME_SUBSTRING_FOR_V1_5);
+ }
+
@Before
public void setup() throws Exception {
- setupInternal(NUM_NMS);
+ if (isTestWithTimelineV1_5()) {
+ setupInternal(NUM_NMS, 1.5f);
+ } else {
+ setupInternal(NUM_NMS, 1.0f);
+ }
}
protected void setupInternal(int numNodeManager) throws Exception {
+ setupInternal(numNodeManager, 1.0f);
+ }
+
+ private void setupInternal(int numNodeManager, float timelineVersion)
+ throws Exception {
LOG.info("Starting up YARN cluster");
@@ -84,6 +116,34 @@ protected void setupInternal(int numNodeManager) throws Exception {
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address",
"0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+
+ // ATS v1.5 specific configs
+ if (timelineVersion == 1.5f) {
+ if (hdfsCluster == null) {
+ HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+ hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+ .numDataNodes(1).build();
+ }
+ fs = hdfsCluster.getFileSystem();
+ Path activeDir = new Path(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT
+ );
+ Path doneDir = new Path(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+ );
+ fs.mkdirs(activeDir);
+ fs.mkdirs(doneDir);
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ hdfsCluster.getURI().toString());
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
+ EntityGroupFSTimelineStore.class.getName());
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+ DistributedShellTimelinePlugin.class.getName());
+ conf.setLong(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+ 1);
+ }
if (yarnCluster == null) {
yarnCluster =
@@ -138,6 +198,13 @@ public void tearDown() throws IOException {
yarnCluster = null;
}
}
+ if (hdfsCluster != null) {
+ try {
+ hdfsCluster.shutdown();
+ } finally {
+ hdfsCluster = null;
+ }
+ }
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext
.delete(
@@ -156,6 +223,16 @@ public void testDSShellWithoutDomain() throws Exception {
testDSShell(false);
}
+ @Test(timeout=90000)
+ public void testDSShellWithoutDomainV1_5() throws Exception {
+ testDSShell(false);
+ }
+
+ @Test(timeout=90000)
+ public void testDSShellWithDomainV1_5() throws Exception {
+ testDSShell(true);
+ }
+
public void testDSShell(boolean haveDomain) throws Exception {
String[] args = {
"--jar",
@@ -239,6 +316,24 @@ public void run() {
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get());
+ if (isTestWithTimelineV1_5()) {
+ long scanInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+ );
+ Path doneDir = new Path(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+ );
+ // Wait till the data is moved to done dir, or timeout and fail
+ while (true) {
+ RemoteIterator iterApps = fs.listStatusIterator(doneDir);
+ if (iterApps.hasNext()) {
+ break;
+ }
+ Thread.sleep(scanInterval * 2);
+ }
+ }
+
TimelineDomain domain = null;
if (haveDomain) {
domain = yarnCluster.getApplicationHistoryServer()
@@ -265,11 +360,18 @@ public void run() {
Assert.assertEquals("DEFAULT",
entitiesAttempts.getEntities().get(0).getDomainId());
}
+ String currAttemptEntityId
+ = entitiesAttempts.getEntities().get(0).getEntityId();
+ ApplicationAttemptId attemptId
+ = ConverterUtils.toApplicationAttemptId(currAttemptEntityId);
+ NameValuePair primaryFilter = new NameValuePair(
+ ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
+ attemptId.getApplicationId().toString());
TimelineEntities entities = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
- null, null, null, null, null, null, null, null);
+ null, null, null, null, primaryFilter, null, null, null);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 68c9efd..8f13cef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -748,8 +748,12 @@ protected synchronized void serviceInit(Configuration conf)
appHistoryServer = new ApplicationHistoryServer();
conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
- conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
- MemoryTimelineStore.class, TimelineStore.class);
+ // Only set memory timeline store if timeline v1.5 is not enabled.
+ // Otherwise, caller has the freedom to choose storage impl.
+ if (!YarnConfiguration.timelineServiceV1_5Enabled(conf)) {
+ conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
+ MemoryTimelineStore.class, TimelineStore.class);
+ }
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
if (!useFixedPorts) {