Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java (date 1512094689000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java (revision ) @@ -22,6 +22,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +77,14 @@ private Dispatcher dispatcher; private TimelineClient client; private boolean publishSystemMetrics; + private LinkedBlockingQueue entityQueue; + private ExecutorService sendEventThreadPool; + private int dispatcherPoolSize; + private int dispatcherBufferSize; + private int dispatcherBatchSize; + private int putEventInterval; + private volatile boolean stopped = false; + private PutEventThread putEventThread; public SystemMetricsPublisher() { super(SystemMetricsPublisher.class.getName()); @@ -85,7 +97,21 @@ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) && conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED); - + putEventInterval = + conf.getInt(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_INTERVAL, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL) * 1000; + dispatcherPoolSize = conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE); + dispatcherBatchSize = conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE); + dispatcherBufferSize = conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE, + dispatcherBatchSize * dispatcherPoolSize + 1); + if (dispatcherBufferSize < dispatcherBatchSize) { + dispatcherBufferSize = dispatcherBatchSize * dispatcherPoolSize + 1; + } if (publishSystemMetrics) { client = TimelineClient.createTimelineClient(); addIfService(client); @@ -95,12 +121,35 @@ new ForwardingEventHandler()); addIfService(dispatcher); LOG.info("YARN system metrics publishing service is enabled"); + putEventThread = new PutEventThread(); + sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize); + entityQueue = new LinkedBlockingQueue<>(dispatcherBufferSize); } else { LOG.info("YARN system metrics publishing service is not enabled"); } super.serviceInit(conf); } + protected void serviceStart() throws Exception { + putEventThread.start(); + super.serviceStart(); + } + + protected void serviceStop() throws Exception { + stopped = true; + putEventThread.interrupt(); + sendEventThreadPool.shutdown(); + try { + putEventThread.join(); + if(!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) { + sendEventThreadPool.shutdownNow(); + } + } catch (InterruptedException e) { + sendEventThreadPool.shutdownNow(); + } + super.serviceStop(); + } + @SuppressWarnings("unchecked") public void appCreated(RMApp app, long createdTime) { if (publishSystemMetrics) { @@ -194,7 +243,7 @@ @SuppressWarnings("unchecked") public void appAttemptFinished(RMAppAttempt appAttempt, - RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + RMAppAttemptState appAttemptState, RMApp app, long finishedTime) { if (publishSystemMetrics) { ContainerId container = (appAttempt.getMasterContainer() == null) ? null : appAttempt.getMasterContainer().getId(); @@ -207,7 +256,7 @@ // app will get the final status from app attempt, or create one // based on app state if it doesn't exist app.getFinalApplicationStatus(), - RMServerUtils.createApplicationAttemptState(appAttemtpState), + RMServerUtils.createApplicationAttemptState(appAttemptState), finishedTime, container)); } @@ -241,10 +290,7 @@ protected Dispatcher createDispatcher(Configuration conf) { MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + new MultiThreadedDispatcher(dispatcherPoolSize); dispatcher.setDrainEventsOnStop(); return dispatcher; } @@ -548,12 +594,40 @@ @Private @VisibleForTesting public void putEntity(TimelineEntity entity) { - try { + entityQueue.offer(entity); + if (entityQueue.size() > dispatcherBatchSize) { + synchronized (entityQueue) { + if (entityQueue.size() > dispatcherBatchSize) { + SendEntity task = new SendEntity(); + sendEventThreadPool.submit(task); + } + } + } + } + + public class SendEntity implements Runnable { + + private ArrayList buffer; + + public SendEntity(){ + buffer = new ArrayList(); + entityQueue.drainTo(buffer); + } + + @Override + public void run() { if (LOG.isDebugEnabled()) { - LOG.debug("Publishing the entity " + entity.getEntityId() + - ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + LOG.debug("send batch events:" + String.valueOf(buffer.size())); } - TimelinePutResponse response = client.putEntities(entity); + if (buffer.isEmpty()) { + return; + } + TimelinePutResponse response = null; + try { + response = client.putEntities(buffer.toArray(new TimelineEntity[0])); + } catch (Exception e) { + LOG.error("Error when publishing entity", e); + } List errors = response.getErrors(); if (errors.size() == 0) { LOG.debug("Timeline entities are successfully put"); @@ -565,9 +639,6 @@ + error.getErrorCode()); } } - } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); } } @@ -640,4 +711,36 @@ } + private class PutEventThread extends Thread { + public PutEventThread() { + super("PutEventThread"); + } + + @Override + public void run() { + LOG.info("system metrics publisher will put events every " + String.valueOf(putEventInterval) + " seconds"); + while (!stopped && !Thread.currentThread().isInterrupted()) { + if (System.currentTimeMillis() % putEventInterval >= 1000) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn(SystemMetricsPublisher.class.getName() + + " is interrupted. Exiting."); + break; + } + continue; + } + LOG.debug("put entities by PutEventThread"); + SendEntity task = new SendEntity(); + sendEventThreadPool.submit(task); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn(SystemMetricsPublisher.class.getName() + + " is interrupted. Exiting."); + break; + } + } + } + } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (date 1512094689000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision ) @@ -404,6 +404,17 @@ public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10; + public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE = + RM_PREFIX + "system-metrics-publisher.buffer-size"; + public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE = + RM_PREFIX + "system-metrics-publisher.batch-size"; + public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE = + 1000; + public static final String RM_SYSTEM_METRICS_PUBLISHER_INTERVAL = + RM_PREFIX + "system-metrics-publisher.interval-seconds"; + public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL = + 60; + //RM delegation token related keys public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval";