diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
new file mode 100644
index 0000000..972c98b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CacheId.java
@@ -0,0 +1,123 @@
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Splitter;
+
+
+@Public
+@Unstable
+public abstract class CacheId implements Comparable<CacheId> {
+
+  private static final Splitter _SPLITTER = Splitter.on('_').trimResults();
+
+  @Private
+  @Unstable
+  public static final String cacheIdStrPrefix = "cache";
+
+  @Private
+  @Unstable
+  public static CacheId newInstance(ApplicationId applicationId,
+      String id) {
+    CacheId cacheId = Records.newRecord(CacheId.class);
+    cacheId.setApplicationId(applicationId);
+    cacheId.setCacheId(id);
+    cacheId.build();
+    return cacheId;
+  }
+
+  /**
+   * Get the ApplicationId of the CacheId.
+   * @return ApplicationId of the CacheId
+   */
+  @Public
+  @Stable
+  public abstract ApplicationId getApplicationId();
+
+  @Private
+  @Unstable
+  protected abstract void setApplicationId(ApplicationId appID);
+
+  /**
+   * Get the cache id.
+   * @return cache id
+   */
+  @Public
+  @Stable
+  public abstract String getCacheId();
+
+  @Private
+  @Unstable
+  protected abstract void setCacheId(String cacheId);
+
+  @Override
+  public int hashCode() {
+    int result = getCacheId().hashCode();
+    result = 31 * result + getApplicationId().hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CacheId other = (CacheId) obj;
+    if (!this.getApplicationId().equals(other.getApplicationId()))
+      return false;
+    if (!this.getCacheId().equals(other.getCacheId())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(CacheId other) {
+    int compareAppIds = this.getApplicationId().compareTo(
+        other.getApplicationId());
+    if (compareAppIds == 0) {
+      return this.getCacheId().compareTo(other.getCacheId());
+    } else {
+      return compareAppIds;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(cacheIdStrPrefix + "_");
+    ApplicationId appId = getApplicationId();
+    sb.append(appId.getClusterTimestamp()).append("_");
+    sb.append(appId.getId()).append("_");
+    sb.append(getCacheId());
+    return sb.toString();
+  }
+
+  public static CacheId fromString(String cacheIdStr) {
+    Iterator<String> it = _SPLITTER.split(cacheIdStr).iterator();
+    if (!it.next().equals(cacheIdStrPrefix)) {
+      throw new IllegalArgumentException("Invalid CacheId prefix: "
+          + cacheIdStr);
+    }
+    ApplicationId appId =
+        ApplicationId.newInstance(Long.parseLong(it.next()),
+            Integer.parseInt(it.next()));
+    String id = it.next();
+    while (it.hasNext()) {
+      id += "_" + it.next();
+    }
+    return CacheId.newInstance(appId, id);
+  }
+
+  protected abstract void build();
+
+}
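A minimal usage sketch of the string round-trip defined above (illustration only, not part of the patch; the id "map_phase" and the timestamp are made-up values, and newInstance() assumes the PB record factory resolves CacheId to its PB implementation):

    ApplicationId appId = ApplicationId.newInstance(1428937383274L, 42);
    CacheId cacheId = CacheId.newInstance(appId, "map_phase");
    String str = cacheId.toString();  // "cache_1428937383274_42_map_phase"
    // fromString() re-joins '_' characters inside the trailing id segment,
    // so ids containing underscores survive the round trip
    CacheId parsed = CacheId.fromString(str);
    assert parsed.equals(cacheId) && parsed.compareTo(cacheId) == 0;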
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5176477..fffb588 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1393,6 +1393,21 @@ private static void addDeprecatedKeys() {
   public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
       TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
 
+  /**
+   * Settings for the timeline service plugin.
+   */
+  public static final String TIMELINE_SERVICE_PLUGIN_ENABLED =
+      TIMELINE_SERVICE_PREFIX + "plugin.enabled";
+  public static final boolean DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED = false;
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.active-dir";
+  public static final String TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT =
+      "/tmp/entity-file-history/active";
+
+  public static final String TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES =
+      TIMELINE_SERVICE_PREFIX + "entity-file-store.summary-entity-types";
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 1e32a84..64e24e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -53,6 +53,11 @@ message ContainerIdProto {
   optional int64 id = 3;
 }
 
+message CacheIdProto {
+  optional ApplicationIdProto app_id = 1;
+  optional string id = 2;
+}
+
 message ResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
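A hedged sketch of wiring up the three new keys from client code (the summary entity type values are placeholders — any application-defined types work — and getStringCollection() in serviceInit() below splits the comma-separated list):

    Configuration conf = new YarnConfiguration();
    // enable the entity-file plugin path in TimelineClientImpl
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED, true);
    // serviceInit() fails fast if this directory does not already exist
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
        "/tmp/entity-file-history/active");
    // comma-separated entity types routed to the per-attempt summary log
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES,
        "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT");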
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
new file mode 100644
index 0000000..904e741
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CacheIdPBImpl.java
@@ -0,0 +1,73 @@
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.CacheId;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CacheIdProto;
+
+import com.google.common.base.Preconditions;
+
+@Private
+@Unstable
+public class CacheIdPBImpl extends CacheId {
+  CacheIdProto proto = null;
+  CacheIdProto.Builder builder = null;
+  private ApplicationId applicationId = null;
+
+  public CacheIdPBImpl() {
+    builder = CacheIdProto.newBuilder();
+  }
+
+  public CacheIdPBImpl(CacheIdProto proto) {
+    this.proto = proto;
+    this.applicationId = convertFromProtoFormat(proto.getAppId());
+  }
+
+  public CacheIdProto getProto() {
+    return proto;
+  }
+
+  @Override
+  public String getCacheId() {
+    Preconditions.checkNotNull(proto);
+    return proto.getId();
+  }
+
+  @Override
+  protected void setCacheId(String id) {
+    Preconditions.checkNotNull(builder);
+    builder.setId(id);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+  @Override
+  protected void setApplicationId(ApplicationId appId) {
+    if (appId != null) {
+      Preconditions.checkNotNull(builder);
+      builder.setAppId(convertToProtoFormat(appId));
+    }
+    this.applicationId = appId;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(
+      ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  @Override
+  protected void build() {
+    proto = builder.build();
+    builder = null;
+  }
+}
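The PB record lifecycle assumed above, sketched for clarity (illustrative only; the setters are protected and are normally reached through CacheId.newInstance()):

    CacheIdPBImpl record = new CacheIdPBImpl(); // starts with a live builder
    record.setCacheId("map_phase");             // writes through the builder
    record.build();                             // freezes builder into proto
    record.getCacheId();                        // now reads the immutable proto
    // a later setCacheId(...) would fail Preconditions.checkNotNull(builder)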
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9..1e13625 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@@ -80,6 +82,26 @@ public abstract TimelinePutResponse putEntities(
 
   /**
    * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * server. It is a blocking API. The method will not return until it gets the
+   * response from the timeline server.
+   * </p>
+   *
+   * @param entities
+   *          the collection of {@link TimelineEntity}
+   * @param cacheId {@link CacheId}
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @return the error information if the sent entities are not correctly stored
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract TimelinePutResponse putEntities(CacheId cacheId,
+      ApplicationAttemptId appAttemptId, TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
    * Send the information of a domain to the timeline server. It is a
    * blocking API. The method will not return until it gets the response from
    * the timeline server.
@@ -96,6 +118,23 @@ public abstract void putDomain(
 
   /**
    * <p>
+   * Send the information of a domain to the timeline server. It is a
+   * blocking API. The method will not return until it gets the response from
+   * the timeline server.
+   * </p>
+   *
+   * @param domain
+   *          a {@link TimelineDomain} object
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException;
+
+  /**
+   * <p>
    * Get a delegation token so as to be able to talk to the timeline server in a
    * secure way.
    * </p>
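A hedged client-side sketch of the two new methods (assuming a Configuration conf, an ApplicationAttemptId attemptId, and a CacheId cacheId are already in scope; the entity type and domain id are invented, and with the plugin disabled both calls fall back to the existing REST paths):

    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();

    TimelineDomain domain = new TimelineDomain();
    domain.setId("sandbox_domain");              // hypothetical domain id
    client.putDomain(attemptId, domain);         // -> domainlog-<attemptId>

    TimelineEntity entity = new TimelineEntity();
    entity.setEntityType("MY_FRAMEWORK_ENTITY"); // hypothetical type
    entity.setEntityId("entity_1");
    entity.setStartTime(System.currentTimeMillis());
    client.putEntities(cacheId, attemptId, entity);

    client.stop();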
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 04c84ca..c832146 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -29,7 +29,12 @@
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -45,6 +50,10 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -55,6 +64,8 @@
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.CacheId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -66,7 +77,13 @@
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -111,6 +128,21 @@
   private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
+  private boolean timelineServerPluginEnabled;
+  private Path activePath = null;
+  private FileSystem fs = null;
+  private Set<String> summaryEntityTypes;
+  private ObjectMapper objMapper = null;
+  private JsonFactory jsonFactory = null;
+
+  // App log directory must be readable by group so server can access logs
+  // and writable by group so it can be deleted by server
+  private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+  // Logs must be readable by group so server can access them
+  private static final short FILE_LOG_PERMISSIONS = 0640;
+  private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+  private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+  private static final String ENTITY_LOG_PREFIX = "entitylog-";
 
   @Private
   @VisibleForTesting
@@ -294,6 +326,30 @@ protected void serviceInit(Configuration conf) throws Exception {
           RESOURCE_URI_STR));
     }
     LOG.info("Timeline service address: " + resURI);
+
+    timelineServerPluginEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_PLUGIN_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PLUGIN_ENABLED);
+    if (timelineServerPluginEnabled) {
+      activePath =
+          new Path(conf.get(
+              YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR,
+              YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_ACTIVE_DIR_DEFAULT));
+      fs = activePath.getFileSystem(conf);
+      if (!fs.exists(activePath)) {
+        throw new IOException(activePath + " does not exist");
+      }
+      Collection<String> filterStrings = conf.getStringCollection(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES);
+      if (filterStrings.isEmpty()) {
+        throw new IllegalArgumentException(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_SUMMARY_ENTITY_TYPES
+                + " is not set");
+      }
+      summaryEntityTypes = new HashSet<String>(filterStrings);
+      objMapper = createObjectMapper();
+      jsonFactory = new JsonFactory();
+    }
     super.serviceInit(conf);
   }
 
@@ -659,4 +715,142 @@ private static void printUsage() {
   public UserGroupInformation getUgi() {
     return authUgi;
   }
+
+  @Override
+  public TimelinePutResponse putEntities(CacheId cacheId,
+      ApplicationAttemptId appAttemptId, TimelineEntity... entities)
+      throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || appAttemptId == null) {
+      return putEntities(entities);
+    }
+    List<TimelineEntity> entitiesToDB = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToSummary = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToEntity = new ArrayList<TimelineEntity>();
+    Path attemptDir = createAttemptDir(appAttemptId);
+
+    for (TimelineEntity entity : entities) {
+      if (summaryEntityTypes.contains(entity.getEntityType())) {
+        entitiesToSummary.add(entity);
+      } else {
+        if (cacheId != null) {
+          entitiesToEntity.add(entity);
+        } else {
+          entitiesToDB.add(entity);
+        }
+      }
+    }
+
+    if (!entitiesToSummary.isEmpty()) {
+      Path summaryLogPath =
+          new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
+      LOG.info("Writing summary log for " + appAttemptId.toString() + " to "
+          + summaryLogPath);
+      writeEntities(entitiesToSummary, summaryLogPath);
+    }
+
+    if (!entitiesToEntity.isEmpty()) {
+      Path entityLogPath =
+          new Path(attemptDir, ENTITY_LOG_PREFIX + cacheId.toString());
+      LOG.info("Writing entity log for " + cacheId.toString() + " to "
+          + entityLogPath);
+      writeEntities(entitiesToEntity, entityLogPath);
+    }
+
+    if (!entitiesToDB.isEmpty()) {
+      putEntities(entitiesToDB.toArray(
+          new TimelineEntity[entitiesToDB.size()]));
+    }
+
+    return new TimelinePutResponse();
+  }
+
+  private void writeEntities(List<TimelineEntity> entities, Path logPath)
+      throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      out = createLogFile(logPath);
+      JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(out);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      for (TimelineEntity entity : entities) {
+        objMapper.writeValue(jsonGenerator, entity);
+      }
+      jsonGenerator.close();
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Override
+  public void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException {
+    if (!timelineServerPluginEnabled || appAttemptId == null) {
+      putDomain(domain);
+    } else {
+      writeDomain(appAttemptId, domain);
+    }
+  }
+
+  private ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(Inclusion.NON_NULL);
+    mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path appDir =
+        new Path(activePath, appAttemptId.getApplicationId().toString());
+    if (!fs.exists(appDir)) {
+      FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+    }
+
+    Path attemptDir = new Path(appDir, appAttemptId.toString());
+    if (!fs.exists(attemptDir)) {
+      FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+          APP_LOG_DIR_PERMISSIONS));
+    }
+    return attemptDir;
+  }
+
+  private void writeDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException {
+    Path domainLogPath =
+        new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+            + appAttemptId.toString());
+    LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+        + domainLogPath);
+    FSDataOutputStream out = null;
+    try {
+      out = createLogFile(domainLogPath);
+      JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(out);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      objMapper.writeValue(jsonGenerator, domain);
+      jsonGenerator.close();
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  private FSDataOutputStream createLogFile(Path logPath)
+      throws IOException {
+    FSDataOutputStream stream;
+    if (!fs.exists(logPath)) {
+      stream = fs.create(logPath, false);
+      fs.setPermission(logPath, new FsPermission(FILE_LOG_PERMISSIONS));
+    } else {
+      stream = fs.append(logPath);
+    }
+    return stream;
+  }
 }
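For orientation, with the default active dir the write paths above produce a layout like the following (names derived from the log-file prefixes and the CacheId/attempt toString() formats; the ids are illustrative):

    /tmp/entity-file-history/active/
      application_1428937383274_0042/
        appattempt_1428937383274_0042_000001/
          domainlog-appattempt_1428937383274_0042_000001
          summarylog-appattempt_1428937383274_0042_000001
          entitylog-cache_1428937383274_42_map_phase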