diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index 1c295e1..02df640 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -254,8 +254,9 @@ public void putDomain(ApplicationAttemptId appAttemptId, @Override public void close() throws Exception { - if (this.logFDsCache != null) { - this.logFDsCache.close(); + if (logFDsCache != null) { + logFDsCache.flush(); + logFDsCache.close(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java index c616e63..fee4995 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -104,19 +105,27 @@ public ClientResponse run() throws Exception { } }); } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else { + throw new IOException(cause); + } } catch 
(InterruptedException ie) { - throw new IOException(ie); + throw (IOException)new InterruptedIOException().initCause(ie); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { String msg = "Failed to get the response from the timeline server."; LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); + if (resp != null) { + msg += " HTTP error code: " + resp.getStatus(); + if (LOG.isDebugEnabled()) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } } throw new YarnException(msg); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1c7f078..557185d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2043,6 +2043,13 @@ + yarn.timeline-service.entity-group-fs-store.rm.integration.enabled + true + Should the ATS v1.5 entity group file system storage query + the YARN Resource Manager for the active state of the application? + + + yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size Read cache size for the leveldb cache storage in ATS v1.5 plugin storage. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index eb47ef2..264f6ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -129,9 +129,9 @@ public TimelineEntities getEntities( getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e); } catch (IllegalArgumentException e) { - throw new BadRequestException("requested invalid field."); + throw new BadRequestException("requested invalid field: " + e); } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, @@ -160,8 +160,8 @@ public TimelineEntity getEntity( parseFieldsStr(fields, ","), getUser(req)); } catch (IllegalArgumentException e) { - throw new BadRequestException( - "requested invalid field."); + throw (BadRequestException)new BadRequestException( + "requested invalid field.").initCause(e); } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, @@ -201,8 +201,9 @@ public TimelineEvents getEvents( parseLongStr(limit), getUser(req)); } catch (NumberFormatException e) { - throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + throw (BadRequestException)new BadRequestException( + 
"windowStart, windowEnd or limit is not a numeric value.") + .initCause(e); } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index b1fbd13..bb4249d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -26,7 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -76,7 +78,7 @@ * Plugin timeline storage to support timeline server v1.5 API. This storage * uses a file system to store timeline entities in their groups. 
*/ -public class EntityGroupFSTimelineStore extends AbstractService +public class EntityGroupFSTimelineStore extends CompositeService implements TimelineStore { static final String DOMAIN_LOG_PREFIX = "domainlog-"; @@ -128,7 +130,8 @@ public EntityGroupFSTimelineStore() { @Override protected void serviceInit(Configuration conf) throws Exception { summaryStore = createSummaryStore(); - summaryStore.init(conf); + addService(summaryStore); + long logRetainSecs = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS, YarnConfiguration @@ -170,11 +173,27 @@ protected boolean removeEldestEntry( }); cacheIdPlugins = loadPlugIns(conf); // Initialize yarn client for application status - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); + yarnClient = createAndInitYarnClient(conf); + // if non-null, hook its lifecycle up + addIfService(yarnClient); + super.serviceInit(conf); } + /** + * Create and initialize the YARN Client. Tests may override/mock this. + * If they return null, then {@link #getAppState(ApplicationId)} will + * also need to be reworked. + * @param conf configuration + * @return the yarn client, or null. 
+ * + */ + protected YarnClient createAndInitYarnClient(Configuration conf) { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + return client; + } + private List loadPlugIns(Configuration conf) throws RuntimeException { Collection pluginNames = conf.getStringCollection( @@ -210,8 +229,9 @@ private TimelineStore createSummaryStore() { @Override protected void serviceStart() throws Exception { + + super.serviceStart(); LOG.info("Starting {}", getName()); - yarnClient.start(); summaryStore.start(); Configuration conf = getConfig(); @@ -219,7 +239,10 @@ protected void serviceStart() throws Exception { aclManager.setTimelineStore(summaryStore); summaryTdm = new TimelineDataManager(summaryStore, aclManager); summaryTdm.init(conf); - summaryTdm.start(); + addService(summaryTdm); + // start child services that aren't already started + super.serviceStart(); + activeRootPath = new Path(conf.get( YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, YarnConfiguration @@ -257,7 +280,8 @@ protected void serviceStart() throws Exception { YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS, YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT); - LOG.info("Scanning active directory every {} seconds", scanIntervalSecs); + LOG.info("Scanning active directory {} every {} seconds", activeRootPath, + scanIntervalSecs); LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs); executor = new ScheduledThreadPoolExecutor(numThreads, @@ -267,7 +291,6 @@ protected void serviceStart() throws Exception { TimeUnit.SECONDS); executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs, cleanerIntervalSecs, TimeUnit.SECONDS); - super.serviceStart(); } @Override @@ -286,18 +309,9 @@ protected void serviceStop() throws Exception { } } } - if (summaryTdm != null) { - summaryTdm.stop(); - } - if (summaryStore != null) { - summaryStore.stop(); - } - if (yarnClient != null) { - yarnClient.stop(); - } 
synchronized (cachedLogs) { for (EntityCacheItem cacheItem : cachedLogs.values()) { - cacheItem.getStore().close(); + ServiceOperations.stopQuietly(cacheItem.getStore()); } } super.serviceStop(); @@ -305,17 +319,23 @@ protected void serviceStop() throws Exception { @InterfaceAudience.Private @VisibleForTesting - void scanActiveLogs() throws IOException { + int scanActiveLogs() throws IOException { RemoteIterator iter = fs.listStatusIterator(activeRootPath); + int scanned = 0; while (iter.hasNext()) { FileStatus stat = iter.next(); - ApplicationId appId = parseApplicationId(stat.getPath().getName()); + String name = stat.getPath().getName(); + ApplicationId appId = parseApplicationId(name); if (appId != null) { LOG.debug("scan logs for {} in {}", appId, stat.getPath()); + scanned++; AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); executor.execute(new ActiveLogParser(logs)); + } else { + LOG.debug("Unable to parse entry {}", name); } } + return scanned; } private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, @@ -456,7 +476,26 @@ private Path getDoneAppPath(ApplicationId appId) { bucket1, bucket2, appId.toString())); } - // This method has to be synchronized to control traffic to RM + /** + * Get the application state + * @param appId application ID + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException + */ + protected AppState getAppState(ApplicationId appId) throws IOException { + return getAppState(appId, yarnClient); + } + + /** + * Ask the RM for the state of the application + * This method has to be synchronized to control traffic to RM + * @param appId application ID + * @param yarnClient + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException + */ private static synchronized AppState getAppState(ApplicationId appId, YarnClient yarnClient) throws IOException { AppState appState = AppState.ACTIVE; @@ -476,7 +515,7 @@ private static 
synchronized AppState getAppState(ApplicationId appId, @InterfaceAudience.Private @VisibleForTesting - enum AppState { + public enum AppState { ACTIVE, UNKNOWN, COMPLETED @@ -526,7 +565,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) if (!isDone()) { LOG.debug("Try to parse summary log for log {} in {}", appId, appDirPath); - appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + appState = getAppState(appId); long recentLogModTime = scanForLogs(); if (appState == AppState.UNKNOWN) { if (Time.now() - recentLogModTime > unknownActiveMillis) { @@ -659,14 +698,34 @@ public synchronized void moveToDone() throws IOException { } } + /** + * Extract any nested throwable forwarded from IPC operations + * @param e exception + * @return either the exception passed as an argument, or any nested + * exception which was wrapped inside an {@link UndeclaredThrowableException} + */ + private Throwable extract(Exception e) { + Throwable t = e; + if (e instanceof UndeclaredThrowableException && e.getCause() != null) { + t = e.getCause(); + } + return t; + } + private class EntityLogScanner implements Runnable { @Override public void run() { LOG.debug("Active scan starting"); try { - scanActiveLogs(); + int scanned = scanActiveLogs(); + LOG.debug("Scanned {} active applications", scanned); } catch (Exception e) { - LOG.error("Error scanning active files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("File scanner interrupted"); + } else { + LOG.error("Error scanning active files", t); + } } LOG.debug("Active scan complete"); } @@ -690,7 +749,12 @@ public void run() { } LOG.debug("End parsing summary logs. 
"); } catch (Exception e) { - LOG.error("Error processing logs for " + appLogs.getAppId(), e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Log parser interrupted"); + } else { + LOG.error("Error processing logs for " + appLogs.getAppId(), t); + } } } } @@ -702,7 +766,12 @@ public void run() { try { cleanLogs(doneRootPath, fs, logRetainMillis); } catch (Exception e) { - LOG.error("Error cleaning files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Cleaner interrupted"); + } else { + LOG.error("Error cleaning files", t); + } } LOG.debug("Cleaner finished"); }