diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..f5d22d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1260,7 +1260,12 @@ private static void addDeprecatedKeys() { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + + /** The timeout that the async executor waits for termination. */ + public static final String TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT = + TIMELINE_SERVICE_PREFIX + "async-thread-termination-timeout-sec"; + public static final int + DEFAULT_TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT = 10; /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 0313f9e..4fbab2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -20,6 +20,7 @@ import java.io.IOException; +import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -70,6 +71,23 @@ public abstract TimelinePutResponse putEntities( /** *
+ * Send the information of a number of conceptual entities to the timeline + * server. It is a non-blocking API. The method will return Future objects + * immediately after queuing requests. + *
+ * + * @param entities + * the collection of {@link TimelineEntity} + * @return the error information if the sent entities are not correctly stored + * @throws IOException + * @throws YarnException + */ + @Public + public abstract Future
* Send the information of a domain to the timeline server. It is a
* blocking API. The method will not return until it gets the response from
* the timeline server.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 2028cc9..f119754 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.impl;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@@ -30,6 +31,14 @@
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
@@ -101,6 +110,8 @@
}
private Client client;
+ private ExecutorService asyncRequestExecutor;
+ private int executorShutdownTimeout;
private ConnectionConfigurator connConfigurator;
private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token;
@@ -246,6 +257,7 @@ public TimelineClientImpl() {
super(TimelineClientImpl.class.getName());
}
+ @Override
protected void serviceInit(Configuration conf) throws Exception {
isEnabled = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -283,11 +295,28 @@ protected void serviceInit(Configuration conf) throws Exception {
RESOURCE_URI_STR));
}
LOG.info("Timeline service address: " + resURI);
+
+ asyncRequestExecutor = Executors.newCachedThreadPool();
+ executorShutdownTimeout = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT,
+ YarnConfiguration
+ .DEFAULT_TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT);
}
+
super.serviceInit(conf);
}
@Override
+ protected void serviceStop() throws Exception {
+ asyncRequestExecutor.shutdown();
+ if (!asyncRequestExecutor
+ .awaitTermination(executorShutdownTimeout, TimeUnit.SECONDS)) {
+ LOG.warn("Timeout to wait for shutdown async executor.");
+ asyncRequestExecutor.shutdownNow();
+ }
+ }
+
+ @Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
if (!isEnabled) {
@@ -302,6 +331,25 @@ public TimelinePutResponse putEntities(
return resp.getEntity(TimelinePutResponse.class);
}
+ public class PutEntityTask implements Callable