diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java
new file mode 100644
index 0000000..984e6e2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java
@@ -0,0 +1,163 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records.timeline;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import com.google.common.base.Splitter;
+
+/**
+ *
+ * TimelineEntityGroupId is an abstract way for
+ * timeline service users to represent a group of related timeline data.
+ * For example, all entities that represent one data flow DAG execution
+ * can be grouped into one timeline entity group.
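+ * <p>
+ * The canonical string form produced by {@link #toString()} is
+ * "timelineEntityGroupId_&lt;clusterTimestamp&gt;_&lt;appSeqNumber&gt;_&lt;groupId&gt;",
+ * for example "timelineEntityGroupId_1234_1_1", and
+ * {@link #fromString(String)} parses that form back into an instance.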
+ */
+@Public
+@Unstable
+public class TimelineEntityGroupId implements
+    Comparable<TimelineEntityGroupId> {
+
+ private static final Splitter SPLITTER = Splitter.on('_').trimResults();
+
+ private ApplicationId applicationId;
+ private String id;
+
+ @Private
+ @Unstable
+ public static final String TIMELINE_ENTITY_GROUPID_STR_PREFIX =
+ "timelineEntityGroupId";
+
+ public TimelineEntityGroupId() {
+
+ }
+
+ public static TimelineEntityGroupId newInstance(ApplicationId applicationId,
+ String id) {
+ TimelineEntityGroupId timelineEntityGroupId =
+ new TimelineEntityGroupId();
+ timelineEntityGroupId.setApplicationId(applicationId);
+ timelineEntityGroupId.setTimelineEntityGroupId(id);
+ return timelineEntityGroupId;
+ }
+
+ /**
+ * Get the ApplicationId of the
+ * TimelineEntityGroupId.
+ *
+ * @return ApplicationId of the
+ * TimelineEntityGroupId
+ */
+ public ApplicationId getApplicationId() {
+ return this.applicationId;
+ }
+
+ public void setApplicationId(ApplicationId appID) {
+ this.applicationId = appID;
+ }
+
+ /**
+ * Get the timelineEntityGroupId.
+ *
+ * @return timelineEntityGroupId
+ */
+ public String getTimelineEntityGroupId() {
+ return this.id;
+ }
+
+ @Private
+ @Unstable
+ protected void setTimelineEntityGroupId(String timelineEntityGroupId) {
+ this.id = timelineEntityGroupId;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getTimelineEntityGroupId().hashCode();
+ result = 31 * result + getApplicationId().hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ TimelineEntityGroupId otherObject = (TimelineEntityGroupId) obj;
+ if (!this.getApplicationId().equals(otherObject.getApplicationId())) {
+ return false;
+ }
+ if (!this.getTimelineEntityGroupId().equals(
+ otherObject.getTimelineEntityGroupId())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int compareTo(TimelineEntityGroupId other) {
+ int compareAppIds =
+ this.getApplicationId().compareTo(other.getApplicationId());
+ if (compareAppIds == 0) {
+ return this.getTimelineEntityGroupId().compareTo(
+ other.getTimelineEntityGroupId());
+ } else {
+ return compareAppIds;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(TIMELINE_ENTITY_GROUPID_STR_PREFIX + "_");
+ ApplicationId appId = getApplicationId();
+ sb.append(appId.getClusterTimestamp()).append("_");
+ sb.append(appId.getId()).append("_");
+ sb.append(getTimelineEntityGroupId());
+ return sb.toString();
+ }
+
+ public static TimelineEntityGroupId
+ fromString(String timelineEntityGroupIdStr) {
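+ // Expected form:
+ // timelineEntityGroupId_<clusterTimestamp>_<appSeqNumber>_<groupId>.
+ // The group id itself may contain '_', so all remaining tokens are
+ // rejoined with '_' after the application id fields are consumed.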
+ StringBuffer buf = new StringBuffer();
+ Iterator<String> it = SPLITTER.split(timelineEntityGroupIdStr).iterator();
+ if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) {
+ throw new IllegalArgumentException(
+ "Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr);
+ }
+ ApplicationId appId =
+ ApplicationId.newInstance(Long.parseLong(it.next()),
+ Integer.parseInt(it.next()));
+ buf.append(it.next());
+ while (it.hasNext()) {
+ buf.append("_");
+ buf.append(it.next());
+ }
+ return TimelineEntityGroupId.newInstance(appId, buf.toString());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1f50d9b..66a8110 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1581,6 +1581,10 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-web-path.";
+ /** Timeline client settings. */
+ public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "client.";
+
/**
* Path to war file or static content directory for this UI
* (For pluggable UIs).
@@ -1588,6 +1592,45 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
+ /**
+ * Settings for timeline service v1.5.
+ */
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "entity-group-fs-store.";
+
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir";
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
+ "/tmp/entity-file-history/active";
+
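+ /**
+ * Retry policy spec passed down to the DFS client for entity file
+ * operations, as pairs of (sleep-time in ms, number of retries); the
+ * default "2000, 500" retries every 2 seconds up to 500 times.
+ */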
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+ public static final String
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ "2000, 500";
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
+ public static final long
+ TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs";
+ public static final long
+ TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs";
+ public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
+ 5*60;
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
@@ -1628,8 +1671,8 @@ private static void addDeprecatedKeys() {
public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
@Private
- public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
- "none";
+ public static final String
+ DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none";
/** The setting that controls whether timeline service is enabled or not. */
public static final String TIMELINE_SERVICE_ENABLED =
@@ -1678,7 +1721,7 @@ private static void addDeprecatedKeys() {
APPLICATION_HISTORY_PREFIX + "max-applications";
public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000;
- /** Timeline service store class */
+ /** Timeline service store class. */
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
@@ -1791,10 +1834,6 @@ private static void addDeprecatedKeys() {
public static final boolean
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
- /** Timeline client settings */
- public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
- TIMELINE_SERVICE_PREFIX + "client.";
-
/** Timeline client call, max retries (-1 means no limit) */
public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES =
TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9..258b9f5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,8 +26,10 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -80,6 +82,28 @@ public abstract TimelinePutResponse putEntities(
/**
*
+ * Send the information of a number of conceptual entities to the timeline
+ * server. It is a blocking API. The method will not return until it gets the
+ * response from the timeline server.
+ *
+ * This API is only for timeline service v1.5.
+ *
+ *
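+ * A sketch of typical usage (the group name below is illustrative):
+ * <pre>
+ * TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+ *     appAttemptId.getApplicationId(), "dag_1");
+ * client.putEntities(appAttemptId, groupId, entity);
+ * </pre>
+ *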
+ * @param appAttemptId {@link ApplicationAttemptId}
+ * @param groupId {@link TimelineEntityGroupId}
+ * @param entities
+ * the collection of {@link TimelineEntity}
+ * @return the error information if the sent entities are not correctly stored
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException;
+
+ /**
+ *
* Send the information of a domain to the timeline server. It is a
* blocking API. The method will not return until it gets the response from
* the timeline server.
@@ -96,6 +120,25 @@ public abstract void putDomain(
/**
*
+ * Send the information of a domain to the timeline server. It is a
+ * blocking API. The method will not return until it gets the response from
+ * the timeline server.
+ *
+ * This API is only for timeline service v1.5.
+ *
+ *
+ * @param domain
+ * a {@link TimelineDomain} object
+ * @param appAttemptId {@link ApplicationAttemptId}
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException;
+
+ /**
+ *
* Get a delegation token so as to be able to talk to the timeline server in a
* secure way.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
new file mode 100644
index 0000000..abc2a28
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer class that posts the timeline data directly to the
+ * timeline server.
+ */
+@Private
+@Unstable
+public class DirectTimelineWriter extends TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(DirectTimelineWriter.class);
+
+ public DirectTimelineWriter(UserGroupInformation authUgi,
+ Client client, URI resURI) {
+ super(authUgi, client, resURI);
+ }
+
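+ // The group-aware v1.5 operations below are only supported by the file
+ // system based writer; a direct POST to the server has no notion of
+ // entity groups, so they are rejected here.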
+ @Override
+ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, TimelineEntity... entities)
+ throws IOException, YarnException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ throw new IOException("Not supported");
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
new file mode 100644
index 0000000..1c295e1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer class for storing Timeline data in any storage that
+ * implements a basic FileSystem interface.
+ * This writer is used for ATSv1.5.
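+ * <p>
+ * Data is written under the configured active directory as
+ * activeDir/appId/attemptId/domainlog-*, summarylog-* and entitylog-*
+ * files, the last named after the {@link TimelineEntityGroupId} they
+ * belong to.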
+ */
+@Private
+@Unstable
+public class FileSystemTimelineWriter extends TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemTimelineWriter.class);
+
+ // This is a temporary solution. The configuration will be deleted once we
+ // have the FileSystem API to check whether the append operation is
+ // supported or not.
+ private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ = YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ + "entity-file.fs-support-append";
+
+ // App log directory must be readable by group so the server can access
+ // logs, and writable by group so the server can delete it
+ private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+ // Logs must be readable by group so server can access them
+ private static final short FILE_LOG_PERMISSIONS = 0640;
+ private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+ private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+ private static final String ENTITY_LOG_PREFIX = "entitylog-";
+
+ private Path activePath = null;
+ private FileSystem fs = null;
+ private Set<String> summaryEntityTypes;
+ private ObjectMapper objMapper = null;
+ private long flushIntervalSecs;
+ private long cleanIntervalSecs;
+ private long ttl;
+ private LogFDsCache logFDsCache = null;
+ private boolean isAppendSupported;
+
+ public FileSystemTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ super(authUgi, client, resURI);
+
+ Configuration fsConf = new Configuration(conf);
+ fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+ String retryPolicy =
+ fsConf.get(YarnConfiguration.
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
+ YarnConfiguration.
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
+ fsConf.set("dfs.client.retry.policy.spec", retryPolicy);
+
+ activePath = new Path(fsConf.get(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+
+ String scheme = activePath.toUri().getScheme();
+ if (scheme == null) {
+ scheme = FileSystem.getDefaultUri(fsConf).getScheme();
+ }
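+ // Disable the FileSystem cache for this scheme so the writer gets a
+ // dedicated FileSystem instance that honors the retry settings above.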
+ if (scheme != null) {
+ String disableCacheName = String.format("fs.%s.impl.disable.cache",
+ scheme);
+ fsConf.setBoolean(disableCacheName, true);
+ }
+
+ fs = activePath.getFileSystem(fsConf);
+ if (!fs.exists(activePath)) {
+ throw new IOException(activePath + " does not exist");
+ }
+
+ summaryEntityTypes = new HashSet<String>(
+ conf.getStringCollection(YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES));
+
+ flushIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
+
+ cleanIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT);
+
+ ttl = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
+
+ logFDsCache =
+ new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
+
+ this.isAppendSupported =
+ conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+
+ objMapper = createObjectMapper();
+
+ if (LOG.isDebugEnabled()) {
+ StringBuilder debugMSG = new StringBuilder();
+ debugMSG.append(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS
+ + "=" + flushIntervalSecs + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS
+ + "=" + cleanIntervalSecs + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+ + "=" + ttl + ", " +
+ TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ + "=" + isAppendSupported + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ + "=" + activePath);
+
+ if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) {
+ debugMSG.append(", " + YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES
+ + " = " + summaryEntityTypes);
+ }
+ LOG.debug(debugMSG.toString());
+ }
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException {
+ if (appAttemptId == null) {
+ return putEntities(entities);
+ }
+
+ List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>();
+ List<TimelineEntity> entitiesToSummaryCache
+     = new ArrayList<TimelineEntity>();
+ List<TimelineEntity> entitiesToEntityCache
+     = new ArrayList<TimelineEntity>();
+ Path attemptDir = createAttemptDir(appAttemptId);
+
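+ // Route each entity: configured summary types go to the summary log,
+ // entities with a group id go to the per-group entity log, and anything
+ // else falls back to the direct (v1) store.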
+ for (TimelineEntity entity : entities) {
+ if (summaryEntityTypes.contains(entity.getEntityType())) {
+ entitiesToSummaryCache.add(entity);
+ } else {
+ if (groupId != null) {
+ entitiesToEntityCache.add(entity);
+ } else {
+ entitiesToDBStore.add(entity);
+ }
+ }
+ }
+
+ if (!entitiesToSummaryCache.isEmpty()) {
+ Path summaryLogPath =
+ new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing summary log for " + appAttemptId.toString() + " to "
+ + summaryLogPath);
+ }
+ this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper,
+ appAttemptId, entitiesToSummaryCache, isAppendSupported);
+ }
+
+ if (!entitiesToEntityCache.isEmpty()) {
+ Path entityLogPath =
+ new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing entity log for " + groupId.toString() + " to "
+ + entityLogPath);
+ }
+ this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
+ appAttemptId, groupId, entitiesToEntityCache, isAppendSupported);
+ }
+
+ if (!entitiesToDBStore.isEmpty()) {
+ putEntities(entitiesToDBStore.toArray(
+ new TimelineEntity[entitiesToDBStore.size()]));
+ }
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ if (appAttemptId == null) {
+ putDomain(domain);
+ } else {
+ writeDomain(appAttemptId, domain);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.logFDsCache != null) {
+ this.logFDsCache.close();
+ }
+ }
+
+ private ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+ mapper.setSerializationInclusion(Inclusion.NON_NULL);
+ mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+ return mapper;
+ }
+
+ private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+
+ Path attemptDir = new Path(appDir, appAttemptId.toString());
+ if (!fs.exists(attemptDir)) {
+ FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+ APP_LOG_DIR_PERMISSIONS));
+ }
+ return attemptDir;
+ }
+
+ private Path createApplicationDir(ApplicationId appId) throws IOException {
+ Path appDir =
+ new Path(activePath, appId.toString());
+ if (!fs.exists(appDir)) {
+ FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+ }
+ return appDir;
+ }
+
+ private void writeDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException {
+ Path domainLogPath =
+ new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+ + appAttemptId.toString());
+ LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+ + domainLogPath);
+ this.logFDsCache.writeDomainLog(
+ fs, domainLogPath, objMapper, domain, isAppendSupported);
+ }
+
+ private static class DomainLogFD extends LogFD {
+ public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ super(fs, logPath, objMapper, isAppendSupported);
+ }
+
+ public void writeDomain(TimelineDomain domain)
+ throws IOException {
+ getObjectMapper().writeValue(getJsonGenerator(), domain);
+ updateLastModifiedTime(System.currentTimeMillis());
+ }
+ }
+
+ private static class EntityLogFD extends LogFD {
+ public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ super(fs, logPath, objMapper, isAppendSupported);
+ }
+
+ public void writeEntities(List<TimelineEntity> entities)
+ throws IOException {
+ if (writerClosed()) {
+ prepareForWrite();
+ }
+ for (TimelineEntity entity : entities) {
+ getObjectMapper().writeValue(getJsonGenerator(), entity);
+ }
+ updateLastModifiedTime(System.currentTimeMillis());
+ }
+ }
+
+ private static class LogFD {
+ private FSDataOutputStream stream;
+ private ObjectMapper objMapper;
+ private JsonGenerator jsonGenerator;
+ private long lastModifiedTime;
+ private final boolean isAppendSupported;
+ private final ReentrantLock fdLock = new ReentrantLock();
+ private final FileSystem fs;
+ private final Path logPath;
+
+ public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ this.fs = fs;
+ this.logPath = logPath;
+ this.isAppendSupported = isAppendSupported;
+ this.objMapper = objMapper;
+ prepareForWrite();
+ }
+
+ public void close() {
+ if (stream != null) {
+ IOUtils.cleanup(LOG, jsonGenerator);
+ IOUtils.cleanup(LOG, stream);
+ stream = null;
+ jsonGenerator = null;
+ }
+ }
+
+ public void flush() throws IOException {
+ if (stream != null) {
+ stream.hflush();
+ }
+ }
+
+ public long getLastModifiedTime() {
+ return this.lastModifiedTime;
+ }
+
+ protected void prepareForWrite() throws IOException {
+ this.stream = createLogFileStream(fs, logPath);
+ this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
+ this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+ this.lastModifiedTime = System.currentTimeMillis();
+ }
+
+ protected boolean writerClosed() {
+ return stream == null;
+ }
+
+ private FSDataOutputStream createLogFileStream(FileSystem fileSystem,
+ Path logPathToCreate)
+ throws IOException {
+ FSDataOutputStream streamToCreate;
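+ // Without append support, suffix the log name with the current time so a
+ // fresh file is created instead of appending to a possibly closed one.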
+ if (!isAppendSupported) {
+ logPathToCreate =
+ new Path(logPathToCreate.getParent(),
+ (logPathToCreate.getName() + "_" + System.currentTimeMillis()));
+ }
+ if (!fileSystem.exists(logPathToCreate)) {
+ streamToCreate = fileSystem.create(logPathToCreate, false);
+ fileSystem.setPermission(logPathToCreate,
+ new FsPermission(FILE_LOG_PERMISSIONS));
+ } else {
+ streamToCreate = fileSystem.append(logPathToCreate);
+ }
+ return streamToCreate;
+ }
+
+ public void lock() {
+ this.fdLock.lock();
+ }
+
+ public void unlock() {
+ this.fdLock.unlock();
+ }
+
+ protected JsonGenerator getJsonGenerator() {
+ return jsonGenerator;
+ }
+
+ protected ObjectMapper getObjectMapper() {
+ return objMapper;
+ }
+
+ protected void updateLastModifiedTime(long updatedTime) {
+ this.lastModifiedTime = updatedTime;
+ }
+ }
+
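+ // Caches open log file descriptors so repeated puts for the same app
+ // attempt reuse a stream. A flush timer periodically hflushes open
+ // streams, and a clean timer closes descriptors that have been idle for
+ // longer than the configured retain time.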
+ private static class LogFDsCache implements Closeable, Flushable {
+ private DomainLogFD domainLogFD;
+ private Map<ApplicationAttemptId, EntityLogFD> summaryLogFDs;
+ private Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDs;
+ private Timer flushTimer;
+ private FlushTimerTask flushTimerTask;
+ private Timer cleanInActiveFDsTimer;
+ private CleanInActiveFDsTask cleanInActiveFDsTask;
+ private final long ttl;
+ private final ReentrantLock domainFDLocker = new ReentrantLock();
+ private final ReentrantLock summaryTableLocker = new ReentrantLock();
+ private final ReentrantLock entityTableLocker = new ReentrantLock();
+ private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
+ private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
+ private volatile boolean serviceStopped = false;
+
+ public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
+ long ttl) {
+ domainLogFD = null;
+ summaryLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
+ entityLogFDs = new HashMap<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>>();
+ this.flushTimer =
+ new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
+ true);
+ this.flushTimerTask = new FlushTimerTask();
+ this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
+ flushIntervalSecs * 1000);
+
+ this.cleanInActiveFDsTimer =
+ new Timer(LogFDsCache.class.getSimpleName() +
+ "cleanInActiveFDsTimer", true);
+ this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
+ this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
+ cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
+ this.ttl = ttl * 1000;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ domainLogFD.flush();
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ flushSummaryFDMap(copySummaryLogFDs(summaryLogFDs));
+
+ flushEntityFDMap(copyEntityLogFDs(entityLogFDs));
+ }
+
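+ // Flush and clean operate on snapshot copies of the FD tables, taken
+ // under the copy lockers, so concurrent writers are not blocked for the
+ // whole traversal.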
+ private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
+     Map<ApplicationAttemptId, EntityLogFD> summaryLogFDsToCopy) {
+ try {
+ summaryTableCopyLocker.lock();
+ return new HashMap<ApplicationAttemptId, EntityLogFD>(
+     summaryLogFDsToCopy);
+ } finally {
+ summaryTableCopyLocker.unlock();
+ }
+ }
+
+ private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+     EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
+ try {
+ entityTableCopyLocker.lock();
+ return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+     EntityLogFD>>(entityLogFDsToCopy);
+ } finally {
+ entityTableCopyLocker.unlock();
+ }
+ }
+
+ private void flushSummaryFDMap(Map<ApplicationAttemptId,
+     EntityLogFD> logFDs) throws IOException {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+ .entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.flush();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+
+ private void flushEntityFDMap(Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs)
+     throws IOException {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+     EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+     = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.flush();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ private class FlushTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ flush();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(e);
+ }
+ }
+ }
+ }
+
+ private void cleanInActiveFDs() {
+ long currentTimeStamp = System.currentTimeMillis();
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
+ domainLogFD.close();
+ domainLogFD = null;
+ }
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summaryLogFDs),
+ currentTimeStamp);
+
+ cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs),
+ currentTimeStamp);
+ }
+
+ private void cleanInActiveSummaryFDsforMap(
+     Map<ApplicationAttemptId, EntityLogFD> logFDs,
+ long currentTimeStamp) {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+ .entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+ logFD.close();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+
+ private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
+ long currentTimeStamp) {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+     EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+     = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+ logFD.close();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ private class CleanInActiveFDsTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ cleanInActiveFDs();
+ } catch (Exception e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ serviceStopped = true;
+
+ flushTimer.cancel();
+ cleanInActiveFDsTimer.cancel();
+
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ domainLogFD.close();
+ domainLogFD = null;
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ closeSummaryFDs(summaryLogFDs);
+
+ closeEntityFDs(entityLogFDs);
+ }
+
+ private void closeEntityFDs(Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
+ try {
+ entityTableLocker.lock();
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+     EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+     = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.close();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ } finally {
+ entityTableLocker.unlock();
+ }
+ }
+
+ private void closeSummaryFDs(
+     Map<ApplicationAttemptId, EntityLogFD> logFDs) {
+ try {
+ summaryTableLocker.lock();
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
+ : logFDs.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.close();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ } finally {
+ summaryTableLocker.unlock();
+ }
+ }
+
+ public void writeDomainLog(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, TimelineDomain domain,
+ boolean isAppendSupported) throws IOException {
+ try {
+ this.domainFDLocker.lock();
+ if (this.domainLogFD != null) {
+ this.domainLogFD.writeDomain(domain);
+ } else {
+ this.domainLogFD =
+ new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
+ this.domainLogFD.writeDomain(domain);
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+ }
+
+ public void writeEntityLogs(FileSystem fs, Path entityLogPath,
+ ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
+ boolean isAppendSupported) throws IOException {
+ writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
+ groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
+ }
+
+ private void writeEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+ boolean isAppendSupported, Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs)
+     throws IOException {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD
+     = logFDs.get(attemptId);
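+ // Fast path: reuse an existing descriptor under its own lock; otherwise
+ // fall back to createEntityFDandWrite, which creates the descriptor
+ // under the table lock.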
+ if (logMapFD != null) {
+ EntityLogFD logFD = logMapFD.get(groupId);
+ if (logFD != null) {
+ try {
+ logFD.lock();
+ if (serviceStopped) {
+ return;
+ }
+ logFD.writeEntities(entities);
+ } finally {
+ logFD.unlock();
+ }
+ } else {
+ createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+ entities, isAppendSupported, logFDs);
+ }
+ } else {
+ createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+ entities, isAppendSupported, logFDs);
+ }
+ }
+
+ private void createEntityFDandWrite(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+ boolean isAppendSupported, Map<ApplicationAttemptId,
+     HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs)
+     throws IOException {
+ try {
+ entityTableLocker.lock();
+ if (serviceStopped) {
+ return;
+ }
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
+ logFDs.get(attemptId);
+ if (logFDMap == null) {
+ logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
+ }
+ EntityLogFD logFD = logFDMap.get(groupId);
+ if (logFD == null) {
+ logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+ }
+ try {
+ logFD.lock();
+ logFD.writeEntities(entities);
+ try {
+ entityTableCopyLocker.lock();
+ logFDMap.put(groupId, logFD);
+ logFDs.put(attemptId, logFDMap);
+ } finally {
+ entityTableCopyLocker.unlock();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ } finally {
+ entityTableLocker.unlock();
+ }
+ }
+
+ public void writeSummaryEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported)
+ throws IOException {
+ writeSummaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
+ isAppendSupported, this.summaryLogFDs);
+ }
+
+ private void writeSummaryEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported,
+ Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+ EntityLogFD logFD = null;
+ logFD = logFDs.get(attemptId);
+ if (logFD != null) {
+ try {
+ logFD.lock();
+ if (serviceStopped) {
+ return;
+ }
+ logFD.writeEntities(entities);
+ } finally {
+ logFD.unlock();
+ }
+ } else {
+ createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities,
+ isAppendSupported, logFDs);
+ }
+ }
+
+ private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported,
+ Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+ try {
+ summaryTableLocker.lock();
+ if (serviceStopped) {
+ return;
+ }
+ EntityLogFD logFD = logFDs.get(attemptId);
+ if (logFD == null) {
+ logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+ }
+ try {
+ logFD.lock();
+ logFD.writeEntities(entities);
+ try {
+ summaryTableCopyLocker.lock();
+ logFDs.put(attemptId, logFD);
+ } finally {
+ summaryTableCopyLocker.unlock();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ } finally {
+ summaryTableLocker.unlock();
+ }
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 019c7a5..195a661 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -33,8 +33,6 @@
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -54,19 +52,19 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -74,7 +72,6 @@
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
@@ -110,6 +107,9 @@
private URI resURI;
private UserGroupInformation authUgi;
private String doAsUser;
+ private Configuration configuration;
+ private float timelineServiceVersion;
+ private TimelineWriter timelineWriter;
@Private
@VisibleForTesting
@@ -254,6 +254,7 @@ public TimelineClientImpl() {
}
protected void serviceInit(Configuration conf) throws Exception {
+ this.configuration = conf;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
@@ -293,58 +294,48 @@ protected void serviceInit(Configuration conf) throws Exception {
RESOURCE_URI_STR));
}
LOG.info("Timeline service address: " + resURI);
+ timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
super.serviceInit(conf);
}
@Override
+ protected void serviceStart() throws Exception {
+ timelineWriter = createTimelineWriter(
+ configuration, authUgi, client, resURI);
+ }
+
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation ugi, Client webClient, URI uri)
+ throws IOException {
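+ // Timeline service v1.5 stages entities on a file system for the
+ // entity-group store; any other version posts directly to the server.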
+ if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
+ return new FileSystemTimelineWriter(
+ conf, ugi, webClient, uri);
+ } else {
+ return new DirectTimelineWriter(ugi, webClient, uri);
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.timelineWriter != null) {
+ this.timelineWriter.close();
+ }
+ super.serviceStop();
+ }
+
+ @Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
- TimelineEntities entitiesContainer = new TimelineEntities();
- for (TimelineEntity entity : entities) {
- if (entity.getEntityId() == null || entity.getEntityType() == null) {
- throw new YarnException("Incomplete entity without entity id/type");
- }
- entitiesContainer.addEntity(entity);
- }
- ClientResponse resp = doPosting(entitiesContainer, null);
- return resp.getEntity(TimelinePutResponse.class);
+ return timelineWriter.putEntities(entities);
}
@Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
- doPosting(domain, "domain");
- }
-
- private ClientResponse doPosting(final Object obj, final String path)
- throws IOException, YarnException {
- ClientResponse resp;
- try {
- resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
- @Override
- public ClientResponse run() throws Exception {
- return doPostingObject(obj, path);
- }
- });
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- if (resp == null ||
- resp.getClientResponseStatus() != ClientResponse.Status.OK) {
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg);
- if (LOG.isDebugEnabled() && resp != null) {
- String output = resp.getEntity(String.class);
- LOG.debug("HTTP error code: " + resp.getStatus()
- + " Server response : \n" + output);
- }
- throw new YarnException(msg);
- }
- return resp;
+ timelineWriter.putDomain(domain);
}
@SuppressWarnings("unchecked")
@@ -470,23 +461,6 @@ public boolean shouldRetryOn(Exception e) {
return connectionRetry.retryOn(tokenRetryOp);
}
- @Private
- @VisibleForTesting
- public ClientResponse doPostingObject(Object object, String path) {
- WebResource webResource = client.resource(resURI);
- if (path == null) {
- return webResource.accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .post(ClientResponse.class, object);
- } else if (path.equals("domain")) {
- return webResource.path(path).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .put(ClientResponse.class, object);
- } else {
- throw new YarnRuntimeException("Unknown resource type");
- }
- }
-
private class TimelineURLConnectionFactory
implements HttpURLConnectionFactory {
@@ -663,4 +637,34 @@ private static void printUsage() {
public UserGroupInformation getUgi() {
return authUgi;
}
+
+ @Override
+ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, TimelineEntity... entities)
+ throws IOException, YarnException {
+ if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+ throw new YarnException(
+ "This API is not supported under current Timeline Service Version: "
+ + timelineServiceVersion);
+ }
+
+ return timelineWriter.putEntities(appAttemptId, groupId, entities);
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+ throw new YarnException(
+ "This API is not supported under current Timeline Service Version: "
+ + timelineServiceVersion);
+ }
+ timelineWriter.putDomain(appAttemptId, domain);
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setTimelineWriter(TimelineWriter writer) {
+ this.timelineWriter = writer;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
new file mode 100644
index 0000000..c616e63
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Base writer class to write the Timeline data.
+ */
+@Private
+@Unstable
+public abstract class TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(TimelineWriter.class);
+
+ private UserGroupInformation authUgi;
+ private Client client;
+ private URI resURI;
+
+ public TimelineWriter(UserGroupInformation authUgi, Client client,
+ URI resURI) {
+ this.authUgi = authUgi;
+ this.client = client;
+ this.resURI = resURI;
+ }
+
+ public void close() throws Exception {
+ // DO NOTHING
+ }
+
+ public TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException {
+ TimelineEntities entitiesContainer = new TimelineEntities();
+ for (TimelineEntity entity : entities) {
+ if (entity.getEntityId() == null || entity.getEntityType() == null) {
+ throw new YarnException("Incomplete entity without entity id/type");
+ }
+ entitiesContainer.addEntity(entity);
+ }
+ ClientResponse resp = doPosting(entitiesContainer, null);
+ return resp.getEntity(TimelinePutResponse.class);
+ }
+
+ public void putDomain(TimelineDomain domain) throws IOException,
+ YarnException {
+ doPosting(domain, "domain");
+ }
+
+ public abstract TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException;
+
+ public abstract void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException;
+
+ private ClientResponse doPosting(final Object obj, final String path)
+ throws IOException, YarnException {
+ ClientResponse resp;
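+ // Run the HTTP call as the authenticated user; exceptions raised inside
+ // the privileged action are unwrapped into IOExceptions below.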
+ try {
+ resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
+ @Override
+ public ClientResponse run() throws Exception {
+ return doPostingObject(obj, path);
+ }
+ });
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ return resp;
+ }
+
+ @Private
+ @VisibleForTesting
+ public ClientResponse doPostingObject(Object object, String path) {
+ WebResource webResource = client.resource(resURI);
+ if (path == null) {
+ return webResource.accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class, object);
+ } else if (path.equals("domain")) {
+ return webResource.path(path).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, object);
+ } else {
+ throw new YarnRuntimeException("Unknown resource type");
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
new file mode 100644
index 0000000..55b1496
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineEntityGroupId {
+
+ @Test
+ public void testTimelineEntityGroupId() {
+ ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+ TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
+ TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
+ TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
+ TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");
+
+ Assert.assertTrue(group1.equals(group4));
+ Assert.assertFalse(group1.equals(group2));
+ Assert.assertFalse(group1.equals(group3));
+
+ Assert.assertTrue(group1.compareTo(group4) == 0);
+ Assert.assertTrue(group1.compareTo(group2) < 0);
+ Assert.assertTrue(group1.compareTo(group3) < 0);
+
+ Assert.assertTrue(group1.hashCode() == group4.hashCode());
+ Assert.assertFalse(group1.hashCode() == group2.hashCode());
+ Assert.assertFalse(group1.hashCode() == group3.hashCode());
+
+ Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
+ Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index 4c74c61..39fc8de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -25,8 +25,11 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.ConnectException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
@@ -37,7 +40,6 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@@ -46,17 +48,20 @@
import org.junit.Before;
import org.junit.Test;
+import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClient {
private TimelineClientImpl client;
+ private TimelineWriter spyTimelineWriter;
@Before
public void setup() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
client = createTimelineClient(conf);
}
@@ -69,7 +74,8 @@ public void tearDown() {
@Test
public void testPostEntities() throws Exception {
- mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
+ mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
+ false, false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size());
@@ -80,7 +86,8 @@ public void testPostEntities() throws Exception {
@Test
public void testPostEntitiesWithError() throws Exception {
- mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
+ mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true,
+ false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(1, response.getErrors().size());
@@ -106,8 +113,8 @@ public void testPostIncompleteEntities() throws Exception {
@Test
public void testPostEntitiesNoResponse() throws Exception {
- mockEntityClientResponse(
- client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ mockEntityClientResponse(spyTimelineWriter,
+ ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
try {
client.putEntities(generateEntity());
Assert.fail("Exception is expected");
@@ -119,7 +126,7 @@ public void testPostEntitiesNoResponse() throws Exception {
@Test
public void testPostEntitiesConnectionRefused() throws Exception {
- mockEntityClientResponse(client, null, false, true);
+ mockEntityClientResponse(spyTimelineWriter, null, false, true);
try {
client.putEntities(generateEntity());
Assert.fail("RuntimeException is expected");
@@ -130,7 +137,7 @@ public void testPostEntitiesConnectionRefused() throws Exception {
@Test
public void testPutDomain() throws Exception {
- mockDomainClientResponse(client, ClientResponse.Status.OK, false);
+ mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
+ false);
try {
client.putDomain(generateDomain());
} catch (YarnException e) {
@@ -140,7 +147,8 @@ public void testPutDomain() throws Exception {
@Test
public void testPutDomainNoResponse() throws Exception {
- mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false);
+ mockDomainClientResponse(spyTimelineWriter,
+ ClientResponse.Status.FORBIDDEN, false);
try {
client.putDomain(generateDomain());
Assert.fail("Exception is expected");
@@ -152,7 +160,7 @@ public void testPutDomainNoResponse() throws Exception {
@Test
public void testPutDomainConnectionRefused() throws Exception {
- mockDomainClientResponse(client, null, true);
+ mockDomainClientResponse(spyTimelineWriter, null, true);
try {
client.putDomain(generateDomain());
Assert.fail("RuntimeException is expected");
@@ -291,15 +299,16 @@ private void assertException(TimelineClientImpl client, RuntimeException ce) {
}
private static ClientResponse mockEntityClientResponse(
- TimelineClientImpl client, ClientResponse.Status status,
+ TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) {
- doThrow(new ClientHandlerException(new ConnectException())).when(client)
- .doPostingObject(any(TimelineEntities.class), any(String.class));
+ doThrow(new ClientHandlerException(new ConnectException())).when(
+ spyTimelineWriter).doPostingObject(
+ any(TimelineEntities.class), any(String.class));
return response;
}
- doReturn(response).when(client)
+ doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineEntities.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status);
TimelinePutResponse.TimelinePutError error =
@@ -316,15 +325,16 @@ private static ClientResponse mockEntityClientResponse(
}
private static ClientResponse mockDomainClientResponse(
- TimelineClientImpl client, ClientResponse.Status status,
+ TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) {
- doThrow(new ClientHandlerException(new ConnectException())).when(client)
- .doPostingObject(any(TimelineDomain.class), any(String.class));
+ doThrow(new ClientHandlerException(new ConnectException())).when(
+ spyTimelineWriter).doPostingObject(any(TimelineDomain.class),
+ any(String.class));
return response;
}
- doReturn(response).when(client)
+ doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineDomain.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status);
return response;
@@ -365,10 +375,19 @@ public static TimelineDomain generateDomain() {
return domain;
}
- private static TimelineClientImpl createTimelineClient(
+ private TimelineClientImpl createTimelineClient(
YarnConfiguration conf) {
- TimelineClientImpl client =
- spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+ TimelineClientImpl client = new TimelineClientImpl() {
+ @Override
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ TimelineWriter timelineWriter =
+ new DirectTimelineWriter(authUgi, client, resURI);
+ spyTimelineWriter = spy(timelineWriter);
+ return spyTimelineWriter;
+ }
+ };
client.init(conf);
client.start();
return client;
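
The refactoring above narrows the test seam: instead of spying on the whole TimelineClientImpl, the anonymous subclass overrides the protected createTimelineWriter factory and returns a Mockito spy wrapped around a real DirectTimelineWriter. The mock helpers then stub only the writer's low-level POST; condensed from mockEntityClientResponse above:

    // Stub the writer spy so putEntities never touches the network.
    ClientResponse response = mock(ClientResponse.class);
    when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.OK);
    doReturn(response).when(spyTimelineWriter)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
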
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
new file mode 100644
index 0000000..37eadbf
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClientForATS1_5 {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestTimelineClientForATS1_5.class);
+
+ private TimelineClientImpl client;
+ private static FileContext localFS;
+ private static File localActiveDir;
+ private TimelineWriter spyTimelineWriter;
+
+ @Before
+ public void setup() throws Exception {
+ localFS = FileContext.getLocalFSFileContext();
+ localActiveDir =
+ new File("target", this.getClass().getSimpleName() + "-activeDir")
+ .getAbsoluteFile();
+ localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+ localActiveDir.mkdir();
+ LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ localActiveDir.getAbsolutePath());
+ conf.set(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+ "summary_type");
+ client = createTimelineClient(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (client != null) {
+ client.stop();
+ }
+ localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TimelineEntityGroupId groupId =
+ TimelineEntityGroupId.newInstance(appId, "1");
+ TimelineEntityGroupId groupId2 =
+ TimelineEntityGroupId.newInstance(appId, "2");
+ // Create two entities: one of a regular entity type and one of a
+ // summary type
+ TimelineEntity[] entities = new TimelineEntity[2];
+ entities[0] = generateEntity("entity_type");
+ entities[1] = generateEntity("summary_type");
+ try {
+ // If the attemptId is null, fall back to the original putEntities call
+ // and save the entities into the configured LevelDB store instead of
+ // the file system.
+ client.putEntities(null, null, entities);
+ verify(spyTimelineWriter, times(1)).putEntities(entities);
+ reset(spyTimelineWriter);
+
+ // If the attemptId is specified but the groupId is null, entities of a
+ // regular entity type fall back to the original putEntities call, while
+ // entities of a summary type are written into the file system.
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId, 1);
+ client.putEntities(attemptId1, null, entities);
+ TimelineEntity[] entityTDB = new TimelineEntity[1];
+ entityTDB[0] = entities[0];
+ verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId1), "summarylog-"
+ + attemptId1.toString())));
+ reset(spyTimelineWriter);
+
+ // If both attemptId and groupId are specified, the entities are saved
+ // into the file system instead of the LevelDB store, with one entity
+ // log per group id.
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId, 2);
+ client.putEntities(attemptId2, groupId, entities);
+ client.putEntities(attemptId2, groupId2, entities);
+ verify(spyTimelineWriter, times(0)).putEntities(
+ any(TimelineEntity[].class));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "summarylog-"
+ + attemptId2.toString())));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ + groupId.toString())));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ + groupId2.toString())));
+ reset(spyTimelineWriter);
+ } catch (Exception e) {
+ Assert.fail("Exception is not expected. " + e);
+ }
+ }
+
+ @Test
+ public void testPutDomain() {
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId, 1);
+ try {
+ TimelineDomain domain = generateDomain();
+
+ client.putDomain(null, domain);
+ verify(spyTimelineWriter, times(1)).putDomain(domain);
+ reset(spyTimelineWriter);
+
+ client.putDomain(attemptId1, domain);
+ verify(spyTimelineWriter, times(0)).putDomain(domain);
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId1), "domainlog-"
+ + attemptId1.toString())));
+ reset(spyTimelineWriter);
+ } catch (Exception e) {
+ Assert.fail("Exception is not expected." + e);
+ }
+ }
+
+ private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
+ Path appDir =
+ new Path(localActiveDir.getAbsolutePath(), appAttemptId
+ .getApplicationId().toString());
+ Path attemptDir = new Path(appDir, appAttemptId.toString());
+ return attemptDir;
+ }
+
+ private static TimelineEntity generateEntity(String type) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId("entity id");
+ entity.setEntityType(type);
+ entity.setStartTime(System.currentTimeMillis());
+ return entity;
+ }
+
+ private static TimelineDomain generateDomain() {
+ TimelineDomain domain = new TimelineDomain();
+ domain.setId("namesapce id");
+ domain.setDescription("domain description");
+ domain.setOwner("domain owner");
+ domain.setReaders("domain_reader");
+ domain.setWriters("domain_writer");
+ domain.setCreatedTime(0L);
+ domain.setModifiedTime(1L);
+ return domain;
+ }
+
+ private TimelineClientImpl createTimelineClient(YarnConfiguration conf) {
+ TimelineClientImpl client = new TimelineClientImpl() {
+ @Override
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ TimelineWriter timelineWriter =
+ new FileSystemTimelineWriter(conf, authUgi, client, resURI) {
+ @Override
+ public ClientResponse doPostingObject(Object object, String path) {
+ ClientResponse response = mock(ClientResponse.class);
+ when(response.getClientResponseStatus()).thenReturn(
+ ClientResponse.Status.OK);
+ return response;
+ }
+ };
+ spyTimelineWriter = spy(timelineWriter);
+ return spyTimelineWriter;
+ }
+ };
+
+ client.init(conf);
+ client.start();
+ return client;
+ }
+}
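
Taken together, the existence checks in testPostEntities and testPutDomain fix the on-disk layout that the ATS 1.5 file system writer is expected to produce under the configured active directory. A sketch of the paths those assertions reconstruct (derived from getAppAttemptDir and the file-name prefixes above, not from FileSystemTimelineWriter itself):

    // <activeDir>/<applicationId>/<applicationAttemptId>/
    //     {summarylog-*, entitylog-*, domainlog-*}
    Path appDir = new Path(localActiveDir.getAbsolutePath(),
        attemptId.getApplicationId().toString());
    Path attemptDir = new Path(appDir, attemptId.toString());
    Path summaryLog = new Path(attemptDir, "summarylog-" + attemptId.toString());
    Path entityLog = new Path(attemptDir, "entitylog-" + groupId.toString());
    Path domainLog = new Path(attemptDir, "domainlog-" + attemptId.toString());
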
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
index 5cb1baa..46d5b6b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
@@ -19,15 +19,20 @@
package org.apache.hadoop.yarn.server.timeline.webapp;
import java.io.File;
+import java.io.IOException;
+import java.net.URI;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
@@ -39,6 +44,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineWebServicesWithSSL {
@@ -60,6 +66,7 @@ public static void setupServer() throws Exception {
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
@@ -123,11 +130,17 @@ public void testPutEntities() throws Exception {
private ClientResponse resp;
@Override
- public ClientResponse doPostingObject(Object obj, String path) {
- resp = super.doPostingObject(obj, path);
- return resp;
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ return new DirectTimelineWriter(authUgi, client, resURI) {
+ @Override
+ public ClientResponse doPostingObject(Object obj, String path) {
+ resp = super.doPostingObject(obj, path);
+ return resp;
+ }
+ };
}
-
}
}
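
The SSL test keeps its original assertion strategy: it still captures the raw ClientResponse in the enclosing class's resp field, but the capture point moves from an override of doPostingObject on the client itself into the DirectTimelineWriter returned by the overridden createTimelineWriter factory. The testPutEntities body that consumes resp sits outside this hunk; a hedged sketch of the kind of check it can still perform:

    // Assumed usage of the captured response; the actual test body is not
    // shown in this hunk.
    Assert.assertEquals(ClientResponse.Status.OK, resp.getClientResponseStatus());
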