diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5292a25..1c8137a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -671,6 +671,19 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10; + public static final String + RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BUFFER_SIZE = + RM_PREFIX + "timeline-server-v1.buffer-size"; /* event buffer capacity */ + public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE = + RM_PREFIX + "timeline-server-v1.batch-size"; /* buffered events that trigger a send */ + public static final int + DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE = + 1000; + public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL = + RM_PREFIX + "timeline-server-v1.interval-seconds"; /* periodic send interval */ + public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL = + 60; /* seconds */ + //RM delegation token related keys public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2cc842f..efcbfbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -898,6 +898,35 @@ + + The size of timeline server v1 publisher sending events in one request. 
+ + yarn.resourcemanager.timeline-server-v1.batch-size + 1000 + + + + + The size of the event buffer in timeline server v1 publisher. Default value + is ${yarn.resourcemanager.timeline-server-v1.batch-size} * + ${yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size} + 1 + + yarn.resourcemanager.timeline-server-v1.buffer-size + + + + + + When batch publishing is enabled in timeline server v1, the publisher must not + wait indefinitely for a batch to fill up while holding events in the buffer. So + another thread sends the events in the buffer periodically. + This config sets the interval, in seconds, of that periodic sending thread. + + yarn.resourcemanager.timeline-server-v1.interval-seconds + 60 + + + Number of diagnostics/failure messages can be saved in RM for log aggregation. It also defines the number of diagnostics/failure messages can be shown in log aggregation web ui. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java index dc5292b..10d6cbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; 
+import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; @@ -59,6 +65,15 @@ public TimelineServiceV1Publisher() { } private TimelineClient client; + private LinkedBlockingQueue<TimelineEntity> entityQueue; + private ExecutorService sendEventThreadPool; + private int dispatcherPoolSize; + private int dispatcherBufferSize; + private int dispatcherBatchSize; + private int putEventInterval; /* milliseconds */ + private volatile boolean stopped = false; + private PutEventThread putEventThread; + private Object sendEntityLock; @Override protected void serviceInit(Configuration conf) throws Exception { @@ -67,8 +82,59 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); getDispatcher().register(SystemMetricsEventType.class, new TimelineV1EventHandler()); + /* At least 1s: PutEventThread divides by this interval. */ + putEventInterval = Math.max(1, + conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL, + YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)) + * 1000; + dispatcherPoolSize = conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration. 
+ DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE); + dispatcherBufferSize = conf.getInt( + YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BUFFER_SIZE, + dispatcherBatchSize * dispatcherPoolSize + 1); + /* The size-triggered flush needs room for more than one batch. */ + if (dispatcherBufferSize <= dispatcherBatchSize) { + dispatcherBufferSize = dispatcherBatchSize * dispatcherPoolSize + 1; + } + putEventThread = new PutEventThread(); + sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize); + entityQueue = new LinkedBlockingQueue<>(dispatcherBufferSize); + sendEntityLock = new Object(); + } + + /** Starts the periodic flush thread, then the parent service. */ + @Override + protected void serviceStart() throws Exception { + putEventThread.start(); + super.serviceStart(); + } + + /** Stops the flush thread and the sender pool, then the parent service. */ + @Override + protected void serviceStop() throws Exception { + stopped = true; + putEventThread.interrupt(); + sendEventThreadPool.shutdown(); + try { + putEventThread.join(); + if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) { + sendEventThreadPool.shutdownNow(); + } + } catch (InterruptedException e) { + sendEventThreadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + + super.serviceStop(); } + @SuppressWarnings("unchecked") @Override public void appCreated(RMApp app, long createdTime) { @@ -244,7 +310,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt, @SuppressWarnings("unchecked") @Override public void appAttemptFinished(RMAppAttempt appAttempt, - RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + RMAppAttemptState appAttemptState, RMApp app, long finishedTime) { TimelineEntity entity = createAppAttemptEntity(appAttempt.getAppAttemptId()); @@ -261,7 +327,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt, eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO, app.getFinalApplicationStatus().toString()); eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils - .createApplicationAttemptState(appAttemtpState).toString()); + .createApplicationAttemptState(appAttemptState).toString()); if (appAttempt.getMasterContainer() != null) { 
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO, appAttempt.getMasterContainer().getId().toString()); @@ -360,6 +426,7 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) { return entity; } + private void putEntity(TimelineEntity entity) { try { if (LOG.isDebugEnabled()) { @@ -367,10 +434,47 @@ private void putEntity(TimelineEntity entity) { + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); } - client.putEntities(entity); + if (!entityQueue.offer(entity)) { + LOG.warn("Timeline event buffer is full, dropping entity [" + + entity.getEntityType() + "," + entity.getEntityId() + "]"); + } + if (entityQueue.size() > dispatcherBatchSize) { + synchronized (sendEntityLock) { + if (entityQueue.size() > dispatcherBatchSize) { + SendEntity task = new SendEntity(); + sendEventThreadPool.submit(task); + } + } + } } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } + + /** Sends everything drained from the queue in a single putEntities call. */ + private class SendEntity implements Runnable { + + private ArrayList<TimelineEntity> buffer; + + public SendEntity(){ + buffer = new ArrayList<>(); + entityQueue.drainTo(buffer); + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("send batch events:" + String.valueOf(buffer.size())); + } + if (buffer.isEmpty()) { + return; + } + try { + client.putEntities(buffer.toArray(new TimelineEntity[0])); + } catch (Exception e) { + LOG.error("Error when publishing entity", e); + } } } @@ -395,4 +499,40 @@ public void handle(TimelineV1PublishEvent event) { putEntity(event.getEntity()); } } + + /** Periodically submits a flush of the entity queue to the sender pool. */ + private class PutEventThread extends Thread { + public PutEventThread() { + super("PutEventThread"); + } + + @Override + public void run() { + LOG.info("system metrics publisher will put events every " + + String.valueOf(putEventInterval / 1000) + " seconds"); + while (!stopped && !Thread.currentThread().isInterrupted()) { + /* Flush only in the first second of each interval; else poll again. */ + if (System.currentTimeMillis() % putEventInterval >= 1000) { + try { + 
Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn(SystemMetricsPublisher.class.getName() + + " is interrupted. Exiting."); + break; + } + continue; + } + LOG.debug("put entities by PutEventThread"); + SendEntity task = new SendEntity(); + sendEventThreadPool.submit(task); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn(SystemMetricsPublisher.class.getName() + + " is interrupted. Exiting."); + break; + } + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java index 5b0c34f..37b0913 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java @@ -203,6 +203,8 @@ private static YarnConfiguration getConf(boolean v1Enabled, MemoryTimelineStore.class, TimelineStore.class); yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, MemoryTimelineStateStore.class, TimelineStateStore.class); + yarnConf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL, + 1); } if (v2Enabled) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index f2d82e9..76d2482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -87,6 +87,8 @@ public static void setup() throws Exception { conf.setInt( YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); + conf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL, + 1); timelineServer = new ApplicationHistoryServer(); timelineServer.init(conf);