diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index be4c023..1ecac69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -404,6 +404,17 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10; + /** Key for the capacity of the system metrics publisher's event buffer. No default constant is declared in this hunk; SystemMetricsPublisher falls back to batch-size * pool-size + 1 when the key is unset. */ public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE = + RM_PREFIX + "system-metrics-publisher.buffer-size"; + /** Key for the number of buffered events above which the system metrics publisher sends a batch. */ public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE = + RM_PREFIX + "system-metrics-publisher.batch-size"; + /** Default value for the batch-size key. */ public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE = + 1000; + /** Key for the interval, in seconds, of the publisher's periodic sending thread (SystemMetricsPublisher multiplies the value by 1000). */ public static final String RM_SYSTEM_METRICS_PUBLISHER_INTERVAL = + RM_PREFIX + "system-metrics-publisher.interval-seconds"; + /** Default value, in seconds, for the interval key. */ public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL = + 60; + //RM delegation token related keys public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 881ca8e..c6cab99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -797,6 +797,35 @@ + + the size of system metrics publisher sending events in one request. 
+ + yarn.resourcemanager.system-metrics-publisher.batch-size + 1000 + + + + + The size of the event buffer in the system metrics publisher. Default value is + ${yarn.resourcemanager.system-metrics-publisher.batch-size} * + ${yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size} + 1 + + yarn.resourcemanager.system-metrics-publisher.buffer-size + + + + + + When batch publishing is enabled, the publisher must not wait for a batch + to fill up while holding events in the buffer for a long time, so a separate thread + sends the events in the buffer periodically. This config sets the interval, in seconds, of that + periodic sending thread. + + yarn.resourcemanager.system-metrics-publisher.interval-seconds + 60 + + + Number of diagnostics/failure messages can be saved in RM for log aggregation. It also defines the number of diagnostics/failure messages can be shown in log aggregation web ui. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 5d9c6ad..5245438 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -22,6 +22,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory;
@@ -73,6 +77,15 @@
   private Dispatcher dispatcher;
   private TimelineClient client;
   private boolean publishSystemMetrics;
+  private LinkedBlockingQueue<TimelineEntity> entityQueue; // bounded event buffer
+  private ExecutorService sendEventThreadPool; // runs the SendEntity tasks
+  private int dispatcherPoolSize;
+  private int dispatcherBufferSize;
+  private int dispatcherBatchSize;
+  private int putEventInterval; // milliseconds
+  private volatile boolean stopped = false;
+  private PutEventThread putEventThread;
+  private Object sendEntityLock;
 
   public SystemMetricsPublisher() {
     super(SystemMetricsPublisher.class.getName());
@@ -85,7 +98,36 @@ protected void serviceInit(Configuration conf) throws Exception {
             YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
         conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
             YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
+    putEventInterval =
+        conf.getInt(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_INTERVAL,
+            YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL)
+            * 1000;
+    if (putEventInterval <= 0) {
+      // A zero interval would make the modulo in PutEventThread throw an
+      // ArithmeticException, so fall back to the default instead.
+      LOG.warn("Invalid "
+          + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_INTERVAL
+          + ", using the default value instead");
+      putEventInterval =
+          YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL
+              * 1000;
+    }
+    dispatcherPoolSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+        YarnConfiguration.
+        DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+    dispatcherBatchSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE,
+        YarnConfiguration.
+        DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE);
+    dispatcherBufferSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE,
+        dispatcherBatchSize * dispatcherPoolSize + 1);
+    // putEntity only triggers a send once the queue holds MORE than one batch,
+    // so the buffer has to be strictly larger than the batch size.
+    if (dispatcherBufferSize <= dispatcherBatchSize) {
+      dispatcherBufferSize = dispatcherBatchSize * dispatcherPoolSize + 1;
+    }
     if (publishSystemMetrics) {
       client = TimelineClient.createTimelineClient();
       addIfService(client);
@@ -95,12 +137,61 @@ protected void serviceInit(Configuration conf) throws Exception {
           new ForwardingEventHandler());
       addIfService(dispatcher);
       LOG.info("YARN system metrics publishing service is enabled");
+      putEventThread = new PutEventThread();
+      sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+      entityQueue = new LinkedBlockingQueue<>(dispatcherBufferSize);
+      sendEntityLock = new Object();
     } else {
       LOG.info("YARN system metrics publishing service is not enabled");
     }
     super.serviceInit(conf);
   }
 
