diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
new file mode 100644
index 0000000..4b9d18e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
@@ -0,0 +1,123 @@
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Splitter;
+
+
+@Public
+@Unstable
+public abstract class CacheId implements Comparable<CacheId> {
+
+  private static final Splitter _SPLITTER = Splitter.on('_').trimResults();
+
+  @Private
+  @Unstable
+  public static final String cacheIdStrPrefix = "cache";
+
+  @Public
+  @Unstable
+  public static CacheId newInstance(ApplicationId applicationId,
+      String id) {
+    CacheId cacheId = Records.newRecord(CacheId.class);
+    cacheId.setApplicationId(applicationId);
+    cacheId.setCacheId(id);
+    cacheId.build();
+    return cacheId;
+  }
+
+  /**
+   * Get the ApplicationId of the CacheId.
+   * @return ApplicationId of the CacheId
+   */
+  @Public
+  @Stable
+  public abstract ApplicationId getApplicationId();
+
+  @Private
+  @Unstable
+  protected abstract void setApplicationId(ApplicationId appID);
+
+  /**
+   * Get the cache id.
+   * @return cache id
+   */
+  @Public
+  @Stable
+  public abstract String getCacheId();
+
+  @Private
+  @Unstable
+  protected abstract void setCacheId(String cacheId);
+
+  @Override
+  public int hashCode() {
+    int result = getCacheId().hashCode();
+    result = 31 * result + getApplicationId().hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CacheId other = (CacheId) obj;
+    if (!this.getApplicationId().equals(other.getApplicationId()))
+      return false;
+    if (!this.getCacheId().equals(other.getCacheId())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(CacheId other) {
+    int compareAppIds = this.getApplicationId().compareTo(
+        other.getApplicationId());
+    if (compareAppIds == 0) {
+      return this.getCacheId().compareTo(other.getCacheId());
+    } else {
+      return compareAppIds;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(cacheIdStrPrefix + "_");
+    ApplicationId appId = getApplicationId();
+    sb.append(appId.getClusterTimestamp()).append("_");
+    sb.append(appId.getId()).append("_");
+    sb.append(getCacheId());
+    return sb.toString();
+  }
+
+  public static CacheId fromString(String cacheIdStr) {
+    Iterator<String> it = _SPLITTER.split(cacheIdStr).iterator();
+    if (!it.next().equals(cacheIdStrPrefix)) {
+      throw new IllegalArgumentException("Invalid CacheId prefix: "
+          + cacheIdStr);
+    }
+    ApplicationId appId =
+        ApplicationId.newInstance(Long.parseLong(it.next()),
+            Integer.parseInt(it.next()));
+    String id = it.next();
+    while (it.hasNext()) {
+      id += "_" + it.next();
+    }
+    return CacheId.newInstance(appId, id);
+  }
+
+  protected abstract void build();
+
+}
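
Reviewer note: a minimal sketch of how the new record is expected to round-trip through toString()/fromString(); the id value "dag_1" is invented for illustration, not part of the patch.

    // Hypothetical round-trip of the CacheId record added above.
    ApplicationId appId = ApplicationId.newInstance(1234L, 1);
    CacheId cacheId = CacheId.newInstance(appId, "dag_1");
    String str = cacheId.toString();   // "cache_1234_1_dag_1"
    // fromString() re-joins any trailing '_'-separated tokens, so a cache id
    // that itself contains '_' (like "dag_1") survives the round trip.
    assert CacheId.fromString(str).equals(cacheId);
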
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 14c1ffc..f8c74f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1556,6 +1556,39 @@ private static void addDeprecatedKeys() {
   public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
       TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
 
+  /**
+   * The setting that controls whether the timeline service plugin is enabled.
+   */
+  public static final String TIMELINE_SERVICE_PLUGIN_ENABLED =
+      TIMELINE_SERVICE_PREFIX + "plugin.enabled";
+  public static final boolean DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED = false;
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.active-dir";
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT =
+      "/tmp/entity-file-history/active";
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.summary-entity-types";
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYFILE_FD_FLUSH_INTERVAL_SECS =
+      TIMELINE_SERVICE_PREFIX + "entity-file-fd.flush-interval-secs";
+  public static final long
+      TIMELINE_SERVICE_ENTITYFILE_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYFILE_FD_CLEAN_INTERVAL_SECS =
+      TIMELINE_SERVICE_PREFIX + "entity-file-fd.clean-interval-secs";
+  public static final long
+      TIMELINE_SERVICE_ENTITYFILE_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_FD_RETAIN_SECS =
+      TIMELINE_SERVICE_PREFIX + "entity-file-fd.retain-secs";
+  public static final long TIMELINE_SERVICE_ENTITYFILE_FD_RETAIN_SECS_DEFAULT =
+      5 * 60;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 09d2bd5..74d76b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -53,6 +53,11 @@ message ContainerIdProto {
   optional int64 id = 3;
 }
 
+message CacheIdProto {
+  optional ApplicationIdProto app_id = 1;
+  optional string id = 2;
+}
+
 message ResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
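
Reviewer note: a hedged sketch of how a client would wire these new keys together. The active directory matches the patch default; the summary entity types are illustrative values, not defaults mandated by the patch.

    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    // Route writes through the entity-file plugin instead of the REST API.
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED, true);
    // Directory the timeline server scans for data of running applications.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
        "/tmp/entity-file-history/active");
    // Entity types that should land in the summary log (illustrative list).
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES,
        "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT");
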
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
new file mode 100644
index 0000000..904e741
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
@@ -0,0 +1,73 @@
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CacheId;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CacheIdProto;
+
+import com.google.common.base.Preconditions;
+
+@Private
+@Unstable
+public class CacheIdPBImpl extends CacheId {
+  CacheIdProto proto = null;
+  CacheIdProto.Builder builder = null;
+  private ApplicationId applicationId = null;
+
+  public CacheIdPBImpl() {
+    builder = CacheIdProto.newBuilder();
+  }
+
+  public CacheIdPBImpl(CacheIdProto proto) {
+    this.proto = proto;
+    this.applicationId = convertFromProtoFormat(proto.getAppId());
+  }
+
+  public CacheIdProto getProto() {
+    return proto;
+  }
+
+  @Override
+  public String getCacheId() {
+    Preconditions.checkNotNull(proto);
+    return proto.getId();
+  }
+
+  @Override
+  protected void setCacheId(String id) {
+    Preconditions.checkNotNull(builder);
+    builder.setId(id);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+  @Override
+  protected void setApplicationId(ApplicationId appId) {
+    if (appId != null) {
+      Preconditions.checkNotNull(builder);
+      builder.setAppId(convertToProtoFormat(appId));
+    }
+    this.applicationId = appId;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(
+      ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  @Override
+  protected void build() {
+    proto = builder.build();
+    builder = null;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9..1e13625 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@@ -80,6 +82,26 @@ public abstract TimelinePutResponse putEntities(
 
   /**
    * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * server. It is a blocking API. The method will not return until it gets the
+   * response from the timeline server.
+   * </p>
+   *
+   * @param entities
+   *          the collection of {@link TimelineEntity}
+   * @param cacheId {@link CacheId}
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @return the error information if the sent entities are not correctly stored
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract TimelinePutResponse putEntities(CacheId cacheId,
+      ApplicationAttemptId appAttemptId, TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
    * Send the information of a domain to the timeline server. It is a
    * blocking API. The method will not return until it gets the response from
    * the timeline server.
@@ -96,6 +118,23 @@ public abstract void putDomain(
 
   /**
    * <p>
+   * Send the information of a domain to the timeline server. It is a
+   * blocking API. The method will not return until it gets the response from
+   * the timeline server.
+   * </p>
+   *
+   * @param domain
+   *          a {@link TimelineDomain} object
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException;
+
+  /**
+   * <p>
    * Get a delegation token so as to be able to talk to the timeline server in a
    * secure way.
    * </p>
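
Reviewer note: a short sketch of how an AM might drive the two new overloads, assuming the configuration from the previous note; the cache id "dag_1" and the entity type are invented for illustration.

    YarnConfiguration conf = new YarnConfiguration();
    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();

    ApplicationId appId =
        ApplicationId.newInstance(System.currentTimeMillis(), 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    CacheId cacheId = CacheId.newInstance(appId, "dag_1"); // hypothetical id

    TimelineEntity entity = new TimelineEntity();
    entity.setEntityType("FRAMEWORK_DAG");   // illustrative, non-summary type
    entity.setEntityId("dag_1");
    entity.setStartTime(System.currentTimeMillis());

    // With the plugin enabled, summary-type entities go to the attempt's
    // summary log and other types to the cache id's entity log; passing a
    // null attemptId falls back to the blocking REST call.
    client.putEntities(cacheId, attemptId, entity);
    client.stop();
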
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 04c84ca..b9ffcbf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.Flushable;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.ConnectException;
@@ -29,7 +31,17 @@
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -45,6 +57,11 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -55,6 +72,9 @@
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -66,7 +86,13 @@
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -111,6 +137,32 @@
   private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
+  private boolean timelineServerPluginEnabled;
+  private Path activePath = null;
+  private FileSystem fs = null;
+  private Set<String> summaryEntityTypes;
+  private ObjectMapper objMapper = null;
+  private long flushIntervalSecs;
+  private long cleanIntervalSecs;
+  private long ttl;
+  private LogFDsCache logFDsCache = null;
+  private boolean isAppendSupported;
+  private Configuration conf;
+
+  // This is a temporary solution. The configuration will be deleted once we
+  // have the FileSystem API to check whether append operation is supported.
+  private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+      = YarnConfiguration.TIMELINE_SERVICE_PREFIX
+          + "entity-file.fs-support-append";
+
+  // App log directory must be readable by group so server can access logs
+  // and writable by group so it can be deleted by server
+  private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+  // Logs must be readable by group so server can access them
+  private static final short FILE_LOG_PERMISSIONS = 0640;
+  private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+  private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+  private static final String ENTITY_LOG_PREFIX = "entitylog-";
 
   @Private
   @VisibleForTesting
@@ -255,6 +307,7 @@ public TimelineClientImpl() {
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUgi = ugi.getRealUser();
     if (realUgi != null) {
@@ -294,8 +347,64 @@ protected void serviceInit(Configuration conf) throws Exception {
           RESOURCE_URI_STR));
     }
     LOG.info("Timeline service address: " + resURI);
+
+    timelineServerPluginEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED);
     super.serviceInit(conf);
   }
 
+  @Override
+  protected void serviceStart() throws Exception {
+    if (timelineServerPluginEnabled) {
+      activePath =
+          new Path(conf.get(
+              YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
+              YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT));
+      fs = activePath.getFileSystem(conf);
+      if (!fs.exists(activePath)) {
+        throw new IOException(activePath + " does not exist");
+      }
+      summaryEntityTypes = new HashSet<String>(conf.getStringCollection(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES));
+      objMapper = createObjectMapper();
+      flushIntervalSecs = conf.getLong(
+          YarnConfiguration
+              .TIMELINE_SERVICE_ENTITYFILE_FD_FLUSH_INTERVAL_SECS,
+          YarnConfiguration
+              .TIMELINE_SERVICE_ENTITYFILE_FD_FLUSH_INTERVAL_SECS_DEFAULT);
+      cleanIntervalSecs = conf.getLong(
+          YarnConfiguration
+              .TIMELINE_SERVICE_ENTITYFILE_FD_CLEAN_INTERVAL_SECS,
+          YarnConfiguration
+              .TIMELINE_SERVICE_ENTITYFILE_FD_CLEAN_INTERVAL_SECS_DEFAULT);
+      ttl = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FD_RETAIN_SECS,
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FD_RETAIN_SECS_DEFAULT);
+      logFDsCache =
+          new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
+      this.isAppendSupported =
+          conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FD_FLUSH_INTERVAL_SECS
+            + "=" + flushIntervalSecs + ", "
+            + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FD_CLEAN_INTERVAL_SECS
+            + "=" + cleanIntervalSecs + ", "
+            + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FD_RETAIN_SECS
+            + "=" + ttl + ", "
+            + TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+            + "=" + isAppendSupported);
+      }
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.logFDsCache != null) {
+      this.logFDsCache.close();
+    }
+    super.serviceStop();
+  }
 
 @Override
 public TimelinePutResponse putEntities(
@@ -659,4 +768,439 @@ private static void printUsage() {
   public UserGroupInformation getUgi() {
     return authUgi;
   }
+
+  @Override
+  public TimelinePutResponse putEntities(CacheId cacheId,
+      ApplicationAttemptId appAttemptId, TimelineEntity... entities)
+      throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || appAttemptId == null) {
+      return putEntities(entities);
+    }
+
+    List<TimelineEntity> entitiesToDB = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToSummary = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToEntity = new ArrayList<TimelineEntity>();
+    Path attemptDir = createAttemptDir(appAttemptId);
+
+    for (TimelineEntity entity : entities) {
+      if (summaryEntityTypes.contains(entity.getEntityType())) {
+        entitiesToSummary.add(entity);
+      } else {
+        if (cacheId != null) {
+          entitiesToEntity.add(entity);
+        } else {
+          entitiesToDB.add(entity);
+        }
+      }
+    }
+
+    if (!entitiesToSummary.isEmpty()) {
+      Path summaryLogPath =
+          new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
+      LOG.info("Writing summary log for " + appAttemptId.toString() + " to "
+          + summaryLogPath);
+      this.logFDsCache.writeEntityLogs(fs, summaryLogPath, objMapper,
+          appAttemptId, entitiesToSummary, isAppendSupported, true);
+    }
+
+    if (!entitiesToEntity.isEmpty()) {
+      Path entityLogPath =
+          new Path(attemptDir, ENTITY_LOG_PREFIX + cacheId.toString());
+      LOG.info("Writing entity log for " + cacheId.toString() + " to "
+          + entityLogPath);
+      this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
+          appAttemptId, entitiesToEntity, isAppendSupported, false);
+    }
+
+    if (!entitiesToDB.isEmpty()) {
+      putEntities(entitiesToDB.toArray(
+          new TimelineEntity[entitiesToDB.size()]));
+    }
+
+    return new TimelinePutResponse();
+  }
+
+  @Override
+  public void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || appAttemptId == null) {
+      putDomain(domain);
+    } else {
+      writeDomain(appAttemptId, domain);
+    }
+  }
+
+  private ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(Inclusion.NON_NULL);
+    mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+
+    Path attemptDir = new Path(appDir, appAttemptId.toString());
+    if (!fs.exists(attemptDir)) {
+      FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+          APP_LOG_DIR_PERMISSIONS));
+    }
+    return attemptDir;
+  }
+
+  private Path createApplicationDir(ApplicationId appId) throws IOException {
+    Path appDir =
+        new Path(activePath, appId.toString());
+    if (!fs.exists(appDir)) {
+      FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+    }
+    return appDir;
+  }
+
+  private void writeDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException {
+    Path domainLogPath =
+        new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+            + appAttemptId.toString());
+    LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+        + domainLogPath);
+    this.logFDsCache.writeDomainLog(
+        fs, domainLogPath, objMapper, domain, isAppendSupported);
+  }
+
+  private static class DomainLogFD extends LogFD {
+    public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      super(fs, logPath, objMapper, isAppendSupported);
+    }
+
+    public synchronized void writeDomain(TimelineDomain domain)
+        throws IOException {
+      objMapper.writeValue(jsonGenerator, domain);
+      lastModifiedTime = System.currentTimeMillis();
+    }
+  }
+
+  private static class EntityLogFD extends LogFD {
+    public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      super(fs, logPath, objMapper, isAppendSupported);
+    }
+
+    public synchronized void writeEntities(List<TimelineEntity> entities)
+        throws IOException {
+      for (TimelineEntity entity : entities) {
+        objMapper.writeValue(jsonGenerator, entity);
+      }
+      lastModifiedTime = System.currentTimeMillis();
+    }
+  }
+
+  private static class LogFD {
+    private FSDataOutputStream stream;
+    protected ObjectMapper objMapper;
+    protected JsonGenerator jsonGenerator;
+    protected long lastModifiedTime;
+    private final boolean isAppendSupported;
+    private final ReentrantLock fdLock = new ReentrantLock();
+
+    public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      this.isAppendSupported = isAppendSupported;
+      this.objMapper = objMapper;
+      this.stream = createLogFileStream(fs, logPath);
+      this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
+      this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      this.lastModifiedTime = System.currentTimeMillis();
+    }
+
+    public void close() {
+      if (stream != null) {
+        IOUtils.cleanup(LOG, jsonGenerator);
+        IOUtils.cleanup(LOG, stream);
+        stream = null;
+        jsonGenerator = null;
+      }
+    }
+
+    public void flush() throws IOException {
+      if (stream != null) {
+        stream.hflush();
+      }
+    }
+
+    public long getLastModifiedTime() {
+      return this.lastModifiedTime;
+    }
+
+    private FSDataOutputStream createLogFileStream(FileSystem fs, Path logPath)
+        throws IOException {
+      FSDataOutputStream stream;
+      if (!isAppendSupported) {
+        // Without append support, roll to a fresh, timestamped file instead.
+        logPath =
+            new Path(logPath.getParent(),
+                (logPath.getName() + "_" + System.currentTimeMillis()));
+      }
+      if (!fs.exists(logPath)) {
+        stream = fs.create(logPath, false);
+        fs.setPermission(logPath, new FsPermission(FILE_LOG_PERMISSIONS));
+      } else {
+        stream = fs.append(logPath);
+      }
+      return stream;
+    }
+
+    public void lock() {
+      this.fdLock.lock();
+    }
+
+    public void unlock() {
+      this.fdLock.unlock();
+    }
+  }
+
+  private static class LogFDsCache implements Closeable, Flushable {
+    private DomainLogFD domainLogFD;
+    private Map<ApplicationAttemptId, EntityLogFD> summaryLogFDs;
+    private Map<ApplicationAttemptId, EntityLogFD> entityLogFDs;
+    private Timer flushTimer;
+    private FlushTimerTask flushTimerTask;
+    private Timer cleanInActiveFDsTimer;
+    private CleanInActiveFDsTask cleanInActiveFDsTask;
+    private final long ttl;
+    private final ReentrantLock domainFDLocker = new ReentrantLock();
+
+    public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
+        long ttl) {
+      domainLogFD = null;
+      summaryLogFDs =
+          new ConcurrentHashMap<ApplicationAttemptId, EntityLogFD>();
+      entityLogFDs =
+          new ConcurrentHashMap<ApplicationAttemptId, EntityLogFD>();
+      this.flushTimer =
+          new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
+              true);
+      this.flushTimerTask = new FlushTimerTask();
+      this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
+          flushIntervalSecs * 1000);
+
+      this.cleanInActiveFDsTimer =
+          new Timer(LogFDsCache.class.getSimpleName()
+              + "cleanInActiveFDsTimer", true);
+      this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
+      this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
+          cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
+      this.ttl = ttl;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          domainLogFD.flush();
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      flushFDMap(summaryLogFDs);
+
+      flushFDMap(entityLogFDs);
+    }
+
+    private void flushFDMap(Map<ApplicationAttemptId, EntityLogFD> logFDs)
+        throws IOException {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+            .entrySet()) {
+          EntityLogFD logFD = logFDEntry.getValue();
+          try {
+            logFD.lock();
+            // Re-check membership: the cleaner may have closed and removed
+            // this FD while we were waiting for its lock.
+            if (logFDs.containsValue(logFD)) {
+              logFD.flush();
+            }
+          } finally {
+            logFD.unlock();
+          }
+        }
+      }
+    }
+
+    private class FlushTimerTask extends TimerTask {
+      @Override
+      public void run() {
+        try {
+          flush();
+        } catch (Exception e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(e);
+          }
+        }
+      }
+    }
+
+    private void cleanInActiveFDs() {
+      long currentTimeStamp = System.currentTimeMillis();
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
+            domainLogFD.close();
+            domainLogFD = null;
+          }
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      cleanInActiveFDsforMap(summaryLogFDs, currentTimeStamp);
+
+      cleanInActiveFDsforMap(entityLogFDs, currentTimeStamp);
+    }
+
+    private void cleanInActiveFDsforMap(
+        Map<ApplicationAttemptId, EntityLogFD> logFDs,
+        long currentTimeStamp) {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+            .entrySet()) {
+          EntityLogFD logFD = logFDEntry.getValue();
+          try {
+            logFD.lock();
+            if (logFDs.containsValue(logFD)) {
+              if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+                logFD.close();
+                // Keep the local reference so unlock() in the finally block
+                // still has a target; only drop the cache entry.
+                logFDs.remove(logFDEntry.getKey());
+              }
+            }
+          } finally {
+            logFD.unlock();
+          }
+        }
+      }
+    }
+
+    private class CleanInActiveFDsTask extends TimerTask {
+      @Override
+      public void run() {
+        try {
+          cleanInActiveFDs();
+        } catch (Exception e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushTimer.cancel();
+      cleanInActiveFDsTimer.cancel();
+
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          domainLogFD.close();
+          domainLogFD = null;
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      closeFDs(summaryLogFDs);
+
+      closeFDs(entityLogFDs);
+    }
+
+    private void closeFDs(Map<ApplicationAttemptId, EntityLogFD> logFDs) {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
+            : logFDs.entrySet()) {
+          EntityLogFD logFD = logFDEntry.getValue();
+          try {
+            logFD.lock();
+            if (logFDs.containsValue(logFD)) {
+              logFD.close();
+              logFDs.remove(logFDEntry.getKey());
+            }
+          } finally {
+            logFD.unlock();
+          }
+        }
+      }
+    }
+
+    public void writeDomainLog(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, TimelineDomain domain,
+        boolean isAppendSupported) throws IOException {
+      try {
+        this.domainFDLocker.lock();
+        if (this.domainLogFD != null) {
+          this.domainLogFD.writeDomain(domain);
+        } else {
+          this.domainLogFD =
+              new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
+          this.domainLogFD.writeDomain(domain);
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+    }
+
+    public void writeEntityLogs(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported,
+        boolean isSummaryLog) throws IOException {
+      writeEntityLogs(fs, logPath, objMapper, attemptId, entities,
+          isAppendSupported, isSummaryLog ? this.summaryLogFDs
+              : this.entityLogFDs);
+    }
+
+    private void writeEntityLogs(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported,
+        Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+      EntityLogFD logFD = logFDs.get(attemptId);
+      if (logFD != null) {
+        try {
+          logFD.lock();
+          if (logFDs.containsValue(logFD)) {
+            logFD.writeEntities(entities);
+          } else {
+            createFDAndWrite(fs, logPath, objMapper, attemptId, entities,
+                isAppendSupported, logFDs);
+          }
+        } finally {
+          logFD.unlock();
+        }
+      } else {
+        createFDAndWrite(fs, logPath, objMapper, attemptId, entities,
+            isAppendSupported, logFDs);
+      }
+    }
+
+    private void createFDAndWrite(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported,
+        Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+      EntityLogFD logFD =
+          new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+      try {
+        logFD.lock();
+        logFDs.put(attemptId, logFD);
+        logFD.writeEntities(entities);
+      } finally {
+        logFD.unlock();
+      }
+    }
+  }
 }
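
Reviewer note: pulling the path logic above together, this is the layout the client produces under the active dir; ids are invented for illustration, and the permissions come from APP_LOG_DIR_PERMISSIONS and FILE_LOG_PERMISSIONS.

    // <active-dir>/                                 (configured, must exist)
    //   application_1449180000000_0001/             directories created 0770
    //     appattempt_1449180000000_0001_000001/
    //       summarylog-appattempt_1449180000000_0001_000001      files 0640
    //       entitylog-cache_1449180000000_1_dag_1
    //       domainlog-appattempt_1449180000000_0001_000001
    // On filesystems without append support, each new LogFD rolls a fresh
    // file with a "_<currentTimeMillis>" suffix (see createLogFileStream()).
    Path attemptDir = new Path(new Path(activePath, appId.toString()),
        attemptId.toString());
    Path entityLog = new Path(attemptDir, "entitylog-" + cacheId.toString());
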
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestCacheId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestCacheId.java
new file mode 100644
index 0000000..78a1c6a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestCacheId.java
@@ -0,0 +1,34 @@
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CacheId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCacheId {
+
+  @Test
+  public void testCacheId() {
+    ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+    CacheId cacheId1 = CacheId.newInstance(appId1, "1");
+    CacheId cacheId2 = CacheId.newInstance(appId1, "2");
+    CacheId cacheId3 = CacheId.newInstance(appId2, "1");
+    CacheId cacheId4 = CacheId.newInstance(appId1, "1");
+
+    Assert.assertTrue(cacheId1.equals(cacheId4));
+    Assert.assertFalse(cacheId1.equals(cacheId2));
+    Assert.assertFalse(cacheId1.equals(cacheId3));
+
+    Assert.assertTrue(cacheId1.compareTo(cacheId4) == 0);
+    Assert.assertTrue(cacheId1.compareTo(cacheId2) < 0);
+    Assert.assertTrue(cacheId1.compareTo(cacheId3) < 0);
+
+    Assert.assertTrue(cacheId1.hashCode() == cacheId4.hashCode());
+    Assert.assertFalse(cacheId1.hashCode() == cacheId2.hashCode());
+    Assert.assertFalse(cacheId1.hashCode() == cacheId3.hashCode());
+
+    Assert.assertEquals("cache_1234_1_1", cacheId1.toString());
+    Assert.assertEquals(CacheId.fromString("cache_1234_1_1"), cacheId1);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
new file mode 100644
index 0000000..39c546c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -0,0 +1,201 @@
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CacheId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClientForATS1_5 {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestTimelineClientForATS1_5.class);
+
+  private TimelineClientImpl client;
+  private static FileContext localFS;
+  private static File localActiveDir;
+
+  @Before
+  public void setup() throws Exception {
+    localFS = FileContext.getLocalFSFileContext();
+    localActiveDir =
+        new File("target", this.getClass().getSimpleName() + "-activeDir")
+            .getAbsoluteFile();
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+    localActiveDir.mkdir();
+    LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED, true);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
+        localActiveDir.getAbsolutePath());
+    conf.set(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES,
+        "summary_type");
+    client = createTimelineClient(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (client != null) {
+      client.stop();
+    }
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testPostEntities() throws Exception {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    CacheId cacheId = CacheId.newInstance(appId, "1");
+    // Create two entities: one of a plain entity type and one of a summary
+    // type
+    TimelineEntity[] entities = new TimelineEntity[2];
+    entities[0] = generateEntity("entity_type");
+    entities[1] = generateEntity("summary_type");
+    try {
+      // If attemptId is null, fall back to the original putEntities call and
+      // save the entities into the configured leveldb store
+      mockEntityClientResponse(client, ClientResponse.Status.OK);
+      client.putEntities(null, null, entities);
+      verify(client, times(1)).putEntities(entities);
+      reset(client);
+
+      // If attemptId is specified but cacheId is null, entities of plain
+      // entity types still fall back to the original putEntities call, while
+      // summary-type entities are written into the FS summary log
+      mockEntityClientResponse(client, ClientResponse.Status.OK);
+      ApplicationAttemptId attemptId1 =
+          ApplicationAttemptId.newInstance(appId, 1);
+      client.putEntities(null, attemptId1, entities);
+      TimelineEntity[] entityTDB = new TimelineEntity[1];
+      entityTDB[0] = entities[0];
+      verify(client, times(1)).putEntities(entityTDB);
+      Assert.assertTrue(localFS.util().exists(
+          new Path(getAppAttemptDir(attemptId1), "summarylog-"
+              + attemptId1.toString())));
+      reset(client);
+
+      // If both cacheId and attemptId are specified, all entities are saved
+      // into the FileSystem instead of the leveldb store
+      ApplicationAttemptId attemptId2 =
+          ApplicationAttemptId.newInstance(appId, 2);
+      client.putEntities(cacheId, attemptId2, entities);
+      verify(client, times(0)).putEntities(any(TimelineEntity[].class));
+      Assert.assertTrue(localFS.util().exists(
+          new Path(getAppAttemptDir(attemptId2), "summarylog-"
+              + attemptId2.toString())));
+      Assert.assertTrue(localFS.util().exists(
+          new Path(getAppAttemptDir(attemptId2), "entitylog-"
+              + cacheId.toString())));
+      reset(client);
+    } catch (Exception e) {
+      Assert.fail("Exception is not expected");
+    }
+  }
+
+  @Test
+  public void testPutDomain() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(appId, 1);
+    try {
+      TimelineDomain domain = generateDomain();
+
+      mockDomainClientResponse(client, ClientResponse.Status.OK);
+      client.putDomain(null, domain);
+      verify(client, times(1)).putDomain(domain);
+      reset(client);
+
+      mockDomainClientResponse(client, ClientResponse.Status.OK);
+      client.putDomain(attemptId1, domain);
+      verify(client, times(0)).putDomain(domain);
+      Assert.assertTrue(localFS.util().exists(
+          new Path(getAppAttemptDir(attemptId1), "domainlog-"
+              + attemptId1.toString())));
+      reset(client);
+    } catch (Exception e) {
+      Assert.fail("Exception is not expected");
+    }
+  }
+
+  private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
+    Path appDir =
+        new Path(localActiveDir.getAbsolutePath(), appAttemptId
+            .getApplicationId().toString());
+    Path attemptDir = new Path(appDir, appAttemptId.toString());
+    return attemptDir;
+  }
+
+  private static TimelineEntity generateEntity(String type) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId("entity id");
+    entity.setEntityType(type);
+    entity.setStartTime(System.currentTimeMillis());
+    return entity;
+  }
+
+  private static TimelineDomain generateDomain() {
+    TimelineDomain domain = new TimelineDomain();
+    domain.setId("namespace id");
+    domain.setDescription("domain description");
+    domain.setOwner("domain owner");
+    domain.setReaders("domain_reader");
+    domain.setWriters("domain_writer");
+    domain.setCreatedTime(0L);
+    domain.setModifiedTime(1L);
+    return domain;
+  }
+
+  private static ClientResponse mockEntityClientResponse(
+      TimelineClientImpl client, ClientResponse.Status status) {
+    ClientResponse response = mock(ClientResponse.class);
+    doReturn(response).when(client).doPostingObject(
+        any(TimelineEntities.class), any(String.class));
+    when(response.getClientResponseStatus()).thenReturn(status);
+    return response;
+  }
+
+  private static ClientResponse mockDomainClientResponse(
+      TimelineClientImpl client, ClientResponse.Status status) {
+    ClientResponse response = mock(ClientResponse.class);
+    doReturn(response).when(client).doPostingObject(any(TimelineDomain.class),
+        any(String.class));
+    when(response.getClientResponseStatus()).thenReturn(status);
+    return response;
+  }
+
+  private static TimelineClientImpl
+      createTimelineClient(YarnConfiguration conf) {
+    TimelineClientImpl client =
+        spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+    client.init(conf);
+    client.start();
+    return client;
+  }
+}
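
Reviewer note: one design point in LogFDsCache worth highlighting is that writers never take a map-wide lock. Each FD is locked and then re-validated against the map, because the TTL cleaner may close and remove an FD between the get() and the lock(). A distilled form of that pattern, with simplified context (not the literal patch code):

    EntityLogFD fd = logFDs.get(attemptId);     // logFDs: ConcurrentHashMap
    if (fd != null) {
      fd.lock();
      try {
        if (logFDs.containsValue(fd)) {         // still live after locking?
          fd.writeEntities(entities);           // yes: safe to write
        } else {
          createFDAndWrite(fs, logPath, objMapper, attemptId, entities,
              isAppendSupported, logFDs);       // lost the race: recreate
        }
      } finally {
        fd.unlock();
      }
    } else {
      createFDAndWrite(fs, logPath, objMapper, attemptId, entities,
          isAppendSupported, logFDs);
    }

The containsValue() re-check after acquiring the lock is what makes the TTL cleaner safe; without it a writer could append to a stream the cleaner had already closed.
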