diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java index e7f4c83..050c15b 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java @@ -28,7 +28,7 @@ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they * would not interfere with each other. */ -class BoundedRangeFileInputStream extends InputStream { +public class BoundedRangeFileInputStream extends InputStream { private FSDataInputStream in; private long pos; diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java index f82f4df..fa85ed7 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java @@ -43,7 +43,7 @@ /** * Compression related stuff. */ -final class Compression { +public final class Compression { static final Logger LOG = LoggerFactory.getLogger(Compression.class); /** @@ -75,7 +75,7 @@ public void flush() throws IOException { /** * Compression algorithms. 
*/ - enum Algorithm { + public enum Algorithm { LZO(TFile.COMPRESSION_LZO) { private transient boolean checked = false; private static final String defaultClazz = @@ -348,7 +348,7 @@ public String getName() { } } - static Algorithm getCompressionAlgorithmByName(String compressName) { + public static Algorithm getCompressionAlgorithmByName(String compressName) { Algorithm[] algos = Algorithm.class.getEnumConstants(); for (Algorithm a : algos) { diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java index a26a02d..0a194a3 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java @@ -25,7 +25,7 @@ * A simplified BufferedOutputStream with borrowed buffer, and allow users to * see how much data have been buffered. */ -class SimpleBufferedOutputStream extends FilterOutputStream { +public class SimpleBufferedOutputStream extends FilterOutputStream { protected byte buf[]; // the borrowed buffer protected int count = 0; // bytes used in buffer. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index e8a28de..f8bcded 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -30,6 +30,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; @Private public class LogAggregationUtils { @@ -200,6 +203,30 @@ public static String getNodeString(String nodeId) { * @param conf the configuration * @param appId the applicationId * @param appOwner the application owner + * @param remoteRootLogDir the remote root log directory + * @param suffix the log directory suffix + * @return the list of available log files + * @throws IOException if there is no log file available + */ + public static List getRemoteNodeFileList( + Configuration conf, ApplicationId appId, String appOwner, + org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) + throws IOException { + Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, + remoteRootLogDir, suffix); + List nodeFiles = new ArrayList<>(); + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + return nodeFiles; + } + + /** + * Get all available log files under remote app log directory. 
+ * @param conf the configuration + * @param appId the applicationId + * @param appOwner the application owner * @return the iterator of available log files * @throws IOException if there is no log file available */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 23a0162..77de891 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -91,6 +91,12 @@ protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission .createImmutable((short) 0770); + /** + * Umask for the log file. + */ + protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission + .createImmutable((short) (0640 ^ 0777)); + // This is temporary solution. The configuration will be deleted once // we find a more scalable method to only write a single log file per LRS. 
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationIndexedFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationIndexedFileController.java new file mode 100644 index 0000000..7ed85fd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationIndexedFileController.java @@ -0,0 +1,1267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.SecureIOUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream; +import org.apache.hadoop.io.file.tfile.Compression; +import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream; +import org.apache.hadoop.io.file.tfile.Compression.Algorithm; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; + +@Private +@Unstable +public class LogAggregationIndexedFileController extends LogAggregationFileController { + + private static final Log LOG = LogFactory.getLog( + LogAggregationIndexedFileController.class); + private static final String FS_OUTPUT_BUF_SIZE_ATTR = + 
"indexedFile.fs.output.buffer.size"; + private static final String FS_INPUT_BUF_SIZE_ATTR = + "indexedFile.fs.input.buffer.size"; + private static final String FS_NUM_RETRIES_ATTR = + "indexedFile.fs.op.num-retries"; + private static final String FS_RETRY_INTERVAL_MS_ATTR = + "indexedFile.fs.retry-interval-ms"; + private static final int UUID_LENGTH = 36; + + @VisibleForTesting + public static final String CHECK_SUM_FILE_SUFFIX = "-checksum"; + + private int fsNumRetries = 3; + private long fsRetryInterval = 1000L; + private static final int VERSION = 1; + private IndexedLogsMeta indexedLogsMeta = null; + private LogMeta logsMetaInThisCycle; + private long logAggregationTimeInThisCycle; + private FSDataOutputStream fsDataOStream; + private Algorithm compressAlgo; + private CachedIndexedLogsMeta cachedIndexedLogsMeta = null; + private boolean logAggregationSuccessfullyInThisCyCle = false; + private long currentOffSet = 0; + private Path remoteLogCheckSumFile; + private FileContext fc; + private UserGroupInformation ugi; + private String uuid = null; + + public LogAggregationIndexedFileController() {} + + @Override + public void initInternal(Configuration conf) { + String remoteDirStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + this.fileControllerName); + String remoteDir = conf.get(remoteDirStr); + if (remoteDir == null || remoteDir.isEmpty()) { + throw new RuntimeException("Should specify value for the configuration:" + + remoteDirStr); + } + this.remoteRootLogDir = new Path(remoteDir); + String suffix = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + this.fileControllerName); + this.remoteRootLogDirSuffix = conf.get(suffix); + if (this.remoteRootLogDirSuffix == null + || this.remoteRootLogDirSuffix.isEmpty()) { + throw new RuntimeException("Should specify value for the configuration:" + + this.remoteRootLogDirSuffix); + } + String compressName = conf.get( + 
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); + this.compressAlgo = Compression.getCompressionAlgorithmByName( + compressName); + this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3); + this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L); + } + + @Override + public void initializeWriter(LogAggregationFileControllerContext context) + throws IOException { + UserGroupInformation userUgi = context.getUserUgi(); + Map appAcls = context.getAppAcls(); + String nodeId = context.getNodeId().toString(); + Path remoteLogFile = context.getRemoteNodeLogFileForApp(); + this.ugi = userUgi; + logAggregationSuccessfullyInThisCyCle = false; + if (indexedLogsMeta == null) { + indexedLogsMeta = new IndexedLogsMeta(); + indexedLogsMeta.setVersion(VERSION); + indexedLogsMeta.setUser(userUgi.getShortUserName()); + indexedLogsMeta.setAcls(appAcls); + indexedLogsMeta.setNodeId(nodeId); + String compressName = conf.get( + YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); + indexedLogsMeta.setCompressName(compressName); + } + logsMetaInThisCycle = new LogMeta(); + logAggregationTimeInThisCycle = System.currentTimeMillis(); + logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle); + logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName()); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + fc = FileContext.getFileContext( + remoteRootLogDir.toUri(), conf); + fc.setUMask(APP_LOG_FILE_UMASK); + boolean fileExist = fc.util().exists(remoteLogFile); + if (fileExist) { + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.APPEND), + new Options.CreateOpts[] {}); + if (uuid == null) { + FSDataInputStream fsDataInputStream = null; + try { + fsDataInputStream = fc.open(remoteLogFile); + byte[] b = new byte[UUID_LENGTH]; + fsDataInputStream.read(b); + uuid = new String(b); + } 
finally { + IOUtils.closeQuietly(fsDataInputStream); + } + } + } else { + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.CREATE), + new Options.CreateOpts[] {}); + if (uuid == null) { + uuid = UUID.randomUUID().toString(); + } + byte[] b = uuid.getBytes(); + fsDataOStream.write(b); + fsDataOStream.flush(); + } + indexedLogsMeta.setUuid(uuid); + final long currentAggregatedLogFileLength = fc + .getFileStatus(remoteLogFile).getLen(); + // only check the check-sum file when we are in append mode + if (context.isLogAggregationInRolling()) { + // check whether the checksum file exists to figure out + // whether the previous log aggregation process is successful + // and the aggregated log file is corrupted or not. + remoteLogCheckSumFile = new Path(remoteLogFile.getParent(), + (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX)); + boolean exist = fc.util().exists(remoteLogCheckSumFile); + if (!exist) { + FSDataOutputStream fsDataOutputStream = null; + try { + fsDataOutputStream = fc.create(remoteLogCheckSumFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + fsDataOutputStream.writeLong(currentAggregatedLogFileLength); + } finally { + IOUtils.closeQuietly(fsDataOutputStream); + } + } else { + FSDataInputStream fsDataInputStream = null; + try { + fsDataInputStream = fc.open(remoteLogCheckSumFile); + long endIndex = fsDataInputStream.readLong(); + IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta( + remoteLogFile, endIndex); + if (recoveredLogsMeta == null) { + indexedLogsMeta.getLogMetas().clear(); + } else { + indexedLogsMeta = recoveredLogsMeta; + } + } finally { + IOUtils.closeQuietly(fsDataInputStream); + } + } + } + // append a simple character("\n") to move the writer cursor, so + // we could get the correct position when we call + // fsOutputStream.getStartPos() + final byte[] dummyBytes = "\n".getBytes(); + fsDataOStream.write(dummyBytes); + fsDataOStream.flush(); + + if (fsDataOStream.getPos() >= 
(currentAggregatedLogFileLength + + dummyBytes.length)) { + currentOffSet = 0; + } else { + currentOffSet = currentAggregatedLogFileLength; + } + return null; + } + }); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void closeWriter() { + IOUtils.closeQuietly(this.fsDataOStream); + } + + @Override + public void write(LogKey logKey, LogValue logValue) throws IOException { + String containerId = logKey.toString(); + Set pendingUploadFiles = logValue + .getPendingLogFilesToUploadForThisContainer(); + List metas = new ArrayList<>(); + for (File logFile : pendingUploadFiles) { + FileInputStream in = null; + try { + in = SecureIOUtils.openForRead(logFile, logValue.getUser(), null); + } catch (IOException e) { + logErrorMessage(logFile, e); + IOUtils.closeQuietly(in); + continue; + } + final long fileLength = logFile.length(); + IndexedFileOutputStreamState outputStreamState = null; + try { + outputStreamState = new IndexedFileOutputStreamState( + this.compressAlgo, this.fsDataOStream, conf, this.currentOffSet); + byte[] buf = new byte[65535]; + int len = 0; + long bytesLeft = fileLength; + while ((len = in.read(buf)) != -1) { + //If buffer contents within fileLength, write + if (len < bytesLeft) { + outputStreamState.getOutputStream().write(buf, 0, len); + bytesLeft-=len; + } + //else only write contents within fileLength, then exit early + else { + outputStreamState.getOutputStream().write(buf, 0, + (int)bytesLeft); + break; + } + } + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated logs truncated by approximately "+ + (newLength-fileLength) +" bytes."); + } + logAggregationSuccessfullyInThisCyCle = true; + } catch (IOException e) { + String message = logErrorMessage(logFile, e); + if (outputStreamState != null && + outputStreamState.getOutputStream() != null) { + outputStreamState.getOutputStream().write( + message.getBytes(Charset.forName("UTF-8"))); + } + } finally { + 
IOUtils.closeQuietly(in); + } + + FileLogMeta meta = new FileLogMeta(); + meta.setContainerId(containerId); + meta.setFileName(logFile.getName()); + if (outputStreamState != null) { + outputStreamState.finish(); + meta.setFileCompressedSize(outputStreamState.getCompressedSize()); + meta.setStartIndex(outputStreamState.getStartPos()); + meta.setFileSize(fileLength); + } + meta.setLastModificatedTime(logFile.lastModified()); + metas.add(meta); + } + logsMetaInThisCycle.addContainerLogMeta(containerId, metas); + } + + @Override + public void postWrite(LogAggregationFileControllerContext record) throws Exception { + // always aggregate the previous logsMeta, and append them together + // at the end of the file + indexedLogsMeta.addLogMeta(logsMetaInThisCycle); + byte[] b = SerializationUtils.serialize(indexedLogsMeta); + this.fsDataOStream.write(b); + int length = b.length; + this.fsDataOStream.writeInt(length); + if (logAggregationSuccessfullyInThisCyCle) { + deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile); + } + } + + private void deleteFileWithRetries(final FileContext fc, + final UserGroupInformation userUgi, + final Path deletePath) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + deleteFileWithPrivilege(fc, userUgi, deletePath); + return null; + } + }.runWithRetries(); + } + + private Object deleteFileWithPrivilege(final FileContext fc, + final UserGroupInformation userUgi, final Path fileToDelete) + throws Exception { + return userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + if (fc.util().exists(fileToDelete)) { + fc.delete(fileToDelete, false); + } + return null; + } + }); + } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + boolean findLogs = false; + boolean createPrintStream = (os == null); + ApplicationId appId = logRequest.getAppId(); + String nodeId = logRequest.getNodeId(); + 
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null + : LogAggregationUtils.getNodeString(nodeId); + List logTypes = new ArrayList<>(); + if (logRequest.getLogTypes() != null && !logRequest + .getLogTypes().isEmpty()) { + logTypes.addAll(logRequest.getLogTypes()); + } + String containerIdStr = logRequest.getContainerId(); + boolean getAllContainers = (containerIdStr == null + || containerIdStr.isEmpty()); + long size = logRequest.getBytes(); + List nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + if (nodeFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = filterFiles( + nodeFiles, CHECK_SUM_FILE_SUFFIX); + List fileToRead = getNodeLogFileToRead( + nodeFiles, nodeIdStr, appId); + byte[] buf = new byte[65535]; + for (FileStatus thisNodeFile : fileToRead) { + String nodeName = thisNodeFile.getPath().getName(); + FileStatus checkSum = getFilteredFiles(checkSumFiles, + thisNodeFile.getPath().getName(), CHECK_SUM_FILE_SUFFIX); + long endIndex = -1; + if (checkSum != null) { + endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + } + IndexedLogsMeta indexedLogsMeta = null; + try { + indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(), + endIndex); + } catch (Exception ex) { + // DO NOTHING + LOG.warn("Can not load log meta from the log file:" + + thisNodeFile.getPath()); + continue; + } + if (indexedLogsMeta == null) { + continue; + } + String compressAlgo = indexedLogsMeta.getCompressName(); + List candidates = new ArrayList<>(); + for (LogMeta logMeta : indexedLogsMeta.getLogMetas()) { + for (Entry> meta : logMeta.getLogMetas() + .entrySet()) { + for (FileLogMeta log : meta.getValue()) { + if (!getAllContainers && !log.getContainerId() + .equals(containerIdStr)) { + continue; + } + if (logTypes != null && !logTypes.isEmpty() && + 
!logTypes.contains(log.getFileName())) { + continue; + } + candidates.add(log); + } + } + } + if (candidates.isEmpty()) { + continue; + } + + Collections.sort(candidates, new Comparator() { + @Override + public int compare(FileLogMeta o1, FileLogMeta o2) { + return o1.getContainerId().compareTo(o2.getContainerId()); + } + }); + + Algorithm compressName = Compression.getCompressionAlgorithmByName( + compressAlgo); + Decompressor decompressor = compressName.getDecompressor(); + FileContext fileContext = FileContext.getFileContext( + thisNodeFile.getPath().toUri(), conf); + FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); + String currentContainer = ""; + for (FileLogMeta candidate : candidates) { + if (!candidate.getContainerId().equals(currentContainer)) { + if (createPrintStream) { + closePrintStream(os); + os = createPrintStream( + logRequest.getOutputLocalDir(), + thisNodeFile.getPath().getName(), + candidate.getContainerId()); + currentContainer = candidate.getContainerId(); + } + } + InputStream in = null; + try { + in = compressName.createDecompressionStream( + new BoundedRangeFileInputStream(fsin, + candidate.getStartIndex(), + candidate.getFileCompressedSize()), + decompressor, getFSInputBufferSize(conf)); + LogToolUtils.outputContainerLog(candidate.getContainerId(), + nodeName, candidate.getFileName(), candidate.getFileSize(), size, + Times.format(candidate.getLastModificatedTime()), + in, os, buf, ContainerLogAggregationType.AGGREGATED); + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:" + candidate.getFileName(); + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } catch (IOException e) { + System.err.println(e.getMessage()); + compressName.returnDecompressor(decompressor); + continue; + } finally { + os.flush(); + 
IOUtils.closeQuietly(in); + } + } + } + return findLogs; + } + + // TODO: fix me if the remote file system does not support append operation. + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + List listOfLogsMeta = new ArrayList<>(); + List containersLogMeta = new ArrayList<>(); + String containerIdStr = logRequest.getContainerId(); + String nodeId = logRequest.getNodeId(); + ApplicationId appId = logRequest.getAppId(); + String appOwner = logRequest.getAppOwner(); + boolean getAllContainers = (containerIdStr == null || + containerIdStr.isEmpty()); + String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null + : LogAggregationUtils.getNodeString(nodeId); + List nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, + this.remoteRootLogDirSuffix); + if (nodeFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = filterFiles( + nodeFiles, CHECK_SUM_FILE_SUFFIX); + List fileToRead = getNodeLogFileToRead( + nodeFiles, nodeIdStr, appId); + for(FileStatus thisNodeFile : fileToRead) { + try { + FileStatus checkSum = getFilteredFiles(checkSumFiles, + thisNodeFile.getPath().getName(), CHECK_SUM_FILE_SUFFIX); + long endIndex = -1; + if (checkSum != null) { + endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + } + IndexedLogsMeta current = loadIndexedLogsMeta( + thisNodeFile.getPath(), endIndex); + if (current != null) { + listOfLogsMeta.add(current); + } + } catch (IOException ex) { + // DO NOTHING + LOG.warn("Can not get log meta from the log file:" + + thisNodeFile.getPath()); + } + } + for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) { + String curNodeId = indexedLogMeta.getNodeId(); + for (LogMeta logMeta : indexedLogMeta.getLogMetas()) { + if (getAllContainers) { + for (Entry> log : logMeta + .getLogMetas().entrySet()) { + ContainerLogMeta meta = new 
ContainerLogMeta(log.getKey(), + curNodeId); + for (FileLogMeta aMeta : log.getValue()) { + meta.addLogMeta(aMeta.getFileName(), Long.toString( + aMeta.getFileSize()), + Times.format(aMeta.getLastModificatedTime())); + } + containersLogMeta.add(meta); + } + } else if (logMeta.getContainerLogMeta(containerIdStr) != null) { + ContainerLogMeta meta = new ContainerLogMeta(containerIdStr, + curNodeId); + for (FileLogMeta log : logMeta.getContainerLogMeta(containerIdStr)) { + meta.addLogMeta(log.getFileName(), Long.toString( + log.getFileSize()), + Times.format(log.getLastModificatedTime())); + } + containersLogMeta.add(meta); + } + } + } + Collections.sort(containersLogMeta, new Comparator() { + @Override + public int compare(ContainerLogMeta o1, ContainerLogMeta o2) { + return o1.getContainerId().compareTo(o2.getContainerId()); + } + }); + return containersLogMeta; + } + + private Map filterFiles( + List fileList, final String suffix) throws IOException { + Map checkSumFiles = new HashMap<>(); + Set status = new HashSet(fileList); + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName().endsWith( + suffix); + } + }); + status = Sets.newHashSet(mask); + for (FileStatus file : status) { + checkSumFiles.put(file.getPath().getName(), file); + } + return checkSumFiles; + } + + private List getNodeLogFileToRead( + List nodeFiles, String nodeId, ApplicationId appId) + throws IOException { + List listOfFiles = new ArrayList<>(); + for (FileStatus file : nodeFiles) { + String nodeName = file.getPath().getName(); + if ((nodeId == null || nodeId.isEmpty() + || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX) && + !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); + nodeFiles = 
Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); + continue; + } + listOfFiles.add(file); + } + } + return listOfFiles; + } + + private FileStatus getFilteredFiles(Map fileMap, + String fileName, String suffix) { + for (String file : fileMap.keySet()) { + if (file.startsWith(fileName) && file.endsWith(suffix)) { + return fileMap.get(file); + } + } + return null; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + IndexedFileAggregatedLogsBlock block = new IndexedFileAggregatedLogsBlock( + context); + block.render(html); + } + + @Override + public String getApplicationOwner(Path aggregatedLogPath) + throws IOException { + if (this.cachedIndexedLogsMeta == null + || !this.cachedIndexedLogsMeta.getRemoteLogPath() + .equals(aggregatedLogPath)) { + this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( + loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + } + if (this.cachedIndexedLogsMeta != null) { + return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser(); + } + return null; + } + + @Override + public Map getApplicationAcls( + Path aggregatedLogPath) throws IOException { + if (this.cachedIndexedLogsMeta == null + || !this.cachedIndexedLogsMeta.getRemoteLogPath() + .equals(aggregatedLogPath)) { + this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( + loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + } + if (this.cachedIndexedLogsMeta != null) { + return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls(); + } + return null; + } + + @Override + public Path getRemoteAppLogDir(ApplicationId appId, String user) + throws IOException { + return LogAggregationUtils.getRemoteAppLogDir(conf, appId, user, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + + private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) + throws IOException { + FileContext fileContext = + FileContext.getFileContext(remoteLogPath.toUri(), conf); + FSDataInputStream 
fsDataIStream = null; + try { + fsDataIStream = fileContext.open(remoteLogPath); + if (end == 0) { + return null; + } + long fileLength = end < 0 ? fileContext.getFileStatus( + remoteLogPath).getLen() : end; + fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE); + int offset = fsDataIStream.readInt(); + byte[] array = new byte[offset]; + fsDataIStream.seek(fileLength - offset - Integer.SIZE/ Byte.SIZE); + fsDataIStream.read(array); + return (IndexedLogsMeta)SerializationUtils + .deserialize(array); + } finally { + IOUtils.closeQuietly(fsDataIStream); + } + } + + private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath) + throws IOException { + return loadIndexedLogsMeta(remoteLogPath, -1); + } + + private long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath) + throws IOException { + FileContext fileContext = + FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf); + FSDataInputStream fsDataIStream = null; + try { + fsDataIStream = fileContext.open(remoteLogCheckSumPath); + return fsDataIStream.readLong(); + } finally { + IOUtils.closeQuietly(fsDataIStream); + } + } + + @Private + @VisibleForTesting + public static class IndexedLogsMeta implements Serializable { + + private static final long serialVersionUID = 5439875373L; + private int version; + private String user; + private String compressName; + private Map acls; + private String nodeId; + private String uuid; + private List logMetas = new ArrayList<>(); + + public int getVersion() { + return this.version; + } + + public void setVersion(int version) { + this.version = version; + } + + public String getUser() { + return this.user; + } + + public void setUser(String user) { + this.user = user; + } + + public Map getAcls() { + return this.acls; + } + + public void setAcls(Map acls) { + this.acls = acls; + } + + public String getCompressName() { + return compressName; + } + + public void setCompressName(String compressName) { + this.compressName = compressName; + } + + public String 
getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public void addLogMeta(LogMeta logMeta) { + logMetas.add(logMeta); + } + + public List getLogMetas() { + return logMetas; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + } + + public static class LogMeta implements Serializable { + private static final long serialVersionUID = 3929298383L; + private String remoteNodeLogFileName; + private Map> logMetas = new HashMap<>(); + private long uploadTimeStamp; + + public String getRemoteNodeFile() { + return remoteNodeLogFileName; + } + public void setRemoteNodeFile(String remoteNodeLogFileName) { + this.remoteNodeLogFileName = remoteNodeLogFileName; + } + + public void addContainerLogMeta(String containerId, + List logMeta) { + logMetas.put(containerId, logMeta); + } + + public List getContainerLogMeta(String containerId) { + return logMetas.get(containerId); + } + + public Map> getLogMetas() { + return logMetas; + } + + public long getUploadTimeStamp() { + return uploadTimeStamp; + } + + public void setUploadTimeStamp(long uploadTimeStamp) { + this.uploadTimeStamp = uploadTimeStamp; + } + } + + @Private + @VisibleForTesting + public static class FileLogMeta implements Serializable { + private static final long serialVersionUID = 1L; + private String containerId; + private String fileName; + private long fileSize; + private long fileCompressedSize; + private long lastModificatedTime; + private long startIndex; + + public String getFileName() { + return fileName; + } + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public long getFileSize() { + return fileSize; + } + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public long getFileCompressedSize() { + return fileCompressedSize; + } + public void setFileCompressedSize(long fileCompressedSize) { + this.fileCompressedSize = 
fileCompressedSize; + } + + public long getLastModificatedTime() { + return lastModificatedTime; + } + public void setLastModificatedTime(long lastModificatedTime) { + this.lastModificatedTime = lastModificatedTime; + } + + public long getStartIndex() { + return startIndex; + } + public void setStartIndex(long startIndex) { + this.startIndex = startIndex; + } + + public String getContainerId() { + return containerId; + } + public void setContainerId(String containerId) { + this.containerId = containerId; + } + } + + private static String logErrorMessage(File logFile, Exception e) { + String message = "Error aggregating log file. Log file : " + + logFile.getAbsolutePath() + ". " + e.getMessage(); + LOG.error(message, e); + return message; + } + + private static class IndexedFileOutputStreamState { + private final Algorithm compressAlgo; + private Compressor compressor; + private final FSDataOutputStream fsOut; + private long posStart; + private final SimpleBufferedOutputStream fsBufferedOutput; + private OutputStream out; + private long offset; + + public IndexedFileOutputStreamState(Algorithm compressionName, + FSDataOutputStream fsOut, Configuration conf, long offset) throws IOException { + this.compressAlgo = compressionName; + this.fsOut = fsOut; + this.offset = offset; + this.posStart = fsOut.getPos(); + + BytesWritable fsOutputBuffer = new BytesWritable(); + fsOutputBuffer.setCapacity(LogAggregationIndexedFileController + .getFSOutputBufferSize(conf)); + + this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, + fsOutputBuffer.getBytes()); + + this.compressor = compressAlgo.getCompressor(); + + try { + this.out = compressAlgo.createCompressionStream( + fsBufferedOutput, compressor, 0); + } catch (IOException e) { + compressAlgo.returnCompressor(compressor); + throw e; + } + } + + OutputStream getOutputStream() { + return out; + } + + long getCurrentPos() throws IOException { + return fsOut.getPos() + fsBufferedOutput.size(); + } + + long 
getStartPos() { + return posStart + offset; + } + + long getCompressedSize() throws IOException { + long ret = getCurrentPos() - posStart; + return ret; + } + + void finish() throws IOException { + try { + if (out != null) { + out.flush(); + out = null; + } + } finally { + compressAlgo.returnCompressor(compressor); + compressor = null; + } + } + } + + private static class CachedIndexedLogsMeta { + private final Path remoteLogPath; + private final IndexedLogsMeta indexedLogsMeta; + public CachedIndexedLogsMeta(IndexedLogsMeta indexedLogsMeta, + Path remoteLogPath) { + this.indexedLogsMeta = indexedLogsMeta; + this.remoteLogPath = remoteLogPath; + } + + public Path getRemoteLogPath() { + return this.remoteLogPath; + } + + public IndexedLogsMeta getCachedIndexedLogsMeta() { + return this.indexedLogsMeta; + } + } + + static int getFSOutputBufferSize(Configuration conf) { + return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); + } + + static int getFSInputBufferSize(Configuration conf) { + return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); + } + + private abstract class FSAction { + abstract T run() throws Exception; + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (IOException e) { + LOG.info("Exception while executing an FS operation.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out FS retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on FS. Retry no. 
" + retry); + Thread.sleep(fsRetryInterval); + } + } + } + } + + private class IndexedFileAggregatedLogsBlock extends HtmlBlock { + + @Inject + public IndexedFileAggregatedLogsBlock(ViewContext ctx) { + super(ctx); + } + + @Override + protected void render(Block html) { + ContainerId containerId = LogAggregationWebUtils + .verifyAndGetContainerId(html, $(CONTAINER_ID)); + NodeId nodeId = LogAggregationWebUtils + .verifyAndGetNodeId(html, $(NM_NODENAME)); + String appOwner = LogAggregationWebUtils + .verifyAndGetAppOwner(html, $(APP_OWNER)); + + boolean isValid = true; + long start = -4096; + try { + start = LogAggregationWebUtils.verifyAndGetLogStartIndex( + html, $("start")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("start")).__(); + isValid = false; + } + + long end = Long.MAX_VALUE; + try { + end = LogAggregationWebUtils.verifyAndGetLogEndIndex( + html, $("end")); + }catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("end")).__(); + isValid = false; + } + + if (containerId == null || nodeId == null || appOwner == null + || appOwner.isEmpty() || !isValid) { + return; + } + + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + String logEntity = $(ENTITY_STRING); + if (logEntity == null || logEntity.isEmpty()) { + logEntity = containerId.toString(); + } + + List nodeFiles = new ArrayList<>(); + try { + nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, appOwner, + remoteRootLogDir, remoteRootLogDirSuffix); + } catch(Exception ex) { + html.h1("Unable to locate any logs for container " + + containerId.toString()); + LOG.error(ex.getMessage()); + return; + } + + Map checkSumFiles; + try { + checkSumFiles = filterFiles(nodeFiles, CHECK_SUM_FILE_SUFFIX); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + html.h1("Error getting logs for " + logEntity); + return; + } + + List fileToRead; + try { + fileToRead = 
getNodeLogFileToRead(nodeFiles, nodeId.toString(), appId); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + html.h1("Error getting logs for " + logEntity); + return; + } + + boolean foundLog = false; + String desiredLogType = $(CONTAINER_LOG_TYPE); + try { + for (FileStatus thisNodeFile : fileToRead) { + FileStatus checkSum = getFilteredFiles(checkSumFiles, + thisNodeFile.getPath().getName(), CHECK_SUM_FILE_SUFFIX); + long endIndex = -1; + if (checkSum != null) { + endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + } + IndexedLogsMeta indexedLogsMeta = null; + try { + indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(), + endIndex); + } catch (Exception ex) { + // DO NOTHING + LOG.warn("Can not load log meta from the log file:" + + thisNodeFile.getPath()); + continue; + } + if (indexedLogsMeta == null) { + continue; + } + Map appAcls = null; + appAcls = indexedLogsMeta.getAcls(); + ApplicationACLsManager aclsManager = new ApplicationACLsManager( + conf); + aclsManager.addApplication(appId, appAcls); + + String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + if (callerUGI != null && !aclsManager.checkAccess(callerUGI, + ApplicationAccessType.VIEW_APP, remoteUser, appId)) { + html.h1().__("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + thisNodeFile.getPath().getName() + "]") + .__(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + continue; + } + String compressAlgo = indexedLogsMeta.getCompressName(); + List candidates = new ArrayList<>(); + for (LogMeta logMeta : indexedLogsMeta.getLogMetas()) { + for (Entry> meta : logMeta.getLogMetas() + .entrySet()) { + for (FileLogMeta log : meta.getValue()) { + if (!log.getContainerId().equals(containerId.toString())) { + continue; + } + 
if (desiredLogType != null && !desiredLogType.isEmpty() + && !desiredLogType.equals(log.getFileName())) { + continue; + } + candidates.add(log); + } + } + } + if (candidates.isEmpty()) { + continue; + } + Collections.sort(candidates, new Comparator() { + @Override + public int compare(FileLogMeta o1, FileLogMeta o2) { + int compare = o1.getContainerId().compareTo(o2.getContainerId()); + if (compare == 0) { + long time = o1.getLastModificatedTime() + - o2.getLastModificatedTime(); + if (time > 0) { + return 1; + } else if (time < 0) { + return -1; + } + return 0; + } + return compare; + } + }); + + Algorithm compressName = Compression.getCompressionAlgorithmByName( + compressAlgo); + Decompressor decompressor = compressName.getDecompressor(); + FileContext fileContext = FileContext.getFileContext( + thisNodeFile.getPath().toUri(), conf); + FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); + int bufferSize = 65536; + for (FileLogMeta candidate : candidates) { + byte[] cbuf = new byte[bufferSize]; + InputStream in = null; + try { + in = compressName.createDecompressionStream( + new BoundedRangeFileInputStream(fsin, + candidate.getStartIndex(), + candidate.getFileCompressedSize()), + decompressor, getFSInputBufferSize(conf)); + long logLength = candidate.getFileSize(); + html.pre().__("\n\n").__(); + html.p().__("Log Type: " + candidate.getFileName()).__(); + html.p().__("Log Upload Time: " + Times.format( + candidate.getLastModificatedTime())).__(); + html.p().__("Log Length: " + Long.toString( + logLength)).__(); + long startIndex = start < 0 + ? logLength + start : start; + startIndex = startIndex < 0 ? 0 : startIndex; + startIndex = startIndex > logLength ? logLength : startIndex; + long endLogIndex = end < 0 + ? logLength + end : end; + endLogIndex = endLogIndex < 0 ? 0 : endLogIndex; + endLogIndex = endLogIndex > logLength ? logLength : endLogIndex; + endLogIndex = endLogIndex < startIndex ? 
+ startIndex : endLogIndex; + long toRead = endLogIndex - startIndex; + if (toRead < logLength) { + html.p().__("Showing " + toRead + " bytes of " + logLength + + " total. Click ").a(url("logs", $(NM_NODENAME), + $(CONTAINER_ID),$(ENTITY_STRING), $(APP_OWNER), + candidate.getFileName(), "?start=0"), "here"). + __(" for the full log.").__(); + } + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = in.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = in.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException( "Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = in.read(cbuf, 0, currentToRead)) > 0) { + pre.__(new String(cbuf, 0, len)); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + } + + pre.__(); + foundLog = true; + } catch (Exception ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + IOUtils.closeQuietly(in); + } + } + } + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + } + } catch (Exception ex) { + html.h1().__("Error getting logs for " + logEntity).__(); + LOG.error("Error getting logs for " + logEntity, ex); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationIndexFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationIndexFileController.java new file mode 100644 index 0000000..c2d5716 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation.filecontroller;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Function test for {@link LogAggregationIndexedFileController}.
 *
 * Writes container logs through the indexed aggregated-log file format, then
 * reads the log meta and the log content back and verifies them. Also
 * exercises the per-node checksum file that records the last fully-committed
 * offset: readers must ignore everything past that offset until
 * {@code postWrite} commits the write by removing the checksum file.
 */
public class TestLogAggregationIndexFileController {

  // Local directory the fake NodeManager logs are written to.
  private final String rootLocalLogDir = "target/LocalLogs";
  private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
  // Remote (aggregated) log root used by the controller under test.
  private final String remoteLogDir = "target/remote-app";
  private final FsPermission LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0777));
  UserGroupInformation userUgi = UserGroupInformation
      .createRemoteUser("testUser");
  private FileSystem fs;
  private Configuration conf;
  private ApplicationId appId;
  private ContainerId containerId;
  private NodeId nodeId;

  // System.out/System.err are redirected into these buffers so the test can
  // inspect what readAggregatedLogs() printed.
  ByteArrayOutputStream sysOutStream;
  private PrintStream sysOut;

  ByteArrayOutputStream sysErrStream;
  private PrintStream sysErr;

  @Before
  public void setUp() throws IOException {
    appId = ApplicationId.newInstance(123456, 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        appId, 1);
    containerId = ContainerId.newContainerId(attemptId, 1);
    nodeId = NodeId.newInstance("localhost", 9999);
    conf = new Configuration();
    // Point the "Indexed" file controller at a test-local remote dir.
    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
        remoteLogDir);
    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
        "logs");
    conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
    fs = FileSystem.get(conf);
    sysOutStream = new ByteArrayOutputStream();
    sysOut = new PrintStream(sysOutStream);
    System.setOut(sysOut);

    sysErrStream = new ByteArrayOutputStream();
    sysErr = new PrintStream(sysErrStream);
    System.setErr(sysErr);
  }

  @After
  public void teardown() throws Exception {
    // NOTE(review): System.out/System.err are not restored here; if other
    // tests run in the same JVM they inherit the redirected streams.
    fs.delete(rootLocalLogDirPath, true);
    fs.delete(new Path(remoteLogDir), true);
  }

  @Test(timeout = 15000)
  public void testLogAggregationIndexFileFormat() throws Exception {
    if (fs.exists(rootLocalLogDirPath)) {
      fs.delete(rootLocalLogDirPath, true);
    }
    assertTrue(fs.mkdirs(rootLocalLogDirPath));

    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
    if (fs.exists(appLogsDir)) {
      fs.delete(appLogsDir, true);
    }
    assertTrue(fs.mkdirs(appLogsDir));

    List<String> logTypes = new ArrayList<String>();
    logTypes.add("syslog");
    logTypes.add("stdout");
    logTypes.add("stderr");

    Set<File> files = new HashSet<>();

    LogKey key1 = new LogKey(containerId.toString());

    for (String logType : logTypes) {
      File file = createAndWriteLocalLogFile(containerId, appLogsDir,
          logType);
      files.add(file);
    }
    LogValue value = mock(LogValue.class);
    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);

    LogAggregationIndexedFileController fileFormat
        = new LogAggregationIndexedFileController();
    fileFormat.initialize(conf, "Indexed");

    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
    Path appDir = fileFormat.getRemoteAppLogDir(appId,
        userUgi.getShortUserName());
    if (fs.exists(appDir)) {
      fs.delete(appDir, true);
    }
    assertTrue(fs.mkdirs(appDir));

    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
        appId, userUgi.getShortUserName(), nodeId);
    LogAggregationFileControllerContext context =
        new LogAggregationFileControllerContext(
            logPath, logPath, true, 1000, appId, appAcls, nodeId, userUgi);
    // First write: three container log files, fully committed via postWrite.
    fileFormat.initializeWriter(context);

    fileFormat.write(key1, value);
    LogAggregationFileControllerContext record = mock(
        LogAggregationFileControllerContext.class);
    fileFormat.postWrite(record);
    fileFormat.closeWriter();

    ContainerLogsRequest logRequest = new ContainerLogsRequest();
    logRequest.setAppId(appId);
    logRequest.setNodeId(nodeId.toString());
    logRequest.setAppOwner(userUgi.getShortUserName());
    logRequest.setContainerId(containerId.toString());
    logRequest.setBytes(Long.MAX_VALUE);
    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    Assert.assertEquals(1, meta.size());
    List<String> fileNames = new ArrayList<>();
    for (ContainerLogMeta log : meta) {
      Assert.assertEquals(containerId.toString(), log.getContainerId());
      Assert.assertEquals(nodeId.toString(), log.getNodeId());
      Assert.assertEquals(3, log.getContainerLogMeta().size());
      for (PerContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(logTypes);
    Assert.assertTrue(fileNames.isEmpty());

    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    Assert.assertTrue(foundLogs);
    for (String logType : logTypes) {
      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();

    // Create a checksum file recording offset 0: readers must then treat the
    // whole aggregated file as uncommitted and find no logs at all.
    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
        appId, userUgi.getShortUserName()),
        LogAggregationUtils.getNodeString(nodeId)
            + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
    FSDataOutputStream fInput = null;
    try {
      fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
      fInput.writeLong(0);
    } finally {
      IOUtils.closeQuietly(fInput);
    }
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    Assert.assertEquals(0, meta.size());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    Assert.assertFalse(foundLogs);
    sysOutStream.reset();
    fs.delete(checksumFile, false);
    Assert.assertFalse(fs.exists(checksumFile));

    // Second write: two new log files (test1, test2) but no postWrite, so the
    // checksum file survives and readers only see the first, committed write.
    List<String> newLogTypes = new ArrayList<>(logTypes);
    files.clear();
    newLogTypes.add("test1");
    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
        "test1"));
    newLogTypes.add("test2");
    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
        "test2"));
    LogValue value2 = mock(LogValue.class);
    when(value2.getPendingLogFilesToUploadForThisContainer())
        .thenReturn(files);

    // initialize the writer
    fileFormat.initializeWriter(context);
    fileFormat.write(key1, value2);
    fileFormat.closeWriter();

    // We did not call postWrite, which would remove the checksum file.
    // We can only get the logs/logmeta from the first write.
    // (Fixed: the previous version discarded this read's result and then
    // asserted meta.size() against itself, which could never fail.)
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    Assert.assertEquals(1, meta.size());
    for (ContainerLogMeta log : meta) {
      Assert.assertEquals(containerId.toString(), log.getContainerId());
      Assert.assertEquals(nodeId.toString(), log.getNodeId());
      Assert.assertEquals(3, log.getContainerLogMeta().size());
      for (PerContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(logTypes);
    Assert.assertTrue(fileNames.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    Assert.assertTrue(foundLogs);
    for (String logType : logTypes) {
      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
        containerId, "test1")));
    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
        containerId, "test2")));
    sysOutStream.reset();

    // Third write with postWrite: now both the first and this write are
    // committed, so all logs/logmetas become visible.
    fileFormat.initializeWriter(context);
    fileFormat.write(key1, value2);
    fileFormat.postWrite(record);
    fileFormat.closeWriter();
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    Assert.assertEquals(2, meta.size());
    for (ContainerLogMeta log : meta) {
      Assert.assertEquals(containerId.toString(), log.getContainerId());
      Assert.assertEquals(nodeId.toString(), log.getNodeId());
      for (PerContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(newLogTypes);
    Assert.assertTrue(fileNames.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    Assert.assertTrue(foundLogs);
    for (String logType : newLogTypes) {
      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();
  }

  /**
   * Creates (or recreates) a local log file named {@code logType} under
   * {@code localLogDir} containing the canonical test message for the
   * container, and returns it.
   */
  private File createAndWriteLocalLogFile(ContainerId containerId,
      Path localLogDir, String logType) throws IOException {
    File file = new File(localLogDir.toString(), logType);
    if (file.exists()) {
      file.delete();
    }
    file.createNewFile();
    Writer writer = null;
    try {
      writer = new FileWriter(file);
      writer.write(logMessage(containerId, logType));
      // Close explicitly so a flush failure surfaces as an IOException; the
      // closeQuietly in finally is then a no-op safety net.
      writer.close();
      return file;
    } finally {
      IOUtils.closeQuietly(writer);
    }
  }

  /** The exact line written into, and expected back from, each log file. */
  private String logMessage(ContainerId containerId, String logType) {
    return "Hello " + containerId + " in " + logType + "!";
  }
}