diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 2fff98d..ad3a68d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -18,16 +18,20 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; +import java.util.EnumSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -45,7 +49,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService implements TimelineWriter { - private String outputRoot; + private static final Log LOG = LogFactory.getLog( + FileSystemTimelineWriterImpl.class); /** Config param 
for timeline service storage tmp root for FILE YARN-3264. */ public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT @@ -55,58 +60,136 @@ public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = "/tmp/timeline_service_data"; + public static final String TIMELINE_FS_WRITER_NUM_RETRIES = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries"; + public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0; + + public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + + "fs-writer.retry-interval-ms"; + public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = + 1000L; + public static final String ENTITIES_DIR = "entities"; + private static final String UNREADABLE_BY_SUPERUSER_XATTRIB = + "security.hdfs.unreadable.by.superuser"; + /** Default extension for output files. */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + private FileSystem fs; + private Path rootPath; + private int fsNumRetries; + private long fsRetryInterval; + private Path entitiesRoot; + + private boolean intermediateEncryptionEnabled; + private Configuration fsConf; FileSystemTimelineWriterImpl() { super((FileSystemTimelineWriterImpl.class.getName())); } @Override + public void serviceInit(Configuration conf) throws Exception { + rootPath = new Path(conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT)); + entitiesRoot = new Path(rootPath, ENTITIES_DIR); + fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES, + DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES); + fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS, + DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS ); + + /** + * TODO: implementing intermediate data encryption + */ + intermediateEncryptionEnabled = + conf.getBoolean(YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, + YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION); + } + + @Override + public 
void serviceStart() throws Exception { + fsConf = new Configuration(getConfig()); + fsConf.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + fsConf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC); + fsConf.set("dfs.client.retry.policy.spec", retryPolicy); + + String scheme = rootPath.toUri().getScheme(); + if (scheme == null) { + scheme = FileSystem.getDefaultUri(fsConf).getScheme(); + } + if (scheme != null) { + String disableCacheName = + String.format("fs.%s.impl.disable.cache", scheme); + fsConf.setBoolean(disableCacheName, true); + } + fs = FileSystem.get(fsConf); + mkdirsWithRetries(rootPath); + mkdirsWithRetries(entitiesRoot); + } + + @Override + public void serviceStop() throws Exception { + closeWithRetries(); + } + + @Override public TimelineWriteResponse write(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities entities) throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); for (TimelineEntity entity : entities.getEntities()) { - write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, - response); + writeInternal(clusterId, userId, flowName, flowVersion, flowRunId, appId, + entity, response); } return response; } - private synchronized void write(String clusterId, String userId, String flowName, - String flowVersion, long flowRun, String appId, TimelineEntity entity, - TimelineWriteResponse response) throws IOException { - PrintWriter out = null; + private synchronized void writeInternal(String clusterId, String userId, + String flowName, String flowVersion, long flowRun, String appId, + TimelineEntity entity, TimelineWriteResponse response) + throws IOException { + Path clusterIdPath = new Path(entitiesRoot, clusterId); + Path userIdPath = new Path(clusterIdPath, userId); + Path flowNamePath = new Path(userIdPath, escape(flowName)); + Path 
flowVersionPath = new Path(flowNamePath, escape(flowVersion)); + Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun)); + Path appIdPath = new Path(flowRunPath, appId); + Path entityTypePath = new Path(appIdPath, entity.getType()); try { - String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, - escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, - entity.getType()); - String fileName = dir + entity.getId() + - TIMELINE_SERVICE_STORAGE_EXTENSION; - out = - new PrintWriter(new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(fileName, true), "UTF-8"))); - out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); - out.write("\n"); - } catch (IOException ioe) { - TimelineWriteError error = new TimelineWriteError(); - error.setEntityId(entity.getId()); - error.setEntityType(entity.getType()); + mkdirs(rootPath, entitiesRoot, clusterIdPath, userIdPath, + flowNamePath, flowVersionPath, flowRunPath, appIdPath, + entityTypePath); + Path filePath = + new Path(entityTypePath, + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION); + createFileWithRetries(filePath); + + byte[] record = new StringBuilder() + .append(TimelineUtils.dumpTimelineRecordtoJSON(entity)) + .append("\n").toString().getBytes(); + writeFileWithRetries(filePath, record, false); + } catch (Exception ioe) { + LOG.warn("Interrupted operation:" + ioe.getMessage()); + TimelineWriteError error = createTimelineWriteError(entity); /* * TODO: set an appropriate error code after PoC could possibly be: * error.setErrorCode(TimelineWriteError.IO_EXCEPTION); */ response.addError(error); - } finally { - if (out != null) { - out.close(); - } } } + private TimelineWriteError createTimelineWriteError(TimelineEntity entity) { + TimelineWriteError error = new TimelineWriteError(); + error.setEntityId(entity.getId()); + error.setEntityType(entity.getType()); + return error; + } + @Override public TimelineWriteResponse aggregate(TimelineEntity data, 
TimelineAggregationTrack track) throws IOException { @@ -115,36 +198,137 @@ public TimelineWriteResponse aggregate(TimelineEntity data, } public String getOutputRoot() { - return outputRoot; + return rootPath.toString(); } - @Override - public void serviceInit(Configuration conf) throws Exception { - outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, - DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + + private void mkdirs(Path... paths) throws IOException, InterruptedException { + for (Path path: paths) { + if (!existsWithRetries(path)) { + mkdirsWithRetries(path); + } + } } - @Override - public void serviceStart() throws Exception { - mkdirs(outputRoot, ENTITIES_DIR); + // specifically escape the separator character + private static String escape(String str) { + return str.replace(File.separatorChar, '_'); + } + + + // Code from FSRMStateStore. + private void mkdirsWithRetries(final Path dirPath) + throws IOException, InterruptedException { + new FSAction<Void>() { + @Override + public Void run() throws IOException { + fs.mkdirs(dirPath); + return null; + } + }.runWithRetries(); + } + + private void writeFileWithRetries(final Path outputPath, final byte[] data, + final boolean makeUnreadableByAdmin) + throws Exception { + new FSAction<Void>() { + @Override + public Void run() throws IOException { + writeFile(outputPath, data, makeUnreadableByAdmin); + return null; + } + }.runWithRetries(); + } + + + private boolean createFileWithRetries(final Path newFile) + throws IOException, InterruptedException { + return new FSAction<Boolean>() { + @Override + public Boolean run() throws IOException { + return createFile(newFile); + } + }.runWithRetries(); + } + + + private boolean existsWithRetries(final Path path) + throws IOException, InterruptedException { + return new FSAction<Boolean>() { + @Override + public Boolean run() throws IOException { + return fs.exists(path); + } + }.runWithRetries(); } - private static String mkdirs(String...
dirStrs) throws IOException { - StringBuilder path = new StringBuilder(); - for (String dirStr : dirStrs) { - path.append(dirStr).append('/'); - File dir = new File(path.toString()); - if (!dir.exists()) { - if (!dir.mkdirs()) { - throw new IOException("Could not create directories for " + dir); + private void closeWithRetries() throws Exception { + new FSAction<Void>() { + @Override + public Void run() throws IOException { + fs.close(); + return null; + } + }.runWithRetries(); + } + + private abstract class FSAction<T> { + abstract T run() throws IOException; + + T runWithRetries() throws IOException, InterruptedException { + int retry = 0; + while (true) { + try { + return run(); + } catch (IOException e) { + LOG.info("Exception while executing a FS operation.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out FS retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on FS. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } } } } - return path.toString(); } - // specifically escape the separator character - private static String escape(String str) { - return str.replace(File.separatorChar, '_'); + private boolean createFile(Path newFile) throws IOException { + return fs.createNewFile(newFile); + } + + /** + * In order to make each write atomic, writeInternal first writes the data + * to a .tmp file and then renames it to the final output path. + * Here we are assuming that rename is atomic for the underlying file system. + */ + protected void writeFile(Path outputPath, byte[] data, boolean + makeUnradableByAdmin) throws IOException { + Path tempPath = + new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); + FSDataOutputStream fsOut = null; + // Any .tmp file left behind by a previous failed write attempt is + // overwritten here (fs.create with overwrite=true).
+ try { + fsOut = fs.create(tempPath, true); + if (makeUnradableByAdmin) { + setUnreadableBySuperuserXattrib(tempPath); + } + fsOut.write(data); + fsOut.close(); + fsOut = null; + fs.rename(tempPath, outputPath); + } finally { + IOUtils.cleanup(LOG, fsOut); + } + } + + private void setUnreadableBySuperuserXattrib(Path p) throws IOException { + if (fs.getScheme().toLowerCase().contains("hdfs") + && intermediateEncryptionEnabled + && !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) { + fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null, + EnumSet.of(XAttrSetFlag.CREATE)); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 50a9f60..31a016f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -18,16 +18,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.BufferedReader; import java.io.File; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.List; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -35,9 +38,8 @@ import org.junit.Test; public class TestFileSystemTimelineWriterImpl { - /** - * Unit test for PoC YARN 3264 + * Unit test for PoC YARN-3264 * @throws Exception */ @Test @@ -55,29 +57,38 @@ public void testWriteEntityToFile() throws Exception { FileSystemTimelineWriterImpl fsi = null; try { fsi = new FileSystemTimelineWriterImpl(); - fsi.init(new YarnConfiguration()); + Configuration conf = new YarnConfiguration(); + fsi.init(conf); fsi.start(); fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, "app_id", te); String fileName = fsi.getOutputRoot() + - "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" + - type + "/" + id + + "/entities/cluster_id/user_id/flow_name/flow_version/12345678/" + + "app_id/" + type + "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - Path path = Paths.get(fileName); - File f = new File(fileName); - assertTrue(f.exists() && !f.isDirectory()); - List data = Files.readAllLines(path, StandardCharsets.UTF_8); + Path path = new Path(fileName); + FileSystem fs = FileSystem.get(conf); + assertTrue("Specified path(" + fileName + ") should exist: ", + fs.exists(path)); + assertTrue("Specified path should be a file", !fs.isDirectory(path)); + BufferedReader br = + new BufferedReader(new InputStreamReader(fs.open(path))); + + List data = new ArrayList(); + + String line = br.readLine(); + data.add(line); + while(line != null) { + line = br.readLine(); + data.add(line); + } + // ensure there's only one entity + 1 new line assertTrue(data.size() == 2); String d = data.get(0); // confirm the contents same as what was written assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); - - // 
delete the directory - File outputDir = new File(fsi.getOutputRoot()); - FileUtils.deleteDirectory(outputDir); - assertTrue(!(f.exists())); } finally { if (fsi != null) { fsi.stop(); @@ -85,4 +96,5 @@ public void testWriteEntityToFile() throws Exception { } } } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties new file mode 100644 index 0000000..81a3f6a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n