diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8b14ef7..53bdd00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1445,6 +1445,9 @@ private static void addDeprecatedKeys() { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = + TIMELINE_SERVICE_PREFIX + "writer.flush.interval.seconds"; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 4de39ec..f2281c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -19,6 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import com.google.common.annotations.VisibleForTesting; /** * Class that manages adding and removing collectors and their lifecycle. It @@ -46,8 +51,13 @@ public class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); + // flush every minute by default + private static final int DEFAULT_WRITER_FLUSH_INTERVAL_SECONDS = 60; private TimelineWriter writer; + private ScheduledExecutorService writerFlusher; + private int flushInterval; + private boolean writerFlusherRunning; @Override public void serviceInit(Configuration conf) throws Exception { @@ -56,6 +66,12 @@ public void serviceInit(Configuration conf) throws Exception { FileSystemTimelineWriterImpl.class, TimelineWriter.class), conf); writer.init(conf); + // create a single dedicated thread for flushing the writer on a periodic + // basis + writerFlusher = Executors.newSingleThreadScheduledExecutor(); + flushInterval = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS, + DEFAULT_WRITER_FLUSH_INTERVAL_SECONDS); super.serviceInit(conf); } @@ -65,6 +81,10 @@ protected void serviceStart() throws Exception { if (writer != null) { writer.start(); } + // schedule the flush task + writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer), + flushInterval, flushInterval, TimeUnit.SECONDS); + writerFlusherRunning = true; } // access to this map is synchronized with the map itself @@ -165,9 +185,40 @@ protected void serviceStop() throws Exception { c.serviceStop(); } } + // stop the flusher first + if (writerFlusher != null) { + writerFlusher.shutdownNow(); + writerFlusherRunning = false; + } if (writer != null) { writer.close(); } super.serviceStop(); } + + @VisibleForTesting + boolean writerFlusherRunning() { + return writerFlusherRunning; + } + + /** + * Task that invokes the flush operation on the timeline writer. + */ + private static class WriterFlushTask implements Runnable { + private final TimelineWriter writer; + + public WriterFlushTask(TimelineWriter writer) { + this.writer = writer; + } + + public void run() { + try { + writer.flush(); + } catch (Throwable th) { + // we need to handle all exceptions or subsequent execution may be + // suppressed + LOG.error("exception during timeline writer flush!", th); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 2fff98d..4385bbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -129,6 +129,11 @@ public void serviceStart() throws Exception { mkdirs(outputRoot, ENTITIES_DIR); } + @Override + public void flush() throws IOException { + // no op + } + private static String mkdirs(String... dirStrs) throws IOException { StringBuilder path = new StringBuilder(); for (String dirStr : dirStrs) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60..6ba6a3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -53,7 +53,7 @@ TimelineWriter { private Connection conn; - private TypedBufferedMutator entityTable; + private volatile TypedBufferedMutator entityTable; private static final Log LOG = LogFactory .getLog(HBaseTimelineWriterImpl.class); @@ -214,6 +214,12 @@ public TimelineWriteResponse aggregate(TimelineEntity data, return null; } + @Override + public void flush() throws IOException { + // flush all buffered mutators + entityTable.flush(); + } + /** * close the hbase connections The close APIs perform flushing and release any * resources held diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java index 5b4442c..381ff17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -187,6 +187,11 @@ public TimelineWriteResponse aggregate(TimelineEntity data, } + @Override + public void flush() throws IOException { + // currently no-op + } + // Utility functions @Private @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 494e8ad..50136de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -70,4 +70,13 @@ TimelineWriteResponse write(String clusterId, String userId, */ TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException; + + /** + * Flushes the data to the backend storage. Whatever may be buffered will be + * written to the storage when the method returns. This may be a potentially + * time-consuming operation, and should be used judiciously. + * + * @throws IOException + */ + void flush() throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 87343fd..0d69fbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -67,6 +67,11 @@ public void tearDown() throws Exception { } @Test + public void testStartingWriterFlusher() throws Exception { + assertTrue(collectorManager.writerFlusherRunning()); + } + + @Test public void testStartWebApp() throws Exception { assertNotNull(collectorManager.getRestServerBindAddress()); String address = collectorManager.getRestServerBindAddress();