diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index be4c023..1ecac69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -404,6 +404,17 @@ private static void addDeprecatedKeys() {
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
10;
+  /**
+   * Key for the size of the event buffer used by the system metrics
+   * publisher. No default constant is declared for this key.
+   */
+  public static final String
+      RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE =
+      RM_PREFIX + "system-metrics-publisher.buffer-size";
+
+  /** Key for the number of events sent in one publish request. */
+  public static final String
+      RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE =
+      RM_PREFIX + "system-metrics-publisher.batch-size";
+  /** Default number of events sent in one publish request. */
+  public static final int
+      DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE = 1000;
+
+  /** Key for the interval, in seconds, between periodic publishes. */
+  public static final String RM_SYSTEM_METRICS_PUBLISHER_INTERVAL =
+      RM_PREFIX + "system-metrics-publisher.interval-seconds";
+  /** Default interval, in seconds, between periodic publishes. */
+  public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL = 60;
+
//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 881ca8e..c6cab99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -797,6 +797,35 @@
+
+ The number of events the system metrics publisher sends in one request.
+
+ yarn.resourcemanager.system-metrics-publisher.batch-size
+ 1000
+
+
+
+
+ The size of the event buffer in the system metrics publisher. The default value is
+ ${yarn.resourcemanager.system-metrics-publisher.batch-size} *
+ ${yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size} + 1.
+
+ yarn.resourcemanager.system-metrics-publisher.buffer-size
+
+
+
+
+
+ When batch publishing is enabled, the publisher must not hold events in the buffer
+ for a long time while it waits for a batch to fill up. A separate thread therefore
+ sends the buffered events periodically. This setting is the interval, in seconds,
+ between those periodic sends.
+
+ yarn.resourcemanager.system-metrics-publisher.interval-seconds
+ 60
+
+
+
Number of diagnostics/failure messages can be saved in RM for
log aggregation. It also defines the number of diagnostics/failure
messages can be shown in log aggregation web ui.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 5d9c6ad..5245438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -22,6 +22,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,6 +77,15 @@
private Dispatcher dispatcher;
private TimelineClient client;
private boolean publishSystemMetrics;
+  /** Bounded buffer of entities waiting to be sent in a batch. */
+  private LinkedBlockingQueue<TimelineEntity> entityQueue;
+  /** Pool that runs the SendEntity tasks. */
+  private ExecutorService sendEventThreadPool;
+  /** Thread count for the dispatcher and for sendEventThreadPool. */
+  private int dispatcherPoolSize;
+  /** Capacity of entityQueue. */
+  private int dispatcherBufferSize;
+  /** Queue size above which putEntity submits a batch. */
+  private int dispatcherBatchSize;
+  /** Interval between periodic sends, in milliseconds. */
+  private int putEventInterval;
+  /** Set by serviceStop to make PutEventThread leave its loop. */
+  private volatile boolean stopped = false;
+  /** Thread that periodically sends whatever is buffered. */
+  private PutEventThread putEventThread;
+  /** Guards the size re-check and task submission in putEntity. */
+  private Object sendEntityLock;
public SystemMetricsPublisher() {
super(SystemMetricsPublisher.class.getName());
@@ -85,7 +98,24 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
+    // The interval is configured in seconds but used in milliseconds.
+    int intervalSeconds = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_INTERVAL,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL);
+    if (intervalSeconds <= 0) {
+      // A zero interval would later be used as a modulus/sleep time.
+      LOG.warn("Invalid value " + intervalSeconds + " for "
+          + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_INTERVAL
+          + ", using default "
+          + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL);
+      intervalSeconds =
+          YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL;
+    }
+    // Multiply as long so that large values clamp instead of overflowing.
+    putEventInterval =
+        (int) Math.min(Integer.MAX_VALUE, intervalSeconds * 1000L);
+    dispatcherPoolSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+        YarnConfiguration.
+            DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+    dispatcherBatchSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE,
+        YarnConfiguration.
+            DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BATCH_SIZE);
+    dispatcherBufferSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_BUFFER_SIZE,
+        dispatcherBatchSize * dispatcherPoolSize + 1);
+    // A batch is submitted only once the queue holds MORE than
+    // dispatcherBatchSize entities, so the buffer must be strictly larger
+    // than one batch; otherwise the threshold can never be reached.
+    if (dispatcherBufferSize <= dispatcherBatchSize) {
+      LOG.warn("Configured buffer size " + dispatcherBufferSize
+          + " does not exceed batch size " + dispatcherBatchSize
+          + ", using batch size * pool size + 1 instead");
+      dispatcherBufferSize = dispatcherBatchSize * dispatcherPoolSize + 1;
+    }
if (publishSystemMetrics) {
client = TimelineClient.createTimelineClient();
addIfService(client);
@@ -95,12 +125,40 @@ protected void serviceInit(Configuration conf) throws Exception {
new ForwardingEventHandler());
addIfService(dispatcher);
LOG.info("YARN system metrics publishing service is enabled");
+      // Create the objects used for batch publishing. The periodic thread
+      // is only constructed here; it is started in serviceStart().
+      sendEntityLock = new Object();
+      putEventThread = new PutEventThread();
+      sendEventThreadPool =
+          Executors.newFixedThreadPool(dispatcherPoolSize);
+      entityQueue =
+          new LinkedBlockingQueue<>(dispatcherBufferSize);
} else {
LOG.info("YARN system metrics publishing service is not enabled");
}
super.serviceInit(conf);
}
+  /**
+   * Starts the periodic sending thread when publishing is enabled, then
+   * delegates to the superclass.
+   *
+   * @throws Exception if the superclass fails to start
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    if (publishSystemMetrics) {
+      putEventThread.start();
+    }
+    super.serviceStart();
+  }
+
+  /**
+   * Stops batch publishing and then delegates to the superclass.
+   *
+   * The periodic thread is interrupted and joined first, so it can no
+   * longer submit work. Entities still buffered are handed to the pool,
+   * which is then shut down and given three seconds to finish.
+   *
+   * @throws Exception if the superclass fails to stop
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    boolean interrupted = false;
+    try {
+      if (publishSystemMetrics) {
+        stopped = true;
+        // These fields may still be null when serviceInit failed part-way.
+        if (putEventThread != null) {
+          putEventThread.interrupt();
+        }
+        if (sendEventThreadPool != null) {
+          try {
+            if (putEventThread != null) {
+              // Wait before shutting the pool down so the thread cannot
+              // submit to an executor that already rejects tasks.
+              putEventThread.join();
+            }
+            if (entityQueue != null && !entityQueue.isEmpty()) {
+              // Do not lose what is still buffered.
+              sendEventThreadPool.submit(new SendEntity());
+            }
+            sendEventThreadPool.shutdown();
+            if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
+              sendEventThreadPool.shutdownNow();
+            }
+          } catch (InterruptedException e) {
+            sendEventThreadPool.shutdownNow();
+            interrupted = true;
+          }
+        }
+      }
+    } finally {
+      // NOTE(review): the dispatcher is presumably stopped (and drained)
+      // by super.serviceStop(), i.e. after the pool above was shut down;
+      // entities it still delivers are queued but not sent. Confirm
+      // whether the dispatcher should be stopped before the final flush.
+      try {
+        super.serviceStop();
+      } finally {
+        if (interrupted) {
+          // Restore the flag only after the superclass has stopped.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
@@ -194,7 +252,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
- RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+ RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
if (publishSystemMetrics) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
@@ -207,7 +265,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
// app will get the final status from app attempt, or create one
// based on app state if it doesn't exist
app.getFinalApplicationStatus(),
- RMServerUtils.createApplicationAttemptState(appAttemtpState),
+ RMServerUtils.createApplicationAttemptState(appAttemptState),
finishedTime,
container));
}
@@ -241,10 +299,7 @@ public void containerFinished(RMContainer container, long finishedTime) {
+  // Builds the MultiThreadedDispatcher used by this publisher and enables
+  // draining of pending events on stop. The pool size now comes from the
+  // dispatcherPoolSize field (assigned in serviceInit), so conf is no longer
+  // read here and the field must already be set when this is called.
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher =
- new MultiThreadedDispatcher(
- conf.getInt(
- YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
- YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+ new MultiThreadedDispatcher(dispatcherPoolSize);
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
@@ -548,14 +603,42 @@ private static TimelineEntity createContainerEntity(
@Private
@VisibleForTesting
public void putEntity(TimelineEntity entity) {
- try {
+    // Buffers the entity and submits a batch once more than
+    // dispatcherBatchSize entities are queued. The actual send happens in
+    // a SendEntity task on sendEventThreadPool.
+    if (!entityQueue.offer(entity)) {
+      // The bounded buffer is full: hand the backlog to the pool and retry
+      // once instead of silently dropping the entity.
+      submitBufferedEntities();
+      if (!entityQueue.offer(entity)) {
+        LOG.warn("Event buffer of system metrics publisher is full,"
+            + " dropping entity [" + entity.getEntityType() + ","
+            + entity.getEntityId() + "]");
+        return;
+      }
+    }
+    if (entityQueue.size() > dispatcherBatchSize) {
+      synchronized (sendEntityLock) {
+        // Re-check under the lock: another thread may have drained the
+        // queue between the first check and acquiring the lock.
+        if (entityQueue.size() > dispatcherBatchSize) {
+          submitBufferedEntities();
+        }
+      }
+    }
+  }
+
+  /**
+   * Drains the buffer into a SendEntity task and submits it to the pool.
+   * Does nothing once the pool has been shut down, so that entities are
+   * not drained into a task that can never run.
+   */
+  private void submitBufferedEntities() {
+    synchronized (sendEntityLock) {
+      if (sendEventThreadPool.isShutdown()) {
+        LOG.warn("System metrics publisher is stopping,"
+            + " buffered entities are not submitted");
+        return;
+      }
+      try {
+        sendEventThreadPool.submit(new SendEntity());
+      } catch (RejectedExecutionException e) {
+        // The pool was shut down between the check and the submit.
+        LOG.warn("Failed to submit buffered entities for publishing", e);
+      }
+    }
+  }
+
+  /**
+   * Task that publishes one batch of entities. The batch is taken from
+   * entityQueue when the task is constructed, i.e. in the thread that
+   * creates it; run() then sends that batch with a single putEntities
+   * call and logs any errors the response reports.
+   */
+  private class SendEntity implements Runnable {
+
+    /** Entities drained from entityQueue at construction time. */
+    private final List<TimelineEntity> buffer;
+
+    public SendEntity() {
+      buffer = new ArrayList<>();
+      entityQueue.drainTo(buffer);
+    }
+
+    @Override
+    public void run() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing the entity " + entity.getEntityId() +
- ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        LOG.debug("send batch events:" + buffer.size());
+      }
+      if (buffer.isEmpty()) {
+        return;
+      }
+      TimelinePutResponse response;
+      try {
+        response = client.putEntities(buffer.toArray(new TimelineEntity[0]));
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity", e);
+        // There is no response to inspect; falling through would
+        // dereference a null response.
+        return;
}
- TimelinePutResponse response = client.putEntities(entity);
List errors = response.getErrors();
- if (errors.size() == 0) {
+ if (errors == null || errors.size() == 0) {
LOG.debug("Timeline entities are successfully put");
} else {
for (TimelinePutResponse.TimelinePutError error : errors) {
@@ -565,9 +648,6 @@ public void putEntity(TimelineEntity entity) {
+ error.getErrorCode());
}
}
- } catch (Exception e) {
- LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
- + entity.getEntityId() + "]", e);
}
}
@@ -640,4 +720,37 @@ protected AsyncDispatcher createDispatcher() {
}
+  /**
+   * Thread that periodically submits the buffered entities for publishing,
+   * so that events do not wait in the buffer until a full batch has
+   * accumulated. It exits when interrupted, when stopped is set, or when
+   * the sender pool no longer accepts tasks.
+   */
+  private class PutEventThread extends Thread {
+    public PutEventThread() {
+      super("PutEventThread");
+    }
+
+    @Override
+    public void run() {
+      // putEventInterval is in milliseconds. Fall back to the default if
+      // it is not positive, which would otherwise make this loop spin.
+      final long intervalMs = putEventInterval > 0
+          ? putEventInterval
+          : 1000L
+              * YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_INTERVAL;
+      LOG.info("system metrics publisher will put events every " +
+          (intervalMs / 1000) + " seconds");
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(intervalMs);
+        } catch (InterruptedException e) {
+          LOG.warn(SystemMetricsPublisher.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+        if (stopped || sendEventThreadPool.isShutdown()) {
+          // Leave the buffer untouched; the task could never run.
+          break;
+        }
+        LOG.debug("put entities by PutEventThread");
+        try {
+          sendEventThreadPool.submit(new SendEntity());
+        } catch (RejectedExecutionException e) {
+          // The pool was shut down between the check and the submit.
+          LOG.warn("Sender pool rejected the periodic put. Exiting.", e);
+          break;
+        }
+      }
+    }
+  }
}