diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ce38d2762b2..e360d0ee789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1411,6 +1411,13 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "remote-app-log-dir"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; + /** Whether to run NM log aggregation as 'app owner' or fall back to + * 'yarn' user. Defaults to 'true' + */ + public static final String LOG_AGGREGATION_RUN_AS_APP_OWNER + = NM_PREFIX + "log-aggregation-run-as-appOwner"; + public static final boolean DEFAULT_LOG_AGGREGATION_RUN_AS_APP_OWNER = true; + /** * The remote log dir will be created at * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFormat.java index e10305acf4f..8a7dd8ae27b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ConfigFormat.java @@ -35,6 +35,7 @@ TEMPLATE("template"), YAML("yaml"), ; + ConfigFormat(String suffix) { this.suffix = suffix; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index ca43fe6ad96..5e055e7a4c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -470,18 +470,20 @@ public void initialize(final Configuration conf, final Path remoteAppLogFile, UserGroupInformation userUgi) throws IOException { try { - this.fsDataOStream = - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public FSDataOutputStream run() throws Exception { - fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf); - fc.setUMask(APP_LOG_FILE_UMASK); - return fc.create( - remoteAppLogFile, - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), - new Options.CreateOpts[] {}); - } - }); + + boolean runLogAggAsAppOwner = conf.getBoolean(YarnConfiguration + .LOG_AGGREGATION_RUN_AS_APP_OWNER, true); + + if (runLogAggAsAppOwner) { + fsDataOStream = userUgi.doAs( + new PrivilegedExceptionAction() { + @Override public FSDataOutputStream run() throws Exception { + return createOutputStream(conf, remoteAppLogFile); + } + }); + } else { + fsDataOStream = createOutputStream(conf, remoteAppLogFile); + } } catch (InterruptedException e) { throw new IOException(e); } @@ -496,6 +498,16 @@ public FSDataOutputStream run() throws Exception { writeVersion(); } + private FSDataOutputStream createOutputStream(final Configuration conf, + final Path remoteAppLogFile) throws IOException { + fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf); + fc.setUMask(APP_LOG_FILE_UMASK); + return fc.create( + remoteAppLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + } + @VisibleForTesting public TFile.Writer getWriter() { return this.writer; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index fe652881af0..790ffff93f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.webapp.View.ViewContext; @@ -62,6 +61,9 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import static org.apache.hadoop.yarn.conf.YarnConfiguration + .DEFAULT_LOG_AGGREGATION_RUN_AS_APP_OWNER; + /** * Base class to implement Log Aggregation File Controller. */ @@ -108,6 +110,7 @@ protected String remoteRootLogDirSuffix; protected int retentionSize; protected String fileControllerName; + protected boolean runLogAggAsAppOwner; public LogAggregationFileController() {} @@ -130,6 +133,9 @@ public void initialize(Configuration conf, String controllerName) { this.retentionSize = configuredRetentionSize; } this.fileControllerName = controllerName; + this.runLogAggAsAppOwner = conf.getBoolean(YarnConfiguration + .LOG_AGGREGATION_RUN_AS_APP_OWNER, DEFAULT_LOG_AGGREGATION_RUN_AS_APP_OWNER); + initInternal(conf); } @@ -319,54 +325,17 @@ public void verifyAndCreateRemoteLogDir() { */ public void createAppDir(final String user, final ApplicationId appId, UserGroupInformation userUgi) { - final Path remoteRootLogDir = getRemoteRootLogDir(); - final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix(); try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - try { - // TODO: Reuse FS for user? - FileSystem remoteFS = getFileSystem(conf); - - // Only creating directories if they are missing to avoid - // unnecessary load on the filesystem from all of the nodes - Path appDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, user, remoteRootLogDirSuffix); - - appDir = appDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); - } - - } catch (IOException e) { - LOG.error("Failed to setup application log directory for " - + appId, e); - throw e; + if (runLogAggAsAppOwner) { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { + createAppDir(user, appId); + return null; } - return null; - } - }); + }); + } else{ + createAppDir(user, appId); + } } catch (Exception e) { if (e instanceof RemoteException) { throw new YarnRuntimeException(((RemoteException) e) @@ -376,6 +345,52 @@ public Object run() throws Exception { } } + private void createAppDir(final String user, final ApplicationId appId) + throws IOException { + try { + final Path remoteRootLogDir = getRemoteRootLogDir(); + final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix(); + + // TODO: Reuse FS for user? + FileSystem remoteFS = getFileSystem(conf); + + // Only creating directories if they are missing to avoid + // unnecessary load on the filesystem from all of the nodes + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + + appDir = appDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + } + + } catch (IOException e) { + LOG.error("Failed to setup application log directory for " + + appId, e); + throw e; + } + } + @VisibleForTesting protected FileSystem getFileSystem(Configuration conf) throws IOException { return getRemoteRootLogDir().getFileSystem(conf); @@ -467,16 +482,20 @@ public int compare(FileStatus s1, FileStatus s2) { }); for (int i = 0; i <= statusList.size() - this.retentionSize; i++) { final FileStatus remove = statusList.get(i); - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - remoteFS.delete(remove.getPath(), false); - return null; - } - }); - } catch (Exception e) { - LOG.error("Failed to delete " + remove.getPath(), e); + + if ( runLogAggAsAppOwner ) { + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { + remoteFS.delete(remove.getPath(), false); + return null; + } + }); + } catch (Exception e) { + LOG.error("Failed to delete " + remove.getPath(), e); + } + } else { + remoteFS.delete(remove.getPath(), false); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 78b0c134976..a2d45348cfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -140,10 +140,11 @@ public void initInternal(Configuration conf) { // LogAggregationIndexedFileController for non-append mode. boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true); if (!append) { - throw new YarnRuntimeException("The configuration:" - + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only" - + " use LogAggregationIndexedFileController when the FileSystem " - + "support append operations."); + throw new YarnRuntimeException( + "The configuration:" + LOG_AGGREGATION_FS_SUPPORT_APPEND + + " is set as False. We can only" + + " use LogAggregationIndexedFileController when the FileSystem " + + "support append operations."); } String remoteDirStr = String.format( YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, @@ -158,18 +159,16 @@ public void initInternal(Configuration conf) { YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, this.fileControllerName); this.remoteRootLogDirSuffix = conf.get(suffix); - if (this.remoteRootLogDirSuffix == null - || this.remoteRootLogDirSuffix.isEmpty()) { + if (this.remoteRootLogDirSuffix == null || this.remoteRootLogDirSuffix + .isEmpty()) { this.remoteRootLogDirSuffix = conf.get( YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) - + "-ifile"; + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) + "-ifile"; } String compressName = conf.get( YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); - this.compressAlgo = Compression.getCompressionAlgorithmByName( - compressName); + this.compressAlgo = Compression.getCompressionAlgorithmByName(compressName); this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3); this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L); this.logRollOverMaxFileSize = getRollOverLogMaxSize(conf); @@ -181,10 +180,8 @@ public void initializeWriter( final LogAggregationFileControllerContext context) throws IOException { final UserGroupInformation userUgi = context.getUserUgi(); - final Map appAcls = context.getAppAcls(); - final String nodeId = context.getNodeId().toString(); - final ApplicationId appId = context.getAppId(); final Path remoteLogFile = context.getRemoteNodeLogFileForApp(); + this.ugi = userUgi; logAggregationSuccessfullyInThisCyCle = false; logsMetaInThisCycle = new IndexedPerAggregationLogMeta(); @@ -192,62 +189,75 @@ public void initializeWriter( logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle); logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName()); try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - fc = FileContext.getFileContext( - remoteRootLogDir.toUri(), conf); - fc.setUMask(APP_LOG_FILE_UMASK); - if (indexedLogsMeta == null) { - indexedLogsMeta = new IndexedLogsMeta(); - indexedLogsMeta.setVersion(VERSION); - indexedLogsMeta.setUser(userUgi.getShortUserName()); - indexedLogsMeta.setAcls(appAcls); - indexedLogsMeta.setNodeId(nodeId); - String compressName = conf.get( - YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, - YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); - indexedLogsMeta.setCompressName(compressName); + if (runLogAggAsAppOwner) { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { + initializeWriterForApp(context); + return null; } - Path aggregatedLogFile = null; - if (context.isLogAggregationInRolling()) { - aggregatedLogFile = initializeWriterInRolling( - remoteLogFile, appId, nodeId); - } else { - aggregatedLogFile = remoteLogFile; - fsDataOStream = fc.create(remoteLogFile, - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), - new Options.CreateOpts[] {}); - if (uuid == null) { - uuid = createUUID(appId); - } - fsDataOStream.write(uuid); - fsDataOStream.flush(); - } - - long aggregatedLogFileLength = fc.getFileStatus( - aggregatedLogFile).getLen(); - // append a simple character("\n") to move the writer cursor, so - // we could get the correct position when we call - // fsOutputStream.getStartPos() - final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); - fsDataOStream.write(dummyBytes); - fsDataOStream.flush(); - - if (fsDataOStream.getPos() >= (aggregatedLogFileLength - + dummyBytes.length)) { - currentOffSet = 0; - } else { - currentOffSet = aggregatedLogFileLength; - } - return null; - } - }); + }); + } else{ + initializeWriterForApp(context); + } } catch (Exception e) { throw new IOException(e); } } + private void initializeWriterForApp(final LogAggregationFileControllerContext + context) throws Exception { + final Path remoteLogFile = context.getRemoteNodeLogFileForApp(); + final Map appAcls = context.getAppAcls(); + final String nodeId = context.getNodeId().toString(); + final ApplicationId appId = context.getAppId(); + fc = FileContext.getFileContext( + remoteRootLogDir.toUri(), conf); + fc.setUMask(APP_LOG_FILE_UMASK); + if (indexedLogsMeta == null) { + indexedLogsMeta = new IndexedLogsMeta(); + indexedLogsMeta.setVersion(VERSION); + indexedLogsMeta.setUser(ugi.getShortUserName()); + indexedLogsMeta.setAcls(appAcls); + indexedLogsMeta.setNodeId(nodeId); + String compressName = conf.get( + YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); + indexedLogsMeta.setCompressName(compressName); + } + Path aggregatedLogFile = null; + if (context.isLogAggregationInRolling()) { + aggregatedLogFile = initializeWriterInRolling( + remoteLogFile, appId, nodeId); + } else { + aggregatedLogFile = remoteLogFile; + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + if (uuid == null) { + uuid = createUUID(appId); + } + fsDataOStream.write(uuid); + fsDataOStream.flush(); + } + + long aggregatedLogFileLength = fc.getFileStatus( + aggregatedLogFile).getLen(); + // append a simple character("\n") to move the writer cursor, so + // we could get the correct position when we call + // fsOutputStream.getStartPos() + final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); + fsDataOStream.write(dummyBytes); + fsDataOStream.flush(); + + if (fsDataOStream.getPos() >= (aggregatedLogFileLength + + dummyBytes.length)) { + currentOffSet = 0; + } else { + currentOffSet = aggregatedLogFileLength; + } + + } + private Path initializeWriterInRolling(final Path remoteLogFile, final ApplicationId appId, final String nodeId) throws Exception { Path aggregatedLogFile = null; @@ -463,18 +473,28 @@ public Void run() throws Exception { }.runWithRetries(); } - private Object deleteFileWithPrivilege(final FileContext fileContext, + private void deleteFileWithPrivilege(final FileContext fileContext, final UserGroupInformation userUgi, final Path fileToDelete) throws Exception { - return userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - if (fileContext.util().exists(fileToDelete)) { - fileContext.delete(fileToDelete, false); + + if ( runLogAggAsAppOwner ) { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { + deleteFile(fileContext, userUgi, fileToDelete); + return null; } - return null; - } - }); + }); + } else { + deleteFile(fileContext, userUgi, fileToDelete); + } + } + + private void deleteFile(final FileContext fileContext, + final UserGroupInformation userUgi, final Path fileToDelete) + throws IOException { + if (fileContext.util().exists(fileToDelete)) { + fileContext.delete(fileToDelete, false); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index b3103d29add..32cc90cda38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -129,27 +130,22 @@ public void postWrite(final LogAggregationFileControllerContext record) // close the writer before the file is renamed or deleted closeWriter(); - final Path renamedPath = record.getRollingMonitorInterval() <= 0 - ? record.getRemoteNodeLogFileForApp() : new Path( - record.getRemoteNodeLogFileForApp().getParent(), - record.getRemoteNodeLogFileForApp().getName() + "_" - + record.getLogUploadTimeStamp()); - final boolean rename = record.isUploadedLogsInThisCycle(); + Path renamedPath = getRenamedPath(record.getRemoteNodeLogFileForApp(), + record.getLogUploadTimeStamp(), record.getRollingMonitorInterval()); try { - record.getUserUgi().doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - FileSystem remoteFS = record.getRemoteNodeLogFileForApp() - .getFileSystem(conf); - if (rename) { - remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(), - renamedPath); - } else { - remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false); + UserGroupInformation userUgi = record.getUserUgi(); + if (runLogAggAsAppOwner) { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { + renameOrDeleteTempLog(record.getRemoteNodeTmpLogFileForApp(), + record.isUploadedLogsInThisCycle(), renamedPath); + return null; } - return null; - } - }); + }); + } else{ + renameOrDeleteTempLog(record.getRemoteNodeTmpLogFileForApp(), + record.isUploadedLogsInThisCycle(), renamedPath); + } } catch (Exception e) { LOG.error( "Failed to move temporary log file to final location: [" @@ -162,6 +158,27 @@ public Object run() throws Exception { } } + private boolean renameOrDeleteTempLog(Path filePath, boolean isLogUploaded, + Path renamedPath) throws IOException { + final boolean rename = isLogUploaded; + + FileSystem remoteFS = filePath.getFileSystem(conf); + if (rename) { + return remoteFS.rename(filePath, renamedPath); + } else{ + return remoteFS.delete(filePath, false); + } + } + + private Path getRenamedPath(Path filePath, + long logUploadedTimeStamp, long rollingMonitorInterval) { + final Path renamedPath = + rollingMonitorInterval <= 0 ? filePath : new Path( + filePath.getParent(), + filePath.getName() + "_" + logUploadedTimeStamp); + return renamedPath; + } + @Override public boolean readAggregatedLogs(ContainerLogsRequest logRequest, OutputStream os) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1360e73f7fd..3116486ecaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1907,6 +1907,13 @@ none + + Whether to run NM log aggregation as 'app owner' or fall + back to 'yarn' user. Defaults to 'true' + yarn.nodemanager.log-aggregation-run-as-appOwner + true + + The kerberos principal for the node manager. yarn.nodemanager.principal