diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7b47201..f074cb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1651,6 +1651,13 @@ private static void addDeprecatedKeys() { DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RM_INTEGRATION_ENABLED + = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "rm.integration.enabled"; + public static final boolean + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RM_INTEGRATION_ENABLED_DEFAULT = + true; + public static final String TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "leveldb-cache-read-cache-size"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index 1c295e1..02df640 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -254,8 +254,9 @@ public void putDomain(ApplicationAttemptId appAttemptId, @Override public void close() throws Exception { - if (this.logFDsCache != null) { - this.logFDsCache.close(); + if (logFDsCache != null) { + logFDsCache.flush(); + logFDsCache.close(); } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java index c616e63..fee4995 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -104,19 +105,27 @@ public ClientResponse run() throws Exception { } }); } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else { + throw new IOException(cause); + } } catch (InterruptedException ie) { - throw new IOException(ie); + throw (IOException)new InterruptedIOException().initCause(ie); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { String msg = "Failed to get the response from the timeline server."; LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); + if (resp != null) { + msg += " HTTP error code: " + resp.getStatus(); + if (LOG.isDebugEnabled()) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } } throw new YarnException(msg); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1c7f078..557185d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2043,6 +2043,13 @@ + yarn.timeline-service.entity-group-fs-store.rm.integration.enabled + true + Should the ATS v1.5 entity group file system storage query + the YARN Resource Manager for the active state of the application? + + + yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size Read cache size for the leveldb cache storage in ATS v1.5 plugin storage. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index b1fbd13..14eefa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.YarnClient; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; @@ -55,6 +57,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -170,8 +173,13 @@ protected boolean removeEldestEntry( }); cacheIdPlugins = loadPlugIns(conf); // Initialize yarn client for application status - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); + if (conf.getBoolean(TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RM_INTEGRATION_ENABLED, + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RM_INTEGRATION_ENABLED_DEFAULT)) { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + } else { + LOG.info("YARN RM will not be checked for the status of incomplete applications"); + } super.serviceInit(conf); } @@ -210,8 +218,12 @@ private TimelineStore createSummaryStore() { @Override protected void serviceStart() throws Exception { + + super.serviceStart(); LOG.info("Starting {}", getName()); - yarnClient.start(); + if (yarnClient != null) { + yarnClient.start(); + } summaryStore.start(); Configuration conf = getConfig(); @@ -257,7 +269,8 @@ protected void serviceStart() throws Exception { YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS, YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT); - LOG.info("Scanning active directory every {} seconds", scanIntervalSecs); + LOG.info("Scanning active directory {} every {} seconds", activeRootPath, + scanIntervalSecs); LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs); executor = new ScheduledThreadPoolExecutor(numThreads, @@ -286,15 +299,9 @@ protected void serviceStop() throws Exception { 
} } } - if (summaryTdm != null) { - summaryTdm.stop(); - } - if (summaryStore != null) { - summaryStore.stop(); - } - if (yarnClient != null) { - yarnClient.stop(); - } + ServiceOperations.stopQuietly(summaryTdm); + ServiceOperations.stopQuietly(summaryStore); + ServiceOperations.stopQuietly(yarnClient); synchronized (cachedLogs) { for (EntityCacheItem cacheItem : cachedLogs.values()) { cacheItem.getStore().close(); @@ -305,17 +312,23 @@ protected void serviceStop() throws Exception { @InterfaceAudience.Private @VisibleForTesting - void scanActiveLogs() throws IOException { + int scanActiveLogs() throws IOException { RemoteIterator iter = fs.listStatusIterator(activeRootPath); + int scanned = 0; while (iter.hasNext()) { FileStatus stat = iter.next(); - ApplicationId appId = parseApplicationId(stat.getPath().getName()); + String name = stat.getPath().getName(); + ApplicationId appId = parseApplicationId(name); if (appId != null) { LOG.debug("scan logs for {} in {}", appId, stat.getPath()); + scanned++; AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); executor.execute(new ActiveLogParser(logs)); + } else { + LOG.debug("Unable to parse entry {}", name); } } + return scanned; } private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, @@ -526,7 +539,11 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) if (!isDone()) { LOG.debug("Try to parse summary log for log {} in {}", appId, appDirPath); - appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + if (yarnClient != null) { + appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + } else { + appState = AppState.UNKNOWN; + } long recentLogModTime = scanForLogs(); if (appState == AppState.UNKNOWN) { if (Time.now() - recentLogModTime > unknownActiveMillis) { @@ -659,14 +676,34 @@ public synchronized void moveToDone() throws IOException { } } + /** + * Extract any nested throwable forwarded from IPC operations + * @param e exception + * @return either the 
exception passed as an argument, or any nested + exception which was wrapped inside an {@link UndeclaredThrowableException} + */ + private Throwable extract(Exception e) { + Throwable t = e; + if (e instanceof UndeclaredThrowableException && e.getCause() != null) { + t = e.getCause(); + } + return t; + } + private class EntityLogScanner implements Runnable { @Override public void run() { LOG.debug("Active scan starting"); try { - scanActiveLogs(); + int scanned = scanActiveLogs(); + LOG.debug("Scanned {} active applications", scanned); } catch (Exception e) { - LOG.error("Error scanning active files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("File scanner interrupted"); + } else { + LOG.error("Error scanning active files", t); + } } LOG.debug("Active scan complete"); } @@ -690,7 +727,12 @@ public void run() { } LOG.debug("End parsing summary logs. "); } catch (Exception e) { - LOG.error("Error processing logs for " + appLogs.getAppId(), e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Log parser interrupted"); + } else { + LOG.error("Error processing logs for " + appLogs.getAppId(), t); + } } } } @@ -702,7 +744,12 @@ public void run() { try { cleanLogs(doneRootPath, fs, logRetainMillis); } catch (Exception e) { - LOG.error("Error cleaning files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Cleaner interrupted"); + } else { + LOG.error("Error cleaning files", t); + } } LOG.debug("Cleaner finished"); }