diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index e90f68e..83aa7b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -30,10 +30,12 @@ import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.net.ssl.HostnameVerifier; @@ -868,44 +870,29 @@ public void setTimelineWriter(TimelineWriter writer) { this.timelineWriter = writer; } - private static final class EntitiesHolder { - CountDownLatch entitiesPublishedSignal; - private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities; - private Exception failureException;// TimelinePutResponse once properly - // supported + private final class EntitiesHolder extends FutureTask { + private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities; + private final boolean isSync; EntitiesHolder( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities) { + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities, + final boolean isSync) { + super(new Callable() { + // publishEntities() + public Void call() throws Exception { + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("appid", contextAppId.toString()); + params.add("async", Boolean.toString(!isSync)); + putObjects("entities", params, entities); + return null; + } + }); this.entities = entities; - } - - EntitiesHolder( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities, - boolean isSync) { - this.entities = entities; - if (isSync) { - entitiesPublishedSignal = new CountDownLatch(1); - } + this.isSync = isSync; } public boolean isSync() { - return entitiesPublishedSignal != null; - } - - public void waitTillPublished() throws InterruptedException { - entitiesPublishedSignal.await(); - } - - public void finishedPublishing() { - entitiesPublishedSignal.countDown(); - } - - public Exception getFailureException() { - return failureException; - } - - public void setFailureException(Exception failureException) { - this.failureException = failureException; + return isSync; } public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() { @@ -920,33 +907,29 @@ public void setFailureException(Exception failureException) { private class TimelineEntityDispatcher { private static final int NUMBER_OF_ASYNC_CALLS_TO_MERGE = 2; private final BlockingQueue timelineEntityQueue; - private ScheduledExecutorService scheduler; - // Block newly coming events into the queue while stopping when stopped. - private volatile boolean stopped = false; + private ExecutorService executor; TimelineEntityDispatcher() { timelineEntityQueue = new LinkedBlockingQueue(); } - Runnable createThread() { + Runnable createRunnable() { return new Runnable() { @Override public void run() { EntitiesHolder entitiesHolder; - while (!stopped && !Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { // Merge all the async calls and make one push and if its sync push // and get the status try { entitiesHolder = timelineEntityQueue.take(); } catch (InterruptedException ie) { - if (!stopped) { - LOG.warn("TimelineThread interrupted", ie); - } + Thread.currentThread().interrupt(); return; } if (entitiesHolder != null) { if (entitiesHolder.isSync()) { - publishEntities(entitiesHolder); + entitiesHolder.run(); } else { int count = 0; while (true) { @@ -956,13 +939,13 @@ public void run() { timelineEntityQueue.poll(); if (tempEntitiesHolder == null) { // flush all the async entities together - publishEntities(entitiesHolder); + entitiesHolder.run(); break; } else if (tempEntitiesHolder.isSync()) { // flush all the async entities together - publishEntities(entitiesHolder); + entitiesHolder.run(); // and then flush the sync entity - publishEntities(tempEntitiesHolder); + tempEntitiesHolder.run(); break; } else { // append all async entities together and then flush @@ -971,7 +954,7 @@ public void run() { count++; if (count == NUMBER_OF_ASYNC_CALLS_TO_MERGE) { // flush all the async entities together - publishEntities(entitiesHolder); + entitiesHolder.run(); break; } } @@ -980,31 +963,13 @@ public void run() { } } } - - private void publishEntities(EntitiesHolder entities) { - boolean isSync = entities.isSync(); - MultivaluedMap params = new MultivaluedMapImpl(); - params.add("appid", contextAppId.toString()); - params.add("async", Boolean.toString(!isSync)); - try { - putObjects("entities", params, entities.getEntities()); - } catch (IOException | YarnException e) { - LOG.error("Error while publishing ATS Entity", e); - if (isSync) { - entities.setFailureException(e); - } - } - if (isSync) { - entities.finishedPublishing(); - } - } }; } public void dispatchEntities(boolean sync, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished) throws YarnException { - if (stopped) { + if (executor.isShutdown()) { throw new YarnException("Timeline client is in the process of stopping," + " not accepting any more TimelineEntities"); } @@ -1017,8 +982,7 @@ public void dispatchEntities(boolean sync, } // created a holder and place it in queue - EntitiesHolder entitiesHolder = (sync) - ? new EntitiesHolder(entities, true) : new EntitiesHolder(entities); + EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); try { timelineEntityQueue.put(entitiesHolder); } catch (InterruptedException e) { @@ -1030,13 +994,13 @@ public void dispatchEntities(boolean sync, // In sync call we need to wait till its published and if any error then // throw it back try { - entitiesHolder.waitTillPublished(); - if (entitiesHolder.getFailureException() != null) { - throw new YarnException( - "Failed while adding entity to the queue for publishing", - entitiesHolder.getFailureException()); - } + entitiesHolder.get(); + } catch (ExecutionException e) { + throw new YarnException( + "Failed while adding entity to the queue for publishing", + e.getCause()); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new YarnException( "Failed while adding entity to the queue for publishing", e); } @@ -1044,17 +1008,16 @@ public void dispatchEntities(boolean sync, } public void start() { - scheduler = Executors.newSingleThreadScheduledExecutor(); - scheduler.schedule(createThread(), 0, TimeUnit.SECONDS); + executor = Executors.newSingleThreadExecutor(); + executor.execute(createRunnable()); } public void stop() { - stopped = true; LOG.info("TimelineEntityDispatcher is draining to stop," + " ignoring any new events."); - scheduler.shutdownNow(); + executor.shutdownNow(); try { - scheduler.awaitTermination(2000l, TimeUnit.MILLISECONDS); + executor.awaitTermination(2000l, TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); }