diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..f5d22d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1260,7 +1260,12 @@ private static void addDeprecatedKeys() { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + + /** The timeout that the async executor waits for termination. */ + public static final String TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT = + TIMELINE_SERVICE_PREFIX + "async-thread-termination-timeout-sec"; + public static final int + DEFAULT_TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT = 10; /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 0313f9e..4fbab2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -20,6 +20,7 @@ import java.io.IOException; +import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -70,6 +71,23 @@ public abstract TimelinePutResponse putEntities( /** *

+ * Send the information of a number of conceptual entities to the timeline + * server. It is a non-blocking API. The method will return Future objects + * immediately after queuing requests. + *

+ * + * @param entities + * the collection of {@link TimelineEntity} + * @return the error information if the sent entities are not correctly stored + * @throws IOException + * @throws YarnException + */ + @Public + public abstract Future putEntitiesAsync( + TimelineEntity... entities) throws IOException, YarnException; + + /** + *

* Send the information of a domain to the timeline server. It is a * blocking API. The method will not return until it gets the response from * the timeline server. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 2028cc9..f119754 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.impl; +import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; @@ -30,6 +31,14 @@ import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; @@ -101,6 +110,8 @@ } private Client client; + private ExecutorService asyncRequestExecutor; + private int executorShutdownTimeout; private ConnectionConfigurator connConfigurator; private DelegationTokenAuthenticator authenticator; private DelegationTokenAuthenticatedURL.Token token; @@ -246,6 +257,7 @@ public TimelineClientImpl() { super(TimelineClientImpl.class.getName()); } + @Override protected void serviceInit(Configuration conf) throws Exception { isEnabled = conf.getBoolean( YarnConfiguration.TIMELINE_SERVICE_ENABLED, @@ -283,11 +295,28 @@ protected void serviceInit(Configuration conf) throws Exception { RESOURCE_URI_STR)); } LOG.info("Timeline service address: " + resURI); + + asyncRequestExecutor = Executors.newCachedThreadPool(); + executorShutdownTimeout = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT, + YarnConfiguration + .DEFAULT_TIMELINE_SERVICE_ASYNC_THREAD_TERMINATION_TIMEOUT); } + super.serviceInit(conf); } @Override + protected void serviceStop() throws Exception { + asyncRequestExecutor.shutdown(); + if (!asyncRequestExecutor + .awaitTermination(executorShutdownTimeout, TimeUnit.SECONDS)) { + LOG.warn("Timeout to wait for shutdown async executor."); + asyncRequestExecutor.shutdownNow(); + } + } + + @Override public TimelinePutResponse putEntities( TimelineEntity... entities) throws IOException, YarnException { if (!isEnabled) { @@ -302,6 +331,25 @@ public TimelinePutResponse putEntities( return resp.getEntity(TimelinePutResponse.class); } + public class PutEntityTask implements Callable { + final TimelineEntity[] entities; + + public PutEntityTask(TimelineEntity... entities) { + Preconditions.checkNotNull(entities); + this.entities = entities; + } + + @Override + public TimelinePutResponse call() throws Exception { + return TimelineClientImpl.this.putEntities(entities); + } + } + + @Override + public Future putEntitiesAsync( + TimelineEntity... entities) throws IOException, YarnException { + return asyncRequestExecutor.submit(new PutEntityTask(entities)); + } @Override public void putDomain(TimelineDomain domain) throws IOException, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java index fe66e74..7b70c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api.impl; +import java.util.concurrent.Future; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -273,6 +274,20 @@ public void testDelegationTokenOperationsRetry() throws Exception { } } + @Test + public void testPutEntityAsync() throws Exception { + mockEntityClientResponse(client, ClientResponse.Status.OK, false, false); + try { + Future + future = client.putEntitiesAsync(generateEntity()); + TimelinePutResponse response = future.get(); + Assert.assertEquals(0, response.getErrors().size()); + client.stop(); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + private static void assertFail() { Assert.fail("Exception expected! " + "Timeline server should be off to run this test.");