diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5292a25..1c8137a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -671,6 +671,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
+  /**
+   * Configuration key for the capacity of the entity buffer used by the
+   * timeline server v1 publisher. No default constant exists for this key.
+   */
+  public static final String
+      RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BUFFER_SIZE =
+      RM_PREFIX + "timeline-server-v1.buffer-size";
+
+  /**
+   * Configuration key for the batch size of the timeline server v1
+   * publisher.
+   */
+  public static final String
+      RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
+      RM_PREFIX + "timeline-server-v1.batch-size";
+
+  /** Default batch size of the timeline server v1 publisher. */
+  public static final int
+      DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE = 1000;
+
+  /**
+   * Configuration key for the publishing interval of the timeline server v1
+   * publisher, expressed in seconds.
+   */
+  public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+      RM_PREFIX + "timeline-server-v1.interval-seconds";
+
+  /** Default publishing interval, in seconds. */
+  public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+      60;
+
//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2cc842f..efcbfbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -898,6 +898,35 @@
+
+ The number of events the timeline server v1 publisher sends in one request.
+
+ yarn.resourcemanager.timeline-server-v1.batch-size
+ 1000
+
+
+
+
+ The size of the event buffer in timeline server v1 publisher. Default value
+ is ${yarn.resourcemanager.timeline-server-v1.batch-size} *
+ ${yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size} + 1
+
+ yarn.resourcemanager.timeline-server-v1.buffer-size
+
+
+
+
+
+ When batch publishing is enabled in timeline server v1, the publisher must
+ not wait for a batch to fill up and hold events in the buffer for a long
+ time. So another thread sends the events in the buffer periodically.
+ This config sets the interval, in seconds, of that periodic sending thread.
+
+ yarn.resourcemanager.timeline-server-v1.interval-seconds
+ 60
+
+
+
Number of diagnostics/failure messages can be saved in RM for
log aggregation. It also defines the number of diagnostics/failure
messages can be shown in log aggregation web ui.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index dc5292b..10d6cbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -18,8 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
@@ -59,6 +65,15 @@ public TimelineServiceV1Publisher() {
}
private TimelineClient client;
+  /** Bounded buffer of entities waiting to be sent in a batch. */
+  private LinkedBlockingQueue<TimelineEntity> entityQueue;
+  /** Pool that runs the tasks which send buffered entities. */
+  private ExecutorService sendEventThreadPool;
+  /** Number of threads in {@link #sendEventThreadPool}. */
+  private int dispatcherPoolSize;
+  /** Capacity of {@link #entityQueue}. */
+  private int dispatcherBufferSize;
+  /** Queue size at which a send task is submitted. */
+  private int dispatcherBatchSize;
+  /** Interval of the periodic send, in milliseconds. */
+  private int putEventInterval;
+  /** Set when the service is stopping so the periodic thread exits. */
+  private volatile boolean stopped = false;
+  /** Thread that periodically sends whatever is buffered. */
+  private PutEventThread putEventThread;
+  /** Guards the size re-check done before a batch send is submitted. */
+  private Object sendEntityLock;
@Override
protected void serviceInit(Configuration conf) throws Exception {
@@ -67,8 +82,52 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
getDispatcher().register(SystemMetricsEventType.class,
new TimelineV1EventHandler());
+    // The periodic sender uses the interval as a modulus, so it must be
+    // positive; fall back to the default for zero or negative settings.
+    int intervalSeconds = conf.getInt(
+        YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+        YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL);
+    if (intervalSeconds <= 0) {
+      LOG.warn("Invalid value " + intervalSeconds + " for "
+          + YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL
+          + ", using default "
+          + YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL);
+      intervalSeconds =
+          YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL;
+    }
+    // Cap before converting to milliseconds so the int product cannot
+    // overflow.
+    putEventInterval =
+        Math.min(intervalSeconds, Integer.MAX_VALUE / 1000) * 1000;
+    dispatcherPoolSize = conf.getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+        YarnConfiguration.
+            DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+    dispatcherBatchSize = conf.getInt(
+        YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+        YarnConfiguration.
+            DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
+    // Computed in long arithmetic so a large batch size cannot wrap around.
+    int defaultBufferSize = (int) Math.min(Integer.MAX_VALUE,
+        (long) dispatcherBatchSize * dispatcherPoolSize + 1);
+    dispatcherBufferSize = conf.getInt(
+        YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BUFFER_SIZE,
+        defaultBufferSize);
+    // A buffer smaller than one batch could never fill a batch.
+    if (dispatcherBufferSize < dispatcherBatchSize) {
+      dispatcherBufferSize = defaultBufferSize;
+    }
+    putEventThread = new PutEventThread();
+    sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+    entityQueue = new LinkedBlockingQueue<>(dispatcherBufferSize);
+    sendEntityLock = new Object();
+ }
+
+  /**
+   * Starts the periodic sending thread and then the parent service.
+   *
+   * @throws Exception if the parent service fails to start
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    putEventThread.start();
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the periodic sending thread, sends whatever is still buffered,
+   * shuts the sender pool down and then stops the parent service.
+   *
+   * @throws Exception if the parent service fails to stop
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    stopped = true;
+    boolean interrupted = false;
+    try {
+      // The fields are null when serviceInit failed before assigning them.
+      if (putEventThread != null) {
+        putEventThread.interrupt();
+        // Join before shutting the pool down so the thread cannot submit
+        // to a pool that no longer accepts tasks.
+        putEventThread.join();
+      }
+      if (sendEventThreadPool != null) {
+        if (entityQueue != null) {
+          // Final flush: entities still in the queue would otherwise be
+          // lost.
+          sendEventThreadPool.submit(new SendEntity());
+        }
+        sendEventThreadPool.shutdown();
+        if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
+          sendEventThreadPool.shutdownNow();
+        }
+      }
+    } catch (InterruptedException e) {
+      if (sendEventThreadPool != null) {
+        sendEventThreadPool.shutdownNow();
+      }
+      interrupted = true;
+    }
+
+    try {
+      super.serviceStop();
+    } finally {
+      // Restore the interrupt status only after the parent has stopped so
+      // the parent's own shutdown is not cut short by it.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
}
+
@SuppressWarnings("unchecked")
@Override
public void appCreated(RMApp app, long createdTime) {
@@ -244,7 +303,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
@SuppressWarnings("unchecked")
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
- RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+ RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
@@ -261,7 +320,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
- .createApplicationAttemptState(appAttemtpState).toString());
+ .createApplicationAttemptState(appAttemptState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
@@ -360,6 +419,7 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) {
return entity;
}
+
private void putEntity(TimelineEntity entity) {
try {
if (LOG.isDebugEnabled()) {
@@ -367,10 +427,43 @@ private void putEntity(TimelineEntity entity) {
+ ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
- client.putEntities(entity);
+      // offer() returns false instead of blocking when the bounded queue is
+      // full; flush the queue and retry once rather than dropping silently.
+      if (!entityQueue.offer(entity)) {
+        sendEventThreadPool.submit(new SendEntity());
+        if (!entityQueue.offer(entity)) {
+          LOG.warn("Timeline entity queue is full, dropping entity ["
+              + entity.getEntityType() + "," + entity.getEntityId() + "]");
+        }
+      }
+      // ">=" keeps the threshold reachable when the buffer size equals the
+      // batch size.
+      if (entityQueue.size() >= dispatcherBatchSize) {
+        synchronized (sendEntityLock) {
+          // Re-check under the lock: another thread may already have
+          // drained the queue.
+          if (entityQueue.size() >= dispatcherBatchSize) {
+            SendEntity task = new SendEntity();
+            sendEventThreadPool.submit(task);
+          }
+        }
+      }
} catch (Exception e) {
- LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
- + entity.getEntityId() + "]", e);
+ LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ + entity.getEntityId() + "]", e);
+ }
+ }
+
+  /**
+   * Task that takes everything currently in the entity queue when it is
+   * constructed and sends it to the timeline client in a single call when
+   * it is run.
+   */
+  private class SendEntity implements Runnable {
+
+    /** Entities drained from the queue at construction time. */
+    private final ArrayList<TimelineEntity> buffer;
+
+    public SendEntity() {
+      buffer = new ArrayList<TimelineEntity>();
+      entityQueue.drainTo(buffer);
+    }
+
+    @Override
+    public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("send batch events:" + String.valueOf(buffer.size()));
+      }
+      // Nothing was queued when this task was created.
+      if (buffer.isEmpty()) {
+        return;
+      }
+      try {
+        // The typed list makes toArray return TimelineEntity[], which is
+        // what the varargs putEntities call requires.
+        client.putEntities(buffer.toArray(new TimelineEntity[0]));
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity", e);
+      }
}
}
@@ -395,4 +488,38 @@ public void handle(TimelineV1PublishEvent event) {
putEntity(event.getEntity());
}
}
+
+  /**
+   * Thread that periodically submits a {@link SendEntity} task so buffered
+   * entities are sent even when a batch never fills up. A send is triggered
+   * when the wall clock is within the first second of an interval window;
+   * otherwise the thread polls again after 500 ms.
+   */
+  private class PutEventThread extends Thread {
+    public PutEventThread() {
+      super("PutEventThread");
+    }
+
+    @Override
+    public void run() {
+      // The interval is used as a modulus below; guard against a
+      // non-positive value to avoid a division by zero.
+      final long intervalMs = putEventInterval > 0 ? putEventInterval
+          : 1000L
+          * YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL;
+      // putEventInterval holds milliseconds, so report it as such.
+      LOG.info("system metrics publisher will put events every "
+          + intervalMs + " milliseconds");
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        if (System.currentTimeMillis() % intervalMs >= 1000) {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            LOG.warn(SystemMetricsPublisher.class.getName()
+                + " is interrupted. Exiting.");
+            break;
+          }
+          continue;
+        }
+        LOG.debug("put entities by PutEventThread");
+        try {
+          sendEventThreadPool.submit(new SendEntity());
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+          // The pool can already be shut down while the service is
+          // stopping; there is nothing more this thread can send.
+          LOG.warn("Sender pool rejected the periodic send task. Exiting.");
+          break;
+        }
+        try {
+          // Sleep past the one-second send window so it fires only once.
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.warn(SystemMetricsPublisher.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 4add186..f4dbabf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -551,7 +551,8 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
getConf().set(
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-
+ getConf().setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+ 1);
MockRM rm1 = new MockRM(getConf());
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
rm1.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
index 5b0c34f..37b0913 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
@@ -203,6 +203,8 @@ private static YarnConfiguration getConf(boolean v1Enabled,
MemoryTimelineStore.class, TimelineStore.class);
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
+ yarnConf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+ 1);
}
if (v2Enabled) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index f2d82e9..76d2482 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -87,6 +87,8 @@ public static void setup() throws Exception {
conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
2);
+ conf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+ 1);
timelineServer = new ApplicationHistoryServer();
timelineServer.init(conf);