diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index 1c295e1..02df640 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -254,8 +254,9 @@ public void putDomain(ApplicationAttemptId appAttemptId,
@Override
public void close() throws Exception {
- if (this.logFDsCache != null) {
- this.logFDsCache.close();
+ if (logFDsCache != null) {
+ logFDsCache.flush();
+ logFDsCache.close();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
index c616e63..fee4995 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@@ -104,19 +105,27 @@ public ClientResponse run() throws Exception {
}
});
} catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException)cause;
+ } else {
+ throw new IOException(cause);
+ }
} catch (InterruptedException ie) {
- throw new IOException(ie);
+ throw (IOException)new InterruptedIOException().initCause(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
- if (LOG.isDebugEnabled() && resp != null) {
- String output = resp.getEntity(String.class);
- LOG.debug("HTTP error code: " + resp.getStatus()
- + " Server response : \n" + output);
+ if (resp != null) {
+ msg += " HTTP error code: " + resp.getStatus();
+ if (LOG.isDebugEnabled()) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
}
throw new YarnException(msg);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1c7f078..557185d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2043,6 +2043,13 @@
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.rm.integration.enabled</name>
+    <value>true</value>
+    <description>Should the ATS v1.5 entity group file system storage query
+    the YARN Resource Manager for the active state of the application?
+    </description>
+  </property>
     <name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
     <description>Read cache size for the leveldb cache storage in ATS v1.5 plugin storage.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
index eb47ef2..264f6ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
@@ -129,9 +129,9 @@ public TimelineEntities getEntities(
getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
- "windowStart, windowEnd or limit is not a numeric value.");
+ "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e);
} catch (IllegalArgumentException e) {
- throw new BadRequestException("requested invalid field.");
+ throw new BadRequestException("requested invalid field: " + e);
} catch (Exception e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
@@ -160,8 +160,8 @@ public TimelineEntity getEntity(
parseFieldsStr(fields, ","),
getUser(req));
} catch (IllegalArgumentException e) {
- throw new BadRequestException(
- "requested invalid field.");
+ throw (BadRequestException)new BadRequestException(
+ "requested invalid field.").initCause(e);
} catch (Exception e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
@@ -201,8 +201,9 @@ public TimelineEvents getEvents(
parseLongStr(limit),
getUser(req));
} catch (NumberFormatException e) {
- throw new BadRequestException(
- "windowStart, windowEnd or limit is not a numeric value.");
+ throw (BadRequestException)new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.")
+ .initCause(e);
} catch (Exception e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index b1fbd13..bb4249d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -26,7 +26,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -55,6 +56,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -76,7 +78,7 @@
* Plugin timeline storage to support timeline server v1.5 API. This storage
* uses a file system to store timeline entities in their groups.
*/
-public class EntityGroupFSTimelineStore extends AbstractService
+public class EntityGroupFSTimelineStore extends CompositeService
implements TimelineStore {
static final String DOMAIN_LOG_PREFIX = "domainlog-";
@@ -128,7 +130,8 @@ public EntityGroupFSTimelineStore() {
@Override
protected void serviceInit(Configuration conf) throws Exception {
summaryStore = createSummaryStore();
- summaryStore.init(conf);
+ addService(summaryStore);
+
long logRetainSecs = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
YarnConfiguration
@@ -170,11 +173,27 @@ protected boolean removeEldestEntry(
});
cacheIdPlugins = loadPlugIns(conf);
// Initialize yarn client for application status
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
+ yarnClient = createAndInitYarnClient(conf);
+ // if non-null, hook its lifecycle up
+ addIfService(yarnClient);
+
super.serviceInit(conf);
}
+ /**
+ * Create and initialize the YARN Client. Tests may override/mock this.
+ * If they return null, then {@link #getAppState(ApplicationId)} will
+ * also need to be reworked.
+ * @param conf configuration
+ * @return the yarn client, or null.
+ *
+ */
+ protected YarnClient createAndInitYarnClient(Configuration conf) {
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(conf);
+ return client;
+ }
+
   private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
       throws RuntimeException {
     Collection<String> pluginNames = conf.getStringCollection(
@@ -210,8 +229,9 @@ private TimelineStore createSummaryStore() {
@Override
protected void serviceStart() throws Exception {
+
+ super.serviceStart();
LOG.info("Starting {}", getName());
- yarnClient.start();
summaryStore.start();
Configuration conf = getConfig();
@@ -219,7 +239,10 @@ protected void serviceStart() throws Exception {
aclManager.setTimelineStore(summaryStore);
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
summaryTdm.init(conf);
- summaryTdm.start();
+ addService(summaryTdm);
+ // start child services that aren't already started
+ super.serviceStart();
+
activeRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
@@ -257,7 +280,8 @@ protected void serviceStart() throws Exception {
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
- LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
+ LOG.info("Scanning active directory {} every {} seconds", activeRootPath,
+ scanIntervalSecs);
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
executor = new ScheduledThreadPoolExecutor(numThreads,
@@ -267,7 +291,6 @@ protected void serviceStart() throws Exception {
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
cleanerIntervalSecs, TimeUnit.SECONDS);
- super.serviceStart();
}
@Override
@@ -286,18 +309,9 @@ protected void serviceStop() throws Exception {
}
}
}
- if (summaryTdm != null) {
- summaryTdm.stop();
- }
- if (summaryStore != null) {
- summaryStore.stop();
- }
- if (yarnClient != null) {
- yarnClient.stop();
- }
synchronized (cachedLogs) {
for (EntityCacheItem cacheItem : cachedLogs.values()) {
- cacheItem.getStore().close();
+ ServiceOperations.stopQuietly(cacheItem.getStore());
}
}
super.serviceStop();
@@ -305,17 +319,23 @@ protected void serviceStop() throws Exception {
@InterfaceAudience.Private
@VisibleForTesting
- void scanActiveLogs() throws IOException {
+ int scanActiveLogs() throws IOException {
     RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
+ int scanned = 0;
while (iter.hasNext()) {
FileStatus stat = iter.next();
- ApplicationId appId = parseApplicationId(stat.getPath().getName());
+ String name = stat.getPath().getName();
+ ApplicationId appId = parseApplicationId(name);
if (appId != null) {
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
+ scanned++;
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
+ } else {
+ LOG.debug("Unable to parse entry {}", name);
}
}
+ return scanned;
}
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
@@ -456,7 +476,26 @@ private Path getDoneAppPath(ApplicationId appId) {
bucket1, bucket2, appId.toString()));
}
- // This method has to be synchronized to control traffic to RM
+ /**
+ * Get the application state
+ * @param appId application ID
+ * @return the state or {@link AppState#UNKNOWN} if it could not
+ * be determined
+ * @throws IOException
+ */
+ protected AppState getAppState(ApplicationId appId) throws IOException {
+ return getAppState(appId, yarnClient);
+ }
+
+ /**
+ * Ask the RM for the state of the application
+ * This method has to be synchronized to control traffic to RM
+ * @param appId application ID
+ * @param yarnClient
+ * @return the state or {@link AppState#UNKNOWN} if it could not
+ * be determined
+ * @throws IOException
+ */
private static synchronized AppState getAppState(ApplicationId appId,
YarnClient yarnClient) throws IOException {
AppState appState = AppState.ACTIVE;
@@ -476,7 +515,7 @@ private static synchronized AppState getAppState(ApplicationId appId,
@InterfaceAudience.Private
@VisibleForTesting
- enum AppState {
+ public enum AppState {
ACTIVE,
UNKNOWN,
COMPLETED
@@ -526,7 +565,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
if (!isDone()) {
LOG.debug("Try to parse summary log for log {} in {}",
appId, appDirPath);
- appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
+ appState = getAppState(appId);
long recentLogModTime = scanForLogs();
if (appState == AppState.UNKNOWN) {
if (Time.now() - recentLogModTime > unknownActiveMillis) {
@@ -659,14 +698,34 @@ public synchronized void moveToDone() throws IOException {
}
}
+ /**
+ * Extract any nested throwable forwarded from IPC operations
+ * @param e exception
+   * @return either the exception passed as an argument, or any nested
+ * exception which was wrapped inside an {@link UndeclaredThrowableException}
+ */
+ private Throwable extract(Exception e) {
+ Throwable t = e;
+ if (e instanceof UndeclaredThrowableException && e.getCause() != null) {
+ t = e.getCause();
+ }
+ return t;
+ }
+
private class EntityLogScanner implements Runnable {
@Override
public void run() {
LOG.debug("Active scan starting");
try {
- scanActiveLogs();
+ int scanned = scanActiveLogs();
+ LOG.debug("Scanned {} active applications", scanned);
} catch (Exception e) {
- LOG.error("Error scanning active files", e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("File scanner interrupted");
+ } else {
+ LOG.error("Error scanning active files", t);
+ }
}
LOG.debug("Active scan complete");
}
@@ -690,7 +749,12 @@ public void run() {
}
LOG.debug("End parsing summary logs. ");
} catch (Exception e) {
- LOG.error("Error processing logs for " + appLogs.getAppId(), e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("Log parser interrupted");
+ } else {
+ LOG.error("Error processing logs for " + appLogs.getAppId(), t);
+ }
}
}
}
@@ -702,7 +766,12 @@ public void run() {
try {
cleanLogs(doneRootPath, fs, logRetainMillis);
} catch (Exception e) {
- LOG.error("Error cleaning files", e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("Cleaner interrupted");
+ } else {
+          LOG.error("Error cleaning files", t);
+ }
}
LOG.debug("Cleaner finished");
}