diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index 1c295e1..6da395c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -253,9 +253,12 @@ public void putDomain(ApplicationAttemptId appAttemptId, } @Override - public void close() throws Exception { - if (this.logFDsCache != null) { - this.logFDsCache.close(); + public synchronized void close() throws Exception { + if (logFDsCache != null) { + LOG.debug("Closing cache"); + logFDsCache.flush(); + logFDsCache.close(); + logFDsCache = null; } } @@ -323,6 +326,9 @@ public void writeEntities(List entities) if (writerClosed()) { prepareForWrite(); } + if (LOG.isDebugEnabled()) { + LOG.debug("Writing entity list of size " + entities.size()); + } for (TimelineEntity entity : entities) { getObjectMapper().writeValue(getJsonGenerator(), entity); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java index c616e63..04d9b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.io.InterruptedIOException; import 
java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -104,19 +105,27 @@ public ClientResponse run() throws Exception { } }); } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else { + throw new IOException(cause); + } } catch (InterruptedException ie) { - throw new IOException(ie); + throw (IOException)new InterruptedIOException().initCause(ie); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { String msg = "Failed to get the response from the timeline server."; LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); + if (resp != null) { + msg += " HTTP error code: " + resp.getStatus(); + if (LOG.isDebugEnabled()) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } } throw new YarnException(msg); } @@ -128,10 +137,16 @@ public ClientResponse run() throws Exception { public ClientResponse doPostingObject(Object object, String path) { WebResource webResource = client.resource(resURI); if (path == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("POST to " + resURI); + } return webResource.accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, object); } else if (path.equals("domain")) { + if (LOG.isDebugEnabled()) { + LOG.debug("PUT to " + resURI +"/" + path); + } return webResource.path(path).accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .put(ClientResponse.class, object); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index eb47ef2..34aaf1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -129,9 +129,9 @@ public TimelineEntities getEntities( getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e); } catch (IllegalArgumentException e) { - throw new BadRequestException("requested invalid field."); + throw new BadRequestException("requested invalid field: " + e); } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, @@ -160,8 +160,8 @@ public TimelineEntity getEntity( parseFieldsStr(fields, ","), getUser(req)); } catch (IllegalArgumentException e) { - throw new BadRequestException( - "requested invalid field."); + throw (BadRequestException)new BadRequestException( + "requested invalid field.").initCause(e); } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, @@ -201,8 +201,9 @@ public TimelineEvents getEvents( parseLongStr(limit), getUser(req)); } catch (NumberFormatException e) { - throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + throw (BadRequestException)new BadRequestException( + "windowStart, 
windowEnd or limit is not a numeric value.") + .initCause(e); } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index b1fbd13..aa24b16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -26,7 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -76,7 +78,7 @@ * Plugin timeline storage to support timeline server v1.5 API. This storage * uses a file system to store timeline entities in their groups. 
*/ -public class EntityGroupFSTimelineStore extends AbstractService +public class EntityGroupFSTimelineStore extends CompositeService implements TimelineStore { static final String DOMAIN_LOG_PREFIX = "domainlog-"; @@ -128,7 +130,8 @@ public EntityGroupFSTimelineStore() { @Override protected void serviceInit(Configuration conf) throws Exception { summaryStore = createSummaryStore(); - summaryStore.init(conf); + addService(summaryStore); + long logRetainSecs = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS, YarnConfiguration @@ -170,17 +173,20 @@ protected boolean removeEldestEntry( }); cacheIdPlugins = loadPlugIns(conf); // Initialize yarn client for application status - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); + yarnClient = createAndInitYarnClient(conf); + // if non-null, hook its lifecycle up + addIfService(yarnClient); + super.serviceInit(conf); } private List loadPlugIns(Configuration conf) throws RuntimeException { - Collection pluginNames = conf.getStringCollection( + Collection pluginNames = conf.getTrimmedStringCollection( YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES); List pluginList = new LinkedList(); + Exception caught = null; for (final String name : pluginNames) { LOG.debug("Trying to load plugin class {}", name); TimelineEntityGroupPlugin cacheIdPlugin = null; @@ -191,10 +197,11 @@ protected boolean removeEldestEntry( clazz, conf); } catch (Exception e) { LOG.warn("Error loading plugin " + name, e); + caught = e; } if (cacheIdPlugin == null) { - throw new RuntimeException("No class defined for " + name); + throw new RuntimeException("No class defined for " + name, caught); } LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName()); pluginList.add(cacheIdPlugin); @@ -210,8 +217,9 @@ private TimelineStore createSummaryStore() { @Override protected void serviceStart() throws Exception { + + super.serviceStart(); LOG.info("Starting {}", getName()); - 
yarnClient.start(); summaryStore.start(); Configuration conf = getConfig(); @@ -219,7 +227,10 @@ protected void serviceStart() throws Exception { aclManager.setTimelineStore(summaryStore); summaryTdm = new TimelineDataManager(summaryStore, aclManager); summaryTdm.init(conf); - summaryTdm.start(); + addService(summaryTdm); + // start child services that aren't already started + super.serviceStart(); + activeRootPath = new Path(conf.get( YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, YarnConfiguration @@ -257,7 +268,8 @@ protected void serviceStart() throws Exception { YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS, YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT); - LOG.info("Scanning active directory every {} seconds", scanIntervalSecs); + LOG.info("Scanning active directory {} every {} seconds", activeRootPath, + scanIntervalSecs); LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs); executor = new ScheduledThreadPoolExecutor(numThreads, @@ -267,7 +279,6 @@ protected void serviceStart() throws Exception { TimeUnit.SECONDS); executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs, cleanerIntervalSecs, TimeUnit.SECONDS); - super.serviceStart(); } @Override @@ -286,18 +297,9 @@ protected void serviceStop() throws Exception { } } } - if (summaryTdm != null) { - summaryTdm.stop(); - } - if (summaryStore != null) { - summaryStore.stop(); - } - if (yarnClient != null) { - yarnClient.stop(); - } synchronized (cachedLogs) { for (EntityCacheItem cacheItem : cachedLogs.values()) { - cacheItem.getStore().close(); + ServiceOperations.stopQuietly(cacheItem.getStore()); } } super.serviceStop(); @@ -305,17 +307,23 @@ protected void serviceStop() throws Exception { @InterfaceAudience.Private @VisibleForTesting - void scanActiveLogs() throws IOException { + int scanActiveLogs() throws IOException { RemoteIterator iter = fs.listStatusIterator(activeRootPath); + int scanned = 0; while 
(iter.hasNext()) { FileStatus stat = iter.next(); - ApplicationId appId = parseApplicationId(stat.getPath().getName()); + String name = stat.getPath().getName(); + ApplicationId appId = parseApplicationId(name); if (appId != null) { LOG.debug("scan logs for {} in {}", appId, stat.getPath()); + scanned++; AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); executor.execute(new ActiveLogParser(logs)); + } else { + LOG.debug("Unable to parse entry {}", name); } } + return scanned; } private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, @@ -456,7 +464,42 @@ private Path getDoneAppPath(ApplicationId appId) { bucket1, bucket2, appId.toString())); } - // This method has to be synchronized to control traffic to RM + /** + * Create and initialize the YARN Client. Tests may override/mock this. + * If they return null, then {@link #getAppState(ApplicationId)} MUST + * also be overridden + * @param conf configuration + * @return the yarn client, or null. + * + */ + @VisibleForTesting + protected YarnClient createAndInitYarnClient(Configuration conf) { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + return client; + } + + /** + * Get the application state. + * @param appId application ID + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException on IO problems + */ + @VisibleForTesting + protected AppState getAppState(ApplicationId appId) throws IOException { + return getAppState(appId, yarnClient); + } + + /** + * Ask the RM for the state of the application. 
+ * This method has to be synchronized to control traffic to RM + * @param appId application ID + * @param yarnClient + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException + */ private static synchronized AppState getAppState(ApplicationId appId, YarnClient yarnClient) throws IOException { AppState appState = AppState.ACTIVE; @@ -476,7 +519,7 @@ private static synchronized AppState getAppState(ApplicationId appId, @InterfaceAudience.Private @VisibleForTesting - enum AppState { + public enum AppState { ACTIVE, UNKNOWN, COMPLETED @@ -526,7 +569,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) if (!isDone()) { LOG.debug("Try to parse summary log for log {} in {}", appId, appDirPath); - appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + appState = getAppState(appId); long recentLogModTime = scanForLogs(); if (appState == AppState.UNKNOWN) { if (Time.now() - recentLogModTime > unknownActiveMillis) { @@ -659,14 +702,34 @@ public synchronized void moveToDone() throws IOException { } } + /** + * Extract any nested throwable forwarded from IPC operations. 
+ * @param e exception + * @return either the exception passed as an argument, or any nested + * exception which was wrapped inside an {@link UndeclaredThrowableException} + */ + private Throwable extract(Exception e) { + Throwable t = e; + if (e instanceof UndeclaredThrowableException && e.getCause() != null) { + t = e.getCause(); + } + return t; + } + private class EntityLogScanner implements Runnable { @Override public void run() { LOG.debug("Active scan starting"); try { - scanActiveLogs(); + int scanned = scanActiveLogs(); + LOG.debug("Scanned {} active applications", scanned); } catch (Exception e) { - LOG.error("Error scanning active files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("File scanner interrupted"); + } else { + LOG.error("Error scanning active files", t); + } } LOG.debug("Active scan complete"); } @@ -690,7 +753,12 @@ public void run() { } LOG.debug("End parsing summary logs. "); } catch (Exception e) { - LOG.error("Error processing logs for " + appLogs.getAppId(), e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Log parser interrupted"); + } else { + LOG.error("Error processing logs for " + appLogs.getAppId(), t); + } } } } @@ -702,7 +770,12 @@ public void run() { try { cleanLogs(doneRootPath, fs, logRetainMillis); } catch (Exception e) { - LOG.error("Error cleaning files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Cleaner interrupted"); + } else { + LOG.error("Error cleaning files", t); + } } LOG.debug("Cleaner finished"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java index 4caed8d..131358f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; @@ -103,7 +104,8 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, LOG.debug("Parsing for log dir {} on attempt {}", appDirPath, attemptDirName); Path logPath = getPath(appDirPath); - if (fs.exists(logPath)) { + FileStatus status = fs.getFileStatus(logPath); + if (status != null && status.getLen() > 0) { long startTime = Time.monotonicNow(); try { LOG.debug("Parsing {} at offset {}", logPath, offset); @@ -118,7 +120,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, } } } else { - LOG.warn("{} no longer exists. Skip for scanning. ", logPath); + LOG.warn("{} no longer exists or is empty. 
Skip for scanning.", logPath); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestOverrideTimelineStoreYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestOverrideTimelineStoreYarnClient.java new file mode 100644 index 0000000..c190266 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestOverrideTimelineStoreYarnClient.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TestOverrideTimelineStoreYarnClient { + + @Test + public void testLifecycleAndOverride() throws Throwable { + YarnConfiguration conf = new YarnConfiguration(); + try(NoRMStore store = new NoRMStore()) { + store.init(conf); + store.start(); + Assert.assertEquals(EntityGroupFSTimelineStore.AppState.ACTIVE, + store.getAppState(ApplicationId.newInstance(1, 1))); + store.stop(); + } + } + + private static class NoRMStore extends EntityGroupFSTimelineStore { + @Override + protected YarnClient createAndInitYarnClient(Configuration conf) { + return null; + } + + @Override + protected AppState getAppState(ApplicationId appId) + throws IOException { + return AppState.ACTIVE; + } + } +}