diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogCompactionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogCompactionService.java
new file mode 100644
index 0000000..ce1d33a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogCompactionService.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A service that periodically compacts aggregated logs.
+ */
+@InterfaceAudience.LimitedPrivate({"yarn"})
+public class AggregatedLogCompactionService extends AbstractService {
+
+  // TODO: look at AggregatedLogDeletionService to do periodically (or
+  // triggered by something?)
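+  // One plausible shape, mirroring AggregatedLogDeletionService: schedule a
+  // java.util.Timer task from serviceStart() that scans the remote log root
+  // for application directories that have finished aggregating and calls
+  // compact() on each, cancelling the timer in serviceStop(). The scan
+  // interval would come from a yet-to-be-defined yarn-site key.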
+  public AggregatedLogCompactionService() {
+    super(AggregatedLogCompactionService.class.getName());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  private void compact(final Configuration conf, final Path sourceDir,
+      Path compactedLogDir, UserGroupInformation userUgi)
+      throws IOException, InterruptedException {
+    if (sourceDir.equals(compactedLogDir)) {
+      throw new IOException(
+          "Source and compacted log directories must differ: " + sourceDir);
+    }
+
+    FileSystem fs = userUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      @Override
+      public FileSystem run() throws Exception {
+        return sourceDir.getFileSystem(conf);
+      }
+    });
+    if (!fs.isDirectory(sourceDir)) {
+      throw new IOException("Not a directory: " + sourceDir);
+    }
+
+    // Write to a temporary index first; the rename below only publishes the
+    // index once all containers have been copied.
+    Path tmpIndexFile = new Path(compactedLogDir,
+        sourceDir.getName() + ".index.tmp");
+    Path logFile = new Path(compactedLogDir, sourceDir.getName());
+    CompactedAggregatedLogFormat.LogWriter writer =
+        new CompactedAggregatedLogFormat.LogWriter(conf, tmpIndexFile,
+            logFile, userUgi);
+    try {
+      for (FileStatus file : fs.listStatus(sourceDir)) {
+        AggregatedLogFormat.LogReader inLogReader =
+            new AggregatedLogFormat.LogReader(conf, file.getPath());
+        try {
+          writer.append(inLogReader);
+        } finally {
+          inLogReader.close();
+        }
+      }
+    } finally {
+      writer.close();
+    }
+
+    Path indexFile = new Path(compactedLogDir, sourceDir.getName() + ".index");
+    fs.rename(tmpIndexFile, indexFile);
+    fs.delete(sourceDir, true);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java
new file mode 100644
index 0000000..4784fca
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CompactedAggregatedLogFormat {
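+
+  // On-disk layout (as produced by the writer below): a single data file
+  // holding the concatenated per-container logs, plus an index file of:
+  // writeInt(numAcls), numAcls x (UTF aclType, UTF acl), UTF owner, then
+  // repeated (UTF containerId, long offset, long length) records until
+  // end of file.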
+
+  /**
+   * Umask for the log file.
+   */
+  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0640 ^ 0777));
+
+  public CompactedAggregatedLogFormat() {
+  }
+
+  public static class LogWriter {
+
+    private boolean wroteMeta;
+
+    private final FSDataOutputStream fsDataOStream;
+    private final FSDataOutputStream indexOStream;
+    private FileContext fc;
+    private long currentOffset;
+
+    public LogWriter(final Configuration conf, final Path indexFile,
+        final Path logFile, final UserGroupInformation userUgi)
+        throws IOException {
+      wroteMeta = false;
+      currentOffset = 0L;
+
+      try {
+        this.fsDataOStream =
+            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
+              @Override
+              public FSDataOutputStream run() throws Exception {
+                fc = FileContext.getFileContext(conf);
+                fc.setUMask(APP_LOG_FILE_UMASK);
+                return fc.create(
+                    logFile,
+                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+                    new Options.CreateOpts[] {});
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+
+      try {
+        this.indexOStream =
+            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
+              @Override
+              public FSDataOutputStream run() throws Exception {
+                fc = FileContext.getFileContext(conf);
+                fc.setUMask(APP_LOG_FILE_UMASK);
+                return fc.create(
+                    indexFile,
+                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+                    new Options.CreateOpts[] {});
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private void writeApplicationACLs(
+        Map<ApplicationAccessType, String> appAcls) throws IOException {
+      indexOStream.writeInt(appAcls.size());
+      for (Map.Entry<ApplicationAccessType, String> entry
+          : appAcls.entrySet()) {
+        indexOStream.writeUTF(entry.getKey().toString());
+        indexOStream.writeUTF(entry.getValue());
+      }
+    }
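+
+    // Copies every container from one aggregated log file (typically one
+    // file per node). Value bytes are copied verbatim, so the inner
+    // per-logfile framing that AggregatedLogFormat.ContainerLogsReader
+    // expects is preserved; the index gains one (containerId, offset,
+    // length) record per container.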
+    public void append(AggregatedLogFormat.LogReader reader)
+        throws IOException {
+      if (!wroteMeta) {
+        writeApplicationACLs(reader.getApplicationAcls());
+        indexOStream.writeUTF(reader.getApplicationOwner());
+        wroteMeta = true;
+      }
+      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
+      DataInputStream valueStream = reader.next(key);
+      while (valueStream != null) {
+        indexOStream.writeUTF(key.toString());
+        indexOStream.writeLong(currentOffset);
+        long length = IOUtils.copyLarge(valueStream, fsDataOStream);
+        indexOStream.writeLong(length);
+        currentOffset += length;
+
+        // Next container
+        key = new AggregatedLogFormat.LogKey();
+        valueStream = reader.next(key);
+      }
+    }
+
+    public void close() {
+      org.apache.hadoop.io.IOUtils.closeStream(indexOStream);
+      org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream);
+    }
+  }
+
+  public static class LogReader {
+
+    private final FSDataInputStream fsDataIStream;
+    private Map<ApplicationAccessType, String> acls;
+    private String owner;
+    private Map<String, IndexEntry> index;
+
+    public LogReader(Configuration conf, Path indexFile, Path logFile)
+        throws IOException {
+      FileContext fileContext = FileContext.getFileContext(conf);
+
+      FSDataInputStream indexIStream = fileContext.open(indexFile);
+      loadIndexFile(indexIStream);
+      org.apache.hadoop.io.IOUtils.closeStream(indexIStream);
+
+      this.fsDataIStream = fileContext.open(logFile);
+    }
+
+    public Map<ApplicationAccessType, String> getApplicationAcls() {
+      return acls;
+    }
+
+    public String getApplicationOwner() {
+      return owner;
+    }
+
+    private void readApplicationACLs(FSDataInputStream indexIStream)
+        throws IOException {
+      int numAcls = indexIStream.readInt();
+      acls = new HashMap<ApplicationAccessType, String>(numAcls);
+      for (int i = 0; i < numAcls; i++) {
+        String appAccessOp = null;
+        String aclString = null;
+        try {
+          appAccessOp = indexIStream.readUTF();
+        } catch (EOFException e) {
+          // Valid end of stream.
+          break;
+        }
+        try {
+          aclString = indexIStream.readUTF();
+        } catch (EOFException e) {
+          throw new YarnRuntimeException("Error reading ACLs", e);
+        }
+        acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
+      }
+    }
+
+    private void loadIndexFile(FSDataInputStream indexIStream)
+        throws IOException {
+      readApplicationACLs(indexIStream);
+      owner = indexIStream.readUTF();
+
+      index = new HashMap<String, IndexEntry>();
+      while (true) {
+        String containerId = null;
+        long offset;
+        long length;
+        try {
+          containerId = indexIStream.readUTF();
+        } catch (EOFException e) {
+          // Valid end of stream.
+          break;
+        }
+        try {
+          offset = indexIStream.readLong();
+        } catch (EOFException e) {
+          throw new YarnRuntimeException("Error reading index", e);
+        }
+        try {
+          length = indexIStream.readLong();
+        } catch (EOFException e) {
+          throw new YarnRuntimeException("Error reading index", e);
+        }
+        index.put(containerId, new IndexEntry(offset, length));
+      }
+    }
+
+    @VisibleForTesting
+    public Set<String> getContainerIds() {
+      return index.keySet();
+    }
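+
+    // Note: each call below seeks the single shared data stream, so a
+    // LogReader is not safe for concurrent use and a returned
+    // ContainerLogsReader is only valid until the next call.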
+    /**
+     * Get a ContainerLogsReader to read the logs for
+     * the specified container.
+     *
+     * @param containerId the container whose logs are sought
+     * @return object to read the container's logs or null if the
+     * logs could not be found
+     * @throws IOException
+     */
+    @InterfaceAudience.Private
+    public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader(
+        ContainerId containerId) throws IOException {
+      AggregatedLogFormat.ContainerLogsReader logReader = null;
+      IndexEntry iEntry = index.get(containerId.toString());
+      if (iEntry != null) {
+        fsDataIStream.seek(iEntry.getOffset());
+        BoundedInputStream boundedFsDataIStream =
+            new BoundedInputStream(fsDataIStream, iEntry.getLength());
+        logReader = new AggregatedLogFormat.ContainerLogsReader(
+            new DataInputStream(boundedFsDataIStream));
+      }
+      return logReader;
+    }
+
+    public void close() {
+      org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream);
+    }
+  }
+
+  private static class IndexEntry {
+    private final long offset;
+    private final long length;
+
+    public IndexEntry(long offset, long length) {
+      this.offset = offset;
+      this.length = length;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public long getLength() {
+      return length;
+    }
+  }
+}
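As a reading aid (not part of the patch): a minimal sketch of driving the new reader against an already-compacted application. The paths are hypothetical; ContainerId.fromString assumes a YARN version that has it (older code would use ConverterUtils.toContainerId), and the nextLog()/getCurrentLogType()/getCurrentLogLength() loop is the existing AggregatedLogFormat.ContainerLogsReader API.

  Configuration conf = new Configuration();
  // Hypothetical compacted-log location: <dir>/<app> plus <dir>/<app>.index,
  // matching the naming used by compact() above.
  Path logFile = new Path("/tmp/logs/compacted/application_1234_0001");
  Path indexFile = new Path("/tmp/logs/compacted/application_1234_0001.index");

  CompactedAggregatedLogFormat.LogReader reader =
      new CompactedAggregatedLogFormat.LogReader(conf, indexFile, logFile);
  try {
    for (String id : reader.getContainerIds()) {
      AggregatedLogFormat.ContainerLogsReader logs =
          reader.getContainerLogsReader(ContainerId.fromString(id));
      // Walk the per-logfile framing preserved by LogWriter.append().
      while (logs.nextLog() != null) {
        System.out.println(id + "/" + logs.getCurrentLogType()
            + " (" + logs.getCurrentLogLength() + " bytes)");
      }
    }
  } finally {
    reader.close();
  }

Because the index records (offset, length) per container, each lookup is a single seek plus a BoundedInputStream over the data file, instead of scanning a TFile for the matching key.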