From b61c0be7ea24ae640541a0cbe38ba1c5640c0ca1 Mon Sep 17 00:00:00 2001
From: Rohith Sharma K S
Date: Thu, 26 Oct 2017 20:17:01 +0530
Subject: [PATCH] YARN-7272. Persist asynchronously posted timeline entities
 in a write-ahead log so that an app-level collector can recover them after
 a restart.

Entities posted to the collector through the async path can be lost if the
collector goes down before the periodic writer flush. This patch adds a
pluggable TimelineWALstore that logs every async put before it is handed to
the TimelineWriter, replays the log from
NodeTimelineCollectorManager#doPostPut when an app collector is put back,
and rotates the log from the shared writer flush task. A filesystem-backed
implementation (FileSystemWALstore) and an in-memory implementation for
tests (InMemoryWALstore) are included, along with configuration knobs for
the store class and the filesystem base path.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  13 +
 .../hadoop-yarn-server-timelineservice/pom.xml     |   5 +
 .../AppLevelTimelineCollectorWithAgg.java          |  48 ++-
 .../collector/NodeTimelineCollectorManager.java    |  20 +-
 .../collector/TimelineCollector.java               |  30 +-
 .../collector/TimelineCollectorManager.java        |  30 +-
 .../collector/TimelineCollectorWebService.java     |   2 +-
 .../recovery/FileSystemWALstore.java               | 352 +++++++++++++++++++++
 .../timelineservice/recovery/InMemoryWALstore.java |  45 +++
 .../timelineservice/recovery/TimelineWALstore.java |  20 ++
 .../collector/TestNMTimelineCollectorManager.java  |   4 +
 .../TestPerNodeTimelineCollectorsAuxService.java   |   6 +-
 .../collector/TestTimelineCollector.java           |   2 +-
 .../recovery/TestFileSystemWALStore.java           | 115 +++++++
 14 files changed, 665 insertions(+), 27 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/FileSystemWALstore.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/InMemoryWALstore.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TimelineWALstore.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TestFileSystemWALStore.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0d5f2cbacaf..9ac6bf7cc84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2315,6 +2315,19 @@ public static boolean isAclEnabled(Configuration conf) {
       "org.apache.hadoop.yarn.server.timelineservice" +
           ".storage.HBaseTimelineWriterImpl";
 
+  public static final String TIMELINE_SERVICE_WAL_STORE_CLASS =
+      TIMELINE_SERVICE_PREFIX + "wal-store.class";
+
+  public static final String DEFAULT_TIMELINE_SERVICE_WAL_STORE_CLASS =
+      "org.apache.hadoop.yarn.server.timelineservice.recovery" +
+          ".FileSystemWALstore";
+
+  public static final String TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH =
+      TIMELINE_SERVICE_PREFIX + "fs-wal-store.base-path";
+
+  public static final String DEFAULT_TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH =
+      "/tmp/entity-file-wal";
+
   public static final String TIMELINE_SERVICE_READER_CLASS =
       TIMELINE_SERVICE_PREFIX + "reader.class";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 5cbfbf59921..11d0e26c1a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -42,6 +42,11 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-annotations</artifactId>
     </dependency>
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
index d7f47c894e3..c56d9c97ce2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
@@ -25,9 +25,13 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -72,22 +76,26 @@ public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+    walStore = createTimelineWALstore(conf);
+    walStore.setTimelineCollectorContext(getTimelineEntityContext());
+    walStore.init(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
+    walStore.start();
     // Launch the aggregation thread
     appAggregationExecutor = new ScheduledThreadPoolExecutor(
         AppLevelTimelineCollectorWithAgg.AGGREGATION_EXECUTOR_NUM_THREADS,
         new ThreadFactoryBuilder()
             .setNameFormat("TimelineCollector Aggregation thread #%d")
             .build());
     appAggregator = new AppLevelAggregator();
     appAggregationExecutor.scheduleAtFixedRate(appAggregator,
         AppLevelTimelineCollectorWithAgg.
             AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         AppLevelTimelineCollectorWithAgg.
             AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         TimeUnit.SECONDS);
     super.serviceStart();
   }
 
@@ -101,6 +109,7 @@ protected void serviceStop() throws Exception {
     }
     // Perform one round of aggregation after the aggregation executor is done.
     appAggregator.aggregate();
+    walStore.stop();
     super.serviceStop();
   }
 
@@ -131,7 +140,7 @@ private void aggregate() {
             TimelineEntityType.YARN_APPLICATION.toString());
         TimelineEntities entities = new TimelineEntities();
         entities.addEntity(resultEntity);
-        putEntitiesAsync(entities, getCurrentUser());
+        putEntitiesAsync(entities, getCurrentUser(), false);
       } catch (Exception e) {
         LOG.error("Error aggregating timeline metrics", e);
       }
@@ -144,4 +153,25 @@ public void run() {
     }
   }
 
+  private TimelineWALstore createTimelineWALstore(final Configuration conf) {
+    String walStoreClassName =
+        conf.get(YarnConfiguration.TIMELINE_SERVICE_WAL_STORE_CLASS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WAL_STORE_CLASS);
+    LOG.info("Using TimelineWALstore: " + walStoreClassName);
+    try {
+      Class<?> walStoreClazz = Class.forName(walStoreClassName);
+      if (TimelineWALstore.class.isAssignableFrom(walStoreClazz)) {
+        return (TimelineWALstore) ReflectionUtils
+            .newInstance(walStoreClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + walStoreClassName
+            + " is not an instance of "
+            + TimelineWALstore.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate TimelineWALstore: " + walStoreClassName, e);
+    }
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 68a68f0ded5..d9f47c92f6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -237,6 +237,21 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     try {
       // Get context info from NM
       updateTimelineCollectorContext(appId, collector);
+
+      if (collector.getWalStore() != null) {
+        try {
+          // Replay entities that were logged to the WAL but never made it
+          // to the backend before the previous collector went down.
+          collector.putEntitiesAsync(collector.getWalStore().recoverEntities(),
+              UserGroupInformation.createRemoteUser(
+                  collector.getTimelineEntityContext().getUserId()),
+              true);
+        } catch (IOException e) {
+          LOG.warn("Entity recovery has failed!", e);
+        }
+        writerFlushTask.register(appId, collector.getWalStore());
+      }
+
       // Generate token for app collector.
       org.apache.hadoop.yarn.api.records.Token token = null;
       if (UserGroupInformation.isSecurityEnabled() &&
@@ -260,11 +275,13 @@ protected void postRemove(ApplicationId appId,
       TimelineCollector collector) {
       try {
         cancelTokenForAppCollector((AppLevelTimelineCollector) collector);
       } catch (IOException e) {
         LOG.warn("Failed to cancel token for app collector with appId " +
             appId, e);
       }
     }
+
+    writerFlushTask.unregister(appId);
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 8202431459d..10061ef24c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@
   public static final String SEPARATOR = "_";
 
   private TimelineWriter writer;
+
   private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
       = new ConcurrentHashMap<>();
   private static Set<String> entityTypesSkipAggregation
@@ -65,6 +67,8 @@
 
   private volatile boolean isStopped = false;
 
+  protected TimelineWALstore walStore;
+
   public TimelineCollector(String name) {
     super(name);
   }
@@ -93,6 +97,10 @@ protected void setWriter(TimelineWriter w) {
     this.writer = w;
   }
 
+  public TimelineWALstore getWalStore() {
+    return this.walStore;
+  }
+
   protected Map<String, AggregationStatusTable> getAggregationGroups() {
     return aggregationGroups;
   }
@@ -126,7 +134,7 @@ protected boolean isReadyToAggregate() {
    *
    * This method should be reserved for selected critical entities and events.
    * For normal voluminous writes one should use the async method
-   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation, boolean)}.
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
@@ -149,7 +157,6 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
     // synchronize on the writer object so that no other threads can
     // use the same writer to write to the backend storage.
     synchronized (writer) {
       response = writeTimelineEntities(entities, callerUgi);
       flushBufferedTimelineEntities();
     }
-
     return response;
   }
@@ -183,17 +191,26 @@ private void flushBufferedTimelineEntities() throws IOException {
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
-   * @throws IOException if there is any exception encounted while putting
+   * @param isRecovering true if the entities are being replayed from the
+   *          WAL store during recovery
+   * @throws IOException if there is any exception encountered while putting
    *           entities.
    */
   public void putEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) throws IOException {
+      UserGroupInformation callerUgi, boolean isRecovering)
+      throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
-
-    writeTimelineEntities(entities, callerUgi);
+    synchronized (writer) {
+      // Log to the WAL before writing, so unflushed entities can be
+      // replayed if the collector restarts.
+      if (!isRecovering && walStore != null) {
+        walStore.storeEntities(entities, callerUgi);
+      }
+      writeTimelineEntities(entities, callerUgi);
+    }
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 7909a2e82f6..c563a1defc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -21,7 +21,9 @@
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -56,6 +59,8 @@
   private int flushInterval;
   private boolean writerFlusherRunning;
 
+  protected WriterFlushTask writerFlushTask;
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     writer = createTimelineWriter(conf);
@@ -97,8 +102,9 @@ protected void serviceStart() throws Exception {
     if (writer != null) {
       writer.start();
     }
+    writerFlushTask = new WriterFlushTask(writer);
     // schedule the flush task
-    writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
+    writerFlusher.scheduleAtFixedRate(writerFlushTask,
         flushInterval, flushInterval, TimeUnit.SECONDS);
     writerFlusherRunning = true;
   }
@@ -254,10 +260,12 @@ boolean writerFlusherRunning() {
   /**
    * Task that invokes the flush operation on the timeline writer.
    */
-  private static class WriterFlushTask implements Runnable {
+  protected static class WriterFlushTask implements Runnable {
     private final TimelineWriter writer;
+    private ConcurrentHashMap<ApplicationId, TimelineWALstore> stores =
+        new ConcurrentHashMap<>();
 
-    public WriterFlushTask(TimelineWriter writer) {
+    WriterFlushTask(TimelineWriter writer) {
       this.writer = writer;
     }
 
@@ -268,6 +276,12 @@ public void run() {
         // requests.
         synchronized (writer) {
           writer.flush();
+          for (Iterator<TimelineWALstore> iterator =
+              stores.values().iterator(); iterator.hasNext();) {
+            TimelineWALstore store = iterator.next();
+            LOG.debug("Flushing the WAL store from the flusher task");
+            store.flush();
+          }
         }
       } catch (Throwable th) {
         // we need to handle all exceptions or subsequent execution may be
@@ -275,5 +289,15 @@ public void run() {
         LOG.error("exception during timeline writer flush!", th);
       }
     }
+
+    public void register(ApplicationId id, TimelineWALstore store) {
+      LOG.info("Registering the walStore of " + id + " for flush");
+      stores.put(id, store);
+    }
+
+    public void unregister(ApplicationId id) {
+      LOG.info("Unregistering the walStore of " + id + " from flush");
+      stores.remove(id);
+    }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index efb5d6bf04c..afc632250af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -169,7 +169,7 @@ public Response putEntities(
     boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
     if (isAsync) {
       collector.putEntitiesAsync(
-          processTimelineEntities(entities), callerUgi);
+          processTimelineEntities(entities), callerUgi, false);
     } else {
       collector.putEntities(processTimelineEntities(entities), callerUgi);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/FileSystemWALstore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/FileSystemWALstore.java
new file mode 100644
index 00000000000..d753a4af880
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/FileSystemWALstore.java
@@ -0,0 +1,352 @@
+package org.apache.hadoop.yarn.server.timelineservice.recovery;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link TimelineWALstore} that logs entities to a file on a Hadoop
+ * filesystem (local or HDFS) so that entities accepted asynchronously but
+ * not yet flushed to the backend can be replayed after a collector restart.
+ */
+public class FileSystemWALstore extends AbstractService
+    implements TimelineWALstore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FileSystemWALstore.class);
+
+  private FileSystem fs = null;
+  private ObjectMapper objMapper = null;
+  // World-writable with the sticky bit set (like /tmp): any app can create
+  // its directory under the base path, but only the owner can delete it.
+  private static final FsPermission BASE_DIR_PERMISSION =
+      new FsPermission((short) 01777);
+  private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+  private static final short FILE_LOG_PERMISSIONS = 0600;
+  private static final String ENTITY_LOG_PREFIX = "entitylog-";
+  private static final String BACKUP_LOG_SUFFIX = ".dot";
+  private Path basePath = null;
+  private Path logPathRoot = null;
+  private Path entityLogPath = null;
+  private TimelineCollectorContext context;
+  private LogFD entityLogFd;
+
+  @VisibleForTesting
+  private boolean deleteLogPathRoot = true;
+
+  public FileSystemWALstore() {
+    super(FileSystemWALstore.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    Configuration fsConf = new Configuration(conf);
+    this.basePath = new Path(
+        fsConf.get(YarnConfiguration.TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH));
+    this.fs = FileSystem.newInstance(basePath.toUri(), fsConf);
+    this.objMapper = createObjectMapper();
+
+    if (!fs.exists(basePath)) {
+      fs.mkdirs(basePath);
+      fs.setPermission(basePath, BASE_DIR_PERMISSION);
+    }
+
+    logPathRoot = new Path(basePath, context.getAppId());
+    if (FileSystem.mkdirs(fs, logPathRoot,
+        new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New app directory created - " + logPathRoot);
+      }
+    }
+
+    entityLogPath =
+        new Path(logPathRoot, ENTITY_LOG_PREFIX + context.getAppId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing entity log for " + context.getAppId() + " to "
+          + entityLogPath);
+    }
+    entityLogFd = new LogFD(fs, entityLogPath, objMapper);
+  }
+
+  @Override
+  protected synchronized void serviceStop() throws Exception {
+    entityLogFd.lock();
+    try {
+      entityLogFd.close();
+    } finally {
+      entityLogFd.unlock();
+    }
+
+    if (deleteLogPathRoot) {
+      LOG.info("Deleting " + logPathRoot);
+      if (!fs.delete(logPathRoot, true)) {
+        LOG.error("Unable to remove " + logPathRoot);
+      }
+    }
+    IOUtils.cleanupWithLogger(LOG, fs);
+  }
+
+  @Override
+  public void storeEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
+    // TODO store callerUgi inside entity info so that it can be recovered
+    entityLogFd.lock();
+    try {
+      LOG.debug("Storing {} entities into the WAL",
+          entities.getEntities().size());
+      entityLogFd.writeEntities(entities.getEntities());
+      entityLogFd.flush();
+    } finally {
+      entityLogFd.unlock();
+    }
+  }
+
+  @Override
+  public TimelineEntities recoverEntities() throws IOException {
+    LOG.info("Recovering entities from the FS WAL.");
+    entityLogFd.lock();
+    try {
+      return entityLogFd.readEntities();
+    } finally {
+      entityLogFd.unlock();
+    }
+  }
+
+  private ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(
+        new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
+    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    mapper.configure(SerializationFeature.FLUSH_AFTER_WRITE_VALUE, false);
+    return mapper;
+  }
+
+  private static class LogFD {
+    private FSDataOutputStream outStream;
+    private FSDataInputStream inStream;
+    private ObjectMapper objMapper;
+    private JsonGenerator jsonGenerator;
+    private JsonParser jsonParser;
+    private final ReentrantLock fdLock = new ReentrantLock();
+    private final FileSystem fs;
+    private final Path logPath;
+
+    public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper)
+        throws IOException {
+      this.fs = fs;
+      this.logPath = logPath;
+      this.objMapper = objMapper;
+    }
+
+    public boolean isEmpty() throws IOException {
+      try {
+        FileStatus fileStatus = fs.getFileStatus(logPath);
+        return fileStatus.getLen() == 0;
+      } catch (FileNotFoundException e) {
+        LOG.info("File not created yet! " + e.getMessage());
+      }
+      return true;
+    }
+
+    public void close() {
+      readClose();
+      writeClose();
+    }
+
+    public void writeClose() {
+      IOUtils.cleanupWithLogger(LOG, jsonGenerator, outStream);
+      jsonGenerator = null;
+      outStream = null;
+    }
+
+    public void readClose() {
+      IOUtils.cleanupWithLogger(LOG, jsonParser, inStream);
+      jsonParser = null;
+      inStream = null;
+    }
+
+    public void flush() throws IOException {
+      LOG.debug("Flushing the WAL output stream");
+      if (jsonGenerator != null) {
+        jsonGenerator.flush();
+      }
+      if (outStream != null) {
+        outStream.hflush();
+      }
+    }
+
+    protected void prepareForWrite() throws IOException {
+      if (fs instanceof DistributedFileSystem) {
+        // A previous collector instance may still hold the HDFS lease on
+        // the log file; recover it before appending.
+        boolean recovered = false;
+        try {
+          LOG.info("Recovering the lease!");
+          int retryCount = 0;
+          recovered = ((DistributedFileSystem) fs).recoverLease(logPath);
+          while (!recovered && retryCount++ < 10) {
+            recovered = ((DistributedFileSystem) fs).isFileClosed(logPath);
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              LOG.error("Interrupted!");
+            }
+          }
+        } catch (IOException e) {
+          if (e.getMessage().contains("File does not exist")) {
+            recovered = true;
+          }
+        }
+        if (!recovered) {
+          throw new IOException("unable to recover lease for " + logPath);
+        }
+        LOG.info("Recovered lease!");
+      }
+
+      this.outStream = createLogFileStream(logPath);
+      this.jsonGenerator = new JsonFactory().createGenerator(outStream);
+      this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    }
+
+    protected void prepareForRead() throws IOException {
+      this.inStream = fs.open(logPath);
+      jsonParser = new JsonFactory().createParser(inStream);
+      jsonParser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    }
+
+    protected boolean isReadClosed() {
+      return inStream == null;
+    }
+
+    protected boolean isWriterClosed() {
+      return outStream == null;
+    }
+
+    private FSDataOutputStream createLogFileStream(Path logPathToCreate)
+        throws IOException {
+      FSDataOutputStream streamToCreate;
+      if (!fs.exists(logPathToCreate)) {
+        streamToCreate = fs.create(logPathToCreate, false);
+        fs.setPermission(logPathToCreate,
+            new FsPermission(FILE_LOG_PERMISSIONS));
+      } else {
+        streamToCreate = fs.append(logPathToCreate);
+      }
+      return streamToCreate;
+    }
+
+    // rename from logPath to logPath.dot
+    protected boolean rename() throws IOException {
+      return fs.rename(logPath, new Path(logPath + BACKUP_LOG_SUFFIX));
+    }
+
+    public void lock() {
+      this.fdLock.lock();
+    }
+
+    public void unlock() {
+      this.fdLock.unlock();
+    }
+
+    protected JsonGenerator getJsonGenerator() {
+      return jsonGenerator;
+    }
+
+    protected JsonParser getJsonParser() {
+      return jsonParser;
+    }
+
+    protected ObjectMapper getObjectMapper() {
+      return objMapper;
+    }
+
+    public void writeEntities(List<TimelineEntity> entities)
+        throws IOException {
+      if (isWriterClosed()) {
+        prepareForWrite();
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing entity list of size " + entities.size());
+      }
+      for (TimelineEntity entity : entities) {
+        getObjectMapper().writeValue(getJsonGenerator(), entity);
+      }
+    }
+
+    // close the read since it is a one-time operation
+    public TimelineEntities readEntities() throws IOException {
+      try {
+        TimelineEntities entities = new TimelineEntities();
+        if (isEmpty()) {
+          return entities;
+        }
+        if (isReadClosed()) {
+          prepareForRead();
+        }
+
+        List<TimelineEntity> entityList = new ArrayList<>();
+        MappingIterator<TimelineEntity> iter =
+            getObjectMapper().readValues(getJsonParser(), TimelineEntity.class);
+        while (iter.hasNext()) {
+          TimelineEntity entity = iter.next();
+          entityList.add(entity);
+        }
+        entities.setEntities(entityList);
+        LOG.info("Recovered {} entities from the FS WAL", entityList.size());
+        return entities;
+      } finally {
+        readClose();
+      }
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    entityLogFd.lock();
+    try {
+      entityLogFd.flush();
+      entityLogFd.close();
+      if (!entityLogFd.isEmpty()) {
+        LOG.debug("Rotating the flushed WAL file");
+        entityLogFd.rename();
+      }
+    } finally {
+      entityLogFd.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public void setDeleteLogPathRoot(boolean deleteLogPath) {
+    deleteLogPathRoot = deleteLogPath;
+  }
+
+  @Override
+  public void setTimelineCollectorContext(TimelineCollectorContext context) {
+    this.context = context;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/InMemoryWALstore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/InMemoryWALstore.java
new file mode 100644
index 00000000000..9aff6f2a2ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/InMemoryWALstore.java
@@ -0,0 +1,45 @@
+package org.apache.hadoop.yarn.server.timelineservice.recovery;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+/**
+ * An in-memory {@link TimelineWALstore}, primarily for tests. Entities do
+ * not survive a collector restart.
+ */
+public class InMemoryWALstore extends AbstractService
+    implements TimelineWALstore {
+
+  private List<TimelineEntity> entities = new ArrayList<>();
+
+  public InMemoryWALstore() {
+    super(InMemoryWALstore.class.getName());
+  }
+
+  @Override
+  public void setTimelineCollectorContext(TimelineCollectorContext context) {
+    // The in-memory store does not need the collector context.
+  }
+
+  @Override
+  public void storeEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
+    this.entities.addAll(entities.getEntities());
+  }
+
+  @Override
+  public TimelineEntities recoverEntities() throws IOException {
+    TimelineEntities recovered = new TimelineEntities();
+    recovered.addEntities(this.entities);
+    return recovered;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    this.entities.clear();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TimelineWALstore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TimelineWALstore.java
new file mode 100644
index 00000000000..559bc52ab1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TimelineWALstore.java
@@ -0,0 +1,20 @@
+package org.apache.hadoop.yarn.server.timelineservice.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+public interface TimelineWALstore extends Service {
+
+  void setTimelineCollectorContext(TimelineCollectorContext context);
+
+  void storeEntities(TimelineEntities entities, UserGroupInformation callerUgi)
+      throws IOException;
+
+  TimelineEntities recoverEntities() throws IOException;
+
+  void flush() throws IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index af9acce265e..65f8029bb21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -45,6 +45,8 @@
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.InMemoryWALstore;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.After;
@@ -60,6 +62,8 @@ public void setup() throws Exception {
     Configuration conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
         FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WAL_STORE_CLASS,
+        InMemoryWALstore.class, TimelineWALstore.class);
     collectorManager.init(conf);
     collectorManager.start();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index cb9ced09309..6e92519e8f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -44,6 +44,8 @@
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.InMemoryWALstore;
+import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.After;
@@ -63,6 +65,8 @@ public TestPerNodeTimelineCollectorsAuxService() {
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
         FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WAL_STORE_CLASS,
+        InMemoryWALstore.class, TimelineWALstore.class);
   }
 
   @After
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index ec454284dab..906abf72b3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -170,7 +170,7 @@ public void testPutEntityAsync() throws IOException {
     TimelineEntities entities = generateTestEntities(1, 1);
 
     collector.putEntitiesAsync(
-        entities, UserGroupInformation.createRemoteUser("test-user"));
+        entities, UserGroupInformation.createRemoteUser("test-user"), false);
 
     verify(writer, times(1)).write(any(TimelineCollectorContext.class),
         any(TimelineEntities.class), any(UserGroupInformation.class));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TestFileSystemWALStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TestFileSystemWALStore.java
new file mode 100644
index 00000000000..78820f1eb54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/recovery/TestFileSystemWALStore.java
@@ -0,0 +1,115 @@
+package org.apache.hadoop.yarn.server.timelineservice.recovery;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileSystemWALStore {
+
+  protected static Log LOG = LogFactory.getLog(TestFileSystemWALStore.class);
+
+  private static FileContext localFS;
+  private static File localActiveDir;
+  private TimelineWALstore writer;
+  private TimelineWALstore reader;
+  private ApplicationId appId;
+  private Configuration conf;
+
+  @Before
+  public void setup() throws Exception {
+    localFS = FileContext.getLocalFSFileContext();
+    localActiveDir =
+        new File("target", this.getClass().getSimpleName()).getAbsoluteFile();
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+    localActiveDir.mkdir();
+    LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH,
+        localActiveDir.getAbsolutePath());
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    writer = createTimelineStore();
+    ((FileSystemWALstore) writer).setDeleteLogPathRoot(false);
+    reader = createTimelineStore();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    writer.stop();
+    reader.stop();
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testReadWriteEntities() throws Exception {
+    TimelineEntities recoverEntities = reader.recoverEntities();
+    Assert.assertEquals("Stored and recovered entity counts differ", 0,
+        recoverEntities.getEntities().size());
+
+    UserGroupInformation user1 =
+        UserGroupInformation.createRemoteUser("user1");
+    writer.storeEntities(getEntities(0), user1);
+    writer.flush();
+    writer.storeEntities(getEntities(10), user1);
+    writer.flush();
+    // second flush is intentional, to verify the file length
+    writer.flush();
+
+    TimelineEntities entities = getEntities(20);
+    writer.storeEntities(entities, user1);
+    writer.stop();
+
+    recoverEntities = reader.recoverEntities();
+    Assert.assertEquals("Stored and recovered entity counts differ",
+        entities.getEntities().size(), recoverEntities.getEntities().size());
+    Iterator<TimelineEntity> oldIterator = entities.getEntities().iterator();
+    Iterator<TimelineEntity> newIterator =
+        recoverEntities.getEntities().iterator();
+    while (oldIterator.hasNext() && newIterator.hasNext()) {
+      Assert.assertEquals("Recovered entity differs from the stored one",
+          oldIterator.next(), newIterator.next());
+    }
+  }
+
+  private TimelineWALstore createTimelineStore() {
+    TimelineCollectorContext ctx =
+        Mockito.mock(TimelineCollectorContext.class);
+    Mockito.when(ctx.getAppId()).thenReturn(appId.toString());
+    TimelineWALstore timelineStore = new FileSystemWALstore();
+    timelineStore.setTimelineCollectorContext(ctx);
+    timelineStore.init(conf);
+    timelineStore.start();
+    return timelineStore;
+  }
+
+  private TimelineEntities getEntities(int startIndex) {
+    List<TimelineEntity> list = new ArrayList<>();
+    for (int i = startIndex; i < startIndex + 10; i++) {
+      TimelineEntity entity = new TimelineEntity();
+      entity.setId("entities-" + i);
+      entity.setType("TEST");
+      entity.addConfig("config-key-" + i, "config-value-" + i);
+      list.add(entity);
+    }
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntities(list);
+    return entities;
+  }
+}
-- 
2.13.5 (Apple Git-94)
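
A note on the intended lifecycle (not part of the patch): every async put is
appended to the WAL via storeEntities(), the shared flusher task calls
flush() which syncs the log and rotates it to a ".dot" backup, and
recoverEntities() replays only what was logged after the last rotation. The
sketch below walks that cycle against FileSystemWALstore the same way
TestFileSystemWALStore does. It is a minimal illustration only: it assumes
TimelineCollectorContext's no-arg constructor and setAppId() setter, a
writable target/ scratch directory, and a made-up application id.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.recovery.FileSystemWALstore;
import org.apache.hadoop.yarn.server.timelineservice.recovery.TimelineWALstore;

public class WALstoreLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Point the FS WAL store at a scratch directory (assumed writable).
    conf.set(YarnConfiguration.TIMELINE_SERVICE_FS_WAL_STORE_BASE_PATH,
        "target/wal-sketch");

    // The store keys its log directory off the app id in the context
    // (hypothetical id for illustration).
    TimelineCollectorContext context = new TimelineCollectorContext();
    context.setAppId("application_1509022021000_0001");

    TimelineWALstore walStore = new FileSystemWALstore();
    walStore.setTimelineCollectorContext(context);
    walStore.init(conf);
    walStore.start();

    UserGroupInformation user = UserGroupInformation.createRemoteUser("user1");

    // 1. An async put appends the batch to the WAL before the real write.
    walStore.storeEntities(newBatch("entity-1"), user);

    // 2. The periodic flusher syncs the log and rotates it to a ".dot"
    //    backup, so already-flushed batches are not replayed again.
    walStore.flush();

    // 3. Only batches logged after the last rotation come back from
    //    recoverEntities(), which is what a restarted collector replays.
    walStore.storeEntities(newBatch("entity-2"), user);
    TimelineEntities recovered = walStore.recoverEntities();
    System.out.println("Recovered " + recovered.getEntities().size()
        + " entity(ies)"); // expected: 1, i.e. only "entity-2"

    walStore.stop();
  }

  private static TimelineEntities newBatch(String id) {
    TimelineEntity entity = new TimelineEntity();
    entity.setId(id);
    entity.setType("TEST");
    TimelineEntities batch = new TimelineEntities();
    batch.addEntity(entity);
    return batch;
  }
}

The design choice worth noting is that flush() rotates rather than truncates:
the current log file always holds exactly the not-yet-flushed suffix of the
stream, which keeps recovery a simple full read of one file.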