+  /**
+   * Starts the periodic sending thread when publishing is enabled, then
+   * delegates to the superclass.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    if (publishSystemMetrics) {
+      putEventThread.start();
+    }
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the periodic sending thread and the sender pool, then delegates to
+   * the superclass. Entities still left in the buffer are not sent.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    boolean interrupted = false;
+    if (publishSystemMetrics) {
+      stopped = true;
+      putEventThread.interrupt();
+      try {
+        // Join before shutting the pool down so the sending thread can never
+        // submit a task to an executor that already rejects new work.
+        putEventThread.join();
+        sendEventThreadPool.shutdown();
+        if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
+          sendEventThreadPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        sendEventThreadPool.shutdownNow();
+        interrupted = true;
+      }
+    }
+    try {
+      super.serviceStop();
+    } finally {
+      if (interrupted) {
+        // Restore the interrupt status only after super.serviceStop() ran.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   public void appCreated(RMApp app, long createdTime) {
     if (publishSystemMetrics) {
@@ -194,7 +285,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
 
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
-      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+      RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
     if (publishSystemMetrics) {
       ContainerId container = (appAttempt.getMasterContainer() == null) ? null
           : appAttempt.getMasterContainer().getId();
@@ -207,7 +298,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
               // app will get the final status from app attempt, or create one
               // based on app state if it doesn't exist
               app.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationAttemptState(appAttemtpState),
+              RMServerUtils.createApplicationAttemptState(appAttemptState),
               finishedTime,
               container));
     }
@@ -241,10 +332,7 @@ public void containerFinished(RMContainer container, long finishedTime) {
 
   protected Dispatcher createDispatcher(Configuration conf) {
     MultiThreadedDispatcher dispatcher =
-        new MultiThreadedDispatcher(
-            conf.getInt(
-                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
-                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+        new MultiThreadedDispatcher(dispatcherPoolSize);
     dispatcher.setDrainEventsOnStop();
     return dispatcher;
   }
@@ -548,14 +636,54 @@ private static TimelineEntity createContainerEntity(
   @Private
   @VisibleForTesting
   public void putEntity(TimelineEntity entity) {
-    try {
+    if (!entityQueue.offer(entity)) {
+      // offer() never blocks; a full buffer means this entity is lost.
+      LOG.warn("Event buffer is full, dropping entity ["
+          + entity.getEntityType() + "," + entity.getEntityId() + "]");
+    }
+    if (entityQueue.size() > dispatcherBatchSize) {
+      synchronized (sendEntityLock) {
+        if (entityQueue.size() > dispatcherBatchSize) {
+          SendEntity task = new SendEntity();
+          sendEventThreadPool.submit(task);
+        }
+      }
+    }
+  }
+
+  /**
+   * Task that takes everything currently buffered in the queue when it is
+   * constructed and sends it to the timeline client in a single request
+   * when it is run.
+   */
+  private class SendEntity implements Runnable {
+
+    private final List<TimelineEntity> buffer;
+
+    public SendEntity() {
+      buffer = new ArrayList<>();
+      // Drained on the constructing thread, not on the pool thread.
+      entityQueue.drainTo(buffer);
+    }
+
+    @Override
+    public void run() {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Publishing the entity " + entity.getEntityId() +
-            ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        LOG.debug("send batch events:" + buffer.size());
+      }
+      if (buffer.isEmpty()) {
+        return;
+      }
+      TimelinePutResponse response;
+      try {
+        response = client.putEntities(buffer.toArray(new TimelineEntity[0]));
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity", e);
+        // No response to inspect; bail out instead of hitting an NPE below.
+        return;
       }
-      TimelinePutResponse response = client.putEntities(entity);
       List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
-      if (errors.size() == 0) {
+      if (errors == null || errors.size() == 0) {
         LOG.debug("Timeline entities are successfully put");
       } else {
         for (TimelinePutResponse.TimelinePutError error : errors) {
@@ -565,9 +693,6 @@ public void putEntity(TimelineEntity entity) {
               + error.getErrorCode());
         }
       }
-    } catch (Exception e) {
-      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
-          + entity.getEntityId() + "]", e);
     }
   }
 
@@ -640,4 +765,44 @@ protected AsyncDispatcher createDispatcher() {
 
   }
 
+  /**
+   * Thread that periodically submits a SendEntity task so that buffered
+   * entities are sent even when a batch never fills up.
+   */
+  private class PutEventThread extends Thread {
+    public PutEventThread() {
+      super("PutEventThread");
+    }
+
+    @Override
+    public void run() {
+      // putEventInterval is kept in milliseconds; report it in seconds.
+      LOG.info("system metrics publisher will put events every " +
+          (putEventInterval / 1000) + " seconds");
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        // Poll every 500 ms until the wall clock is within the first second
+        // of an interval-sized window, then send once.
+        if (System.currentTimeMillis() % putEventInterval >= 1000) {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            LOG.warn(SystemMetricsPublisher.class.getName()
+                + " is interrupted. Exiting.");
+            break;
+          }
+          continue;
+        }
+        LOG.debug("put entities by PutEventThread");
+        SendEntity task = new SendEntity();
+        sendEventThreadPool.submit(task);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.warn(SystemMetricsPublisher.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
 }