diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
new file mode 100644
index 0000000..dbd471e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
@@ -0,0 +1,127 @@
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Splitter;
+
+/**
+ * <code>CacheId</code> identifies a group of timeline entities that is
+ * written to the same set of entity log files. It consists of the
+ * {@link ApplicationAttemptId} that produced the entities and a
+ * <code>long</code> identifier.
+ */
+@Public
+@Unstable
+public abstract class CacheId implements Comparable<CacheId> {
+
+  private static final Splitter _SPLITTER = Splitter.on('_').trimResults();
+
+  @Private
+  @Unstable
+  public static final String cacheIdStrPrefix = "cache";
+
+  @Private
+  @Unstable
+  public static CacheId newInstance(ApplicationAttemptId attemptId,
+      long id) {
+    CacheId cacheId = Records.newRecord(CacheId.class);
+    cacheId.setApplicationAttemptId(attemptId);
+    cacheId.setCacheId(id);
+    cacheId.build();
+    return cacheId;
+  }
+
+  /**
+   * Get the ApplicationAttemptId of the CacheId.
+   * @return ApplicationAttemptId of the CacheId
+   */
+  @Public
+  @Stable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Private
+  @Unstable
+  protected abstract void setApplicationAttemptId(ApplicationAttemptId appID);
+
+  /**
+   * Get the cache id.
+   * @return the cache id
+   */
+  @Public
+  @Stable
+  public abstract long getCacheId();
+
+  @Private
+  @Unstable
+  protected abstract void setCacheId(long cacheId);
+
+  @Override
+  public int hashCode() {
+    int result = (int) (getCacheId() ^ (getCacheId() >>> 32));
+    result = 31 * result + getApplicationAttemptId().hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CacheId other = (CacheId) obj;
+    if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
+      return false;
+    if (this.getCacheId() != other.getCacheId())
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(CacheId other) {
+    int compareAppAttemptIds = this.getApplicationAttemptId().compareTo(
+        other.getApplicationAttemptId());
+    if (compareAppAttemptIds == 0) {
+      return Long.valueOf(this.getCacheId()).compareTo(
+          Long.valueOf(other.getCacheId()));
+    } else {
+      return compareAppAttemptIds;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(cacheIdStrPrefix + "_");
+    ApplicationId appId = getApplicationAttemptId().getApplicationId();
+    sb.append(appId.getClusterTimestamp()).append("_");
+    sb.append(appId.getId()).append("_");
+    sb.append(getApplicationAttemptId().getAttemptId()).append("_");
+    sb.append(getCacheId());
+    return sb.toString();
+  }
+
+  public static CacheId fromString(String cacheIdStr) {
+    Iterator<String> it = _SPLITTER.split(cacheIdStr).iterator();
+    if (!it.next().equals(cacheIdStrPrefix)) {
+      throw new IllegalArgumentException("Invalid CacheId prefix: "
+          + cacheIdStr);
+    }
+    ApplicationId appId =
+        ApplicationId.newInstance(Long.parseLong(it.next()),
+            Integer.parseInt(it.next()));
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next()));
+    return CacheId.newInstance(appAttemptId, Long.parseLong(it.next()));
+  }
+
+  protected abstract void build();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 93b81a6..89e2e6a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1468,6 +1468,21 @@ private static void addDeprecatedKeys() {
   public static final String TIMELINE_SERVICE_PREFIX =
       YARN_PREFIX + "timeline-service.";
 
+  /**
+   * Whether the entity-file timeline plugin is enabled.
+   */
+  public static final String TIMELINE_SERVICE_PLUGIN_ENABLED =
+      TIMELINE_SERVICE_PREFIX + "plugin.enabled";
+  public static final boolean DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED = false;
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.active-dir";
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT =
+      "/tmp/entity-file-history/active";
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.summary-entity-types";
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 687ee89..776b7a3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -53,6 +53,11 @@ message ContainerIdProto {
   optional int64 id = 3;
 }
 
+message CacheIdProto {
+  optional ApplicationAttemptIdProto app_attempt_id = 1;
+  optional int64 id = 2;
+}
+
 message ResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
new file mode 100644
index 0000000..e97bea2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
@@ -0,0 +1,74 @@
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.CacheId;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CacheIdProto;
+
+import com.google.common.base.Preconditions;
+
+@Private
+@Unstable
+public class CacheIdPBImpl extends CacheId {
+  CacheIdProto proto = null;
+  CacheIdProto.Builder builder = null;
+  private ApplicationAttemptId applicationAttemptId = null;
+
+  public CacheIdPBImpl() {
+    builder = CacheIdProto.newBuilder();
+  }
+
+  public CacheIdPBImpl(CacheIdProto proto) {
+    this.proto = proto;
+    this.applicationAttemptId =
+        convertFromProtoFormat(proto.getAppAttemptId());
+  }
+
+  public CacheIdProto getProto() {
+    return proto;
+  }
+
+  @Override
+  public long getCacheId() {
+    Preconditions.checkNotNull(proto);
+    return proto.getId();
+  }
+
+  @Override
+  protected void setCacheId(long id) {
+    Preconditions.checkNotNull(builder);
+    builder.setId(id);
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.applicationAttemptId;
+  }
+
+  @Override
+  protected void setApplicationAttemptId(ApplicationAttemptId atId) {
+    if (atId != null) {
+      Preconditions.checkNotNull(builder);
+      builder.setAppAttemptId(convertToProtoFormat(atId));
+    }
+    this.applicationAttemptId = atId;
+  }
+
+  private ApplicationAttemptIdPBImpl convertFromProtoFormat(
+      ApplicationAttemptIdProto p) {
+    return new ApplicationAttemptIdPBImpl(p);
+  }
+
+  private ApplicationAttemptIdProto convertToProtoFormat(
+      ApplicationAttemptId t) {
+    return ((ApplicationAttemptIdPBImpl) t).getProto();
+  }
+
+  @Override
+  protected void build() {
+    proto = builder.build();
+    builder = null;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9..e30ce13 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@@ -78,6 +79,27 @@ protected TimelineClient(String name) {
   public abstract TimelinePutResponse putEntities(
       TimelineEntity... entities) throws IOException, YarnException;
 
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * server. It is a blocking API. When the entity-file plugin is enabled and
+   * a {@link CacheId} is supplied, the entities are written to the
+   * filesystem instead of being posted to the timeline web service.
+   * </p>
+   *
+   * @param cacheId
+   *          the {@link CacheId} the entities belong to
+   * @param entities
+   *          the collection of {@link TimelineEntity}
+   * @return the error information if the sent entities are not correctly
+   *         stored
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract TimelinePutResponse putEntities(CacheId cacheId,
+      TimelineEntity... entities) throws IOException, YarnException;
+
   /**
    * <p>
    * Send the information of a domain to the timeline server. It is a
@@ -96,6 +100,24 @@ public abstract void putDomain(
 
   /**
    * <p>
+   * Send the information of a domain to the timeline server. It is a
+   * blocking API. The method will not return until it gets the response from
+   * the timeline server.
+   * </p>
+   *
+   * @param cacheId
+   *          the {@link CacheId} the domain belongs to
+   * @param domain
+   *          a {@link TimelineDomain} object
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putDomain(CacheId cacheId,
+      TimelineDomain domain) throws IOException, YarnException;
+
+  /**
+   * <p>
    * Get a delegation token so as to be able to talk to the timeline server in a
    * secure way.
    * </p>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 04c84ca..0ef8a6a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -30,6 +30,9 @@
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -45,6 +48,10 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -55,6 +62,7 @@
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -66,7 +74,13 @@
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -111,6 +125,21 @@
   private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
+  private boolean timelineServerPluginEnabled;
+  private Path activePath = null;
+  private FileSystem fs = null;
+  private Set<String> summaryEntityTypes;
+  private ObjectMapper objMapper = null;
+  private JsonFactory jsonFactory = null;
+
+  // App log directory must be readable by group so server can access logs
+  // and writable by group so it can be deleted by server
+  private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+  // Logs must be readable by group so server can access them
+  private static final short FILE_LOG_PERMISSIONS = 0640;
+  private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+  private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+  private static final String ENTITY_LOG_PREFIX = "entitylog-";
 
   @Private
   @VisibleForTesting
@@ -294,6 +323,31 @@ protected void serviceInit(Configuration conf) throws Exception {
           RESOURCE_URI_STR));
     }
     LOG.info("Timeline service address: " + resURI);
+
+    timelineServerPluginEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED);
+    if (timelineServerPluginEnabled) {
+      // Resolve the active directory from the configuration before looking
+      // up the filesystem.
+      activePath = new Path(conf.get(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT));
+      fs = activePath.getFileSystem(conf);
+      if (!fs.exists(activePath)) {
+        throw new IOException(activePath + " does not exist");
+      }
+      Collection<String> filterStrings = conf.getStringCollection(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES);
+      if (filterStrings.isEmpty()) {
+        throw new IllegalArgumentException(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES +
+            " is not set");
+      }
+      summaryEntityTypes = new HashSet<String>(filterStrings);
+      objMapper = createObjectMapper();
+      jsonFactory = new JsonFactory();
+    }
     super.serviceInit(conf);
   }
 
@@ -659,4 +713,116 @@ private static void printUsage() {
 
   public UserGroupInformation getUgi() {
     return authUgi;
   }
+
+  @Override
+  public TimelinePutResponse putEntities(CacheId cacheId,
+      TimelineEntity... entities) throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || cacheId == null) {
+      return putEntities(entities);
+    }
+    // The cache directory is per CacheId, so create it once rather than
+    // once per entity.
+    Path cacheDir = createCacheDir(cacheId);
+    for (TimelineEntity entity : entities) {
+      Path entityLogPath;
+      if (summaryEntityTypes.contains(entity.getEntityType())) {
+        entityLogPath =
+            new Path(cacheDir, SUMMARY_LOG_PREFIX + cacheId.toString());
+        LOG.info("Writing summary log for " + cacheId.toString() + " to "
+            + entityLogPath);
+      } else {
+        entityLogPath =
+            new Path(cacheDir, ENTITY_LOG_PREFIX + cacheId.toString());
+        LOG.info("Writing entity log for " + cacheId.toString() + " to "
+            + entityLogPath);
+      }
+      writeEntity(entity, entityLogPath);
+    }
+    return new TimelinePutResponse();
+  }
+
+  private void writeEntity(TimelineEntity entity, Path logPath)
+      throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      out = createLogFile(logPath);
+      JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(out);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      objMapper.writeValue(jsonGenerator, entity);
+      jsonGenerator.close();
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Override
+  public void putDomain(CacheId cacheId, TimelineDomain domain)
+      throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || cacheId == null) {
+      putDomain(domain);
+    } else {
+      writeDomain(cacheId, domain);
+    }
+  }
+
+  private ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(Inclusion.NON_NULL);
+    mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  private Path createCacheDir(CacheId cacheId) throws IOException {
+    Path appDir =
+        new Path(activePath, cacheId.getApplicationAttemptId()
+            .getApplicationId().toString());
+    if (!fs.exists(appDir)) {
+      FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+    }
+    Path cacheDir = new Path(appDir, cacheId.toString());
+    if (!fs.exists(cacheDir)) {
+      FileSystem
+          .mkdirs(fs, cacheDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+    }
+    return cacheDir;
+  }
+
+  private void writeDomain(CacheId cacheId,
+      TimelineDomain domain) throws IOException {
+    Path domainLogPath =
+        new Path(createCacheDir(cacheId), DOMAIN_LOG_PREFIX
+            + cacheId.toString());
+    LOG.info("Writing domains for " + cacheId.toString() + " to "
+        + domainLogPath);
+    FSDataOutputStream out = null;
+    try {
+      out = createLogFile(domainLogPath);
+      JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(out);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      objMapper.writeValue(jsonGenerator, domain);
+      jsonGenerator.close();
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  private FSDataOutputStream createLogFile(Path logPath)
+      throws IOException {
+    FSDataOutputStream stream;
+    if (!fs.exists(logPath)) {
+      stream = fs.create(logPath, false);
+      fs.setPermission(logPath, new FsPermission(FILE_LOG_PERMISSIONS));
+    } else {
+      stream = fs.append(logPath);
+    }
+    return stream;
+  }
 }