From 8bba04de170fa1fc7c1c0797940c828f8607744e Mon Sep 17 00:00:00 2001 From: rmani Date: Fri, 26 Jan 2018 17:56:43 -0800 Subject: [PATCH 1/1] RANGER-1837: Enhance Ranger Audit to HDFS to support ORC file format --- agents-audit/README.txt | 88 ++ agents-audit/pom.xml | 5 + .../ranger/audit/destination/AuditDestination.java | 1 - .../audit/destination/HDFSAuditDestination.java | 354 ++----- .../audit/provider/AuditFileCacheProvider.java | 2 +- .../apache/ranger/audit/provider/AuditHandler.java | 5 +- .../audit/provider/AuditProviderFactory.java | 31 +- .../ranger/audit/provider/AuditWriterFactory.java | 129 +++ .../ranger/audit/provider/BaseAuditHandler.java | 12 +- .../ranger/audit/provider/DummyAuditProvider.java | 18 + .../org/apache/ranger/audit/provider/MiscUtil.java | 14 + .../audit/queue/AuditFileCacheProviderSpool.java | 9 +- .../apache/ranger/audit/queue/AuditFileQueue.java | 125 +++ .../ranger/audit/queue/AuditFileQueueSpool.java | 1019 ++++++++++++++++++++ .../ranger/audit/utils/AbstractAuditWriter.java | 352 +++++++ .../org/apache/ranger/audit/utils/JSONWriter.java | 173 ++++ .../org/apache/ranger/audit/utils/ORCFileUtil.java | 454 +++++++++ .../org/apache/ranger/audit/utils/ORCWriter.java | 190 ++++ .../java/org/apache/ranger/audit/utils/Writer.java | 41 + pom.xml | 2 +- src/main/assembly/hbase-agent.xml | 1 + src/main/assembly/hdfs-agent.xml | 1 + src/main/assembly/hive-agent.xml | 1 + src/main/assembly/knox-agent.xml | 1 + src/main/assembly/plugin-atlas.xml | 1 + src/main/assembly/plugin-kafka.xml | 1 + src/main/assembly/plugin-kms.xml | 1 + src/main/assembly/plugin-solr.xml | 1 + src/main/assembly/plugin-sqoop.xml | 1 + src/main/assembly/plugin-yarn.xml | 1 + src/main/assembly/storm-agent.xml | 1 + 31 files changed, 2762 insertions(+), 273 deletions(-) create mode 100644 agents-audit/README.txt create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueue.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueueSpool.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractAuditWriter.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/utils/JSONWriter.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCWriter.java create mode 100644 agents-audit/src/main/java/org/apache/ranger/audit/utils/Writer.java diff --git a/agents-audit/README.txt b/agents-audit/README.txt new file mode 100644 index 0000000..e5787c3 --- /dev/null +++ b/agents-audit/README.txt @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ORC FILE FORMAT in HDFS Ranger Audit log:
+
+ 1. Enable Ranger Audit to HDFS in ORC file format
+    - To enable Ranger Audit to HDFS with the ORC format, we need to first enable AuditFileCacheProvider to spool the audit logs to the
+      local filesystem. This is done to create the batch for the ORC file.
+      * For each Hadoop component Ranger plugin, create the spool directory. Component -> hdfs, hive, hbase, storm, kafka, solr, nifi, etc.
+        The component service user should have the necessary read/write/execute permission on the folder: this is the service owner who
+        starts the service in the cluster, i.e. the service "Namenode" is started by "hdfs" and the service "hiveserver2" is started by
+        "hive" (if started by a custom user, that user has to have the permission).
+
+        $ mkdir -p /var/log/hadoop/<component>/audit/spool
+        $ cd /var/log/hadoop/<component>/audit/
+        $ chown <component>:hadoop spool
+
+        e.g. for the Hive Ranger plugin:
+        $ mkdir -p /var/log/hive/audit/spool
+        $ cd /var/log/hive/audit/
+        $ chown hive:hadoop spool
+
+      * Enable AuditFileCacheProvider via the following params in ranger-<component>-audit.xml
+        xasecure.audit.provider.filecache.is.enabled=true
+        xasecure.audit.provider.filecache.filespool.file.rollover.sec=3600 (a 1 hr batch will be created, from which ORC file/files are created based on size)
+        xasecure.audit.provider.filecache.filespool.dir=/var/log/hadoop/hdfs/audit/spool
+
+ 2. Enable the ORC file format for Ranger HDFS Audit.
+    - This is done by setting the following param in ranger-<component>-audit.xml. By default the value is "json".
+
+      xasecure.audit.destination.hdfs.filetype=orc
+
+ 3. Property to control the compression technique for the ORC format. Default is 'snappy'.
+      xasecure.audit.destination.hdfs.orc.compression=snappy|lzo|zlib|none
+
+ 4. Buffer size and stripe size of the ORC file batch. Defaults are '100000' bytes and '1000000' bytes respectively. These decide the batch size of the ORC file in HDFS.
+      xasecure.audit.destination.hdfs.orc.buffersize= (value in bytes)
+      xasecure.audit.destination.hdfs.orc.stripesize= (value in bytes)
+
+ 5. Hive query to create an ORC table with the default 'snappy' compression.
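+    - The DDL below is a reference schema for the default Ranger audit event fields. The LOCATION shown ('/ranger/audit/hdfs') is only an
+      example path; point it at the directory where the ORC audit files are actually written (see xasecure.audit.destination.hdfs.dir and
+      its subdirectory layout).
+    - Once the table exists, the ORC audit data can be queried directly from Hive. An illustrative query (column names taken from the DDL
+      below):
+
+        SELECT evtTime, reqUser, accessType, resourcePath, accessResult
+        FROM ranger_audit_event
+        LIMIT 100;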
+ + CREATE EXTERNAL TABLE ranger_audit_event ( + repositoryType int, + repositoryName string, + reqUser string, + evtTime string, + accessType string, + resourcePath string, + resourceType string, + action string, + accessResult string, + agentId string, + policyId bigint, + resultReason string, + aclEnforcer string, + sessionId string, + clientType string, + clientIP string, + requestData string, + agentHostname string, + logType string, + eventId string, + seqNum bigint, + eventCount bigint, + eventDurationMS bigint, + additionalInfo string, + clusterName string + ) + STORED AS ORC + LOCATION '/ranger/audit/hdfs' + TBLPROPERTIES ("orc.compress"="SNAPPY"); + + + + + diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml index c8bd1d8..8ce5e4b 100644 --- a/agents-audit/pom.xml +++ b/agents-audit/pom.xml @@ -57,6 +57,11 @@ ${javax.persistence.version} + org.apache.hive + hive-exec + ${hive.version} + + log4j log4j ${log4j.version} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java index 41d0e82..72d2178 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java @@ -77,5 +77,4 @@ public abstract class AuditDestination extends BaseAuditHandler { public void waitToComplete(long timeout) { } - } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java index 66d8504..042d65b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java @@ -19,113 +19,45 @@ package org.apache.ranger.audit.destination; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.*; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditWriterFactory; import org.apache.ranger.audit.provider.MiscUtil; -import org.apache.ranger.audit.utils.RollingTimeUtil; +import org.apache.ranger.audit.utils.Writer; /** - * This class write the logs to local file + * This class write the logs to Hdfs */ -public class HDFSAuditDestination extends AuditDestination { + public class HDFSAuditDestination extends AuditDestination { private static final Log logger = LogFactory .getLog(HDFSAuditDestination.class); - public static final String PROP_HDFS_DIR = "dir"; - public static final String PROP_HDFS_SUBDIR = "subdir"; - public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format"; - public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec"; - public static final String PROP_HDFS_ROLLOVER_PERIOD = "file.rollover.period"; - - int fileRolloverSec = 24 * 60 * 60; // In 
seconds - - private String logFileNameFormat; - - private String rolloverPeriod; - - boolean initDone = false; - - private String logFolder; - - private PrintWriter logWriter = null; - volatile FSDataOutputStream ostream = null; // output stream wrapped in logWriter - - private String currentFileName; - - private boolean isStopped = false; - - private RollingTimeUtil rollingTimeUtil = null; - - private Date nextRollOverTime = null; - - private boolean rollOverByDuration = false; + private static final String DEFAULT_AUDIT_FILE_TYPE = "json"; + private Map auditConfigs = null; + private String auditProviderName = null; + private Writer writer = null; + private boolean initDone = false; + private boolean isStopped = false; @Override public void init(Properties prop, String propPrefix) { super.init(prop, propPrefix); - - // Initialize properties for this class - // Initial folder and file properties - String logFolderProp = MiscUtil.getStringProperty(props, propPrefix - + "." + PROP_HDFS_DIR); - if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.fatal("File destination folder is not configured. Please set " - + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName()); - return; - } - - String logSubFolder = MiscUtil.getStringProperty(props, propPrefix - + "." + PROP_HDFS_SUBDIR); - if (logSubFolder == null || logSubFolder.isEmpty()) { - logSubFolder = "%app-type%/%time:yyyyMMdd%"; - } - - logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." - + PROP_HDFS_FILE_NAME_FORMAT); - fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_HDFS_ROLLOVER, fileRolloverSec); - - if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { - logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log"; - } - - logFolder = logFolderProp + "/" + logSubFolder; - logger.info("logFolder=" + logFolder + ", destName=" + getName()); - logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" - + getName()); - logger.info("config=" + configProps.toString()); - - rolloverPeriod = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_HDFS_ROLLOVER_PERIOD); - rollingTimeUtil = RollingTimeUtil.getInstance(); - - //file.rollover.period is used for rolling over. If it could compute the next roll over time using file.rollover.period - //it fall back to use file.rollover.sec for find next rollover time. If still couldn't find default will be 1day window - //for rollover. - if(StringUtils.isEmpty(rolloverPeriod) ) { - rolloverPeriod = rollingTimeUtil.convertRolloverSecondsToRolloverPeriod(fileRolloverSec); - } - + this.auditProviderName = getName(); + this.auditConfigs = configProps; try { - nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod); - } catch ( Exception e) { - logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for hdfs audit file rollover...",e); - rollOverByDuration = true; - nextRollOverTime = rollOverByDuration(); + this.writer = getWriter(); + initDone = true; + } catch (Exception e) { + logger.error("Error while getting Audit writer", e); } - initDone = true; } @Override @@ -142,33 +74,10 @@ public class HDFSAuditDestination extends AuditDestination { logError("log() called after stop was requested. name=" + getName()); return false; } - - PrintWriter out = null; try { - if (logger.isDebugEnabled()) { - logger.debug("UGI=" + MiscUtil.getUGILoginUser() - + ". 
Will write to HDFS file=" + currentFileName); - } - - out = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction() { - @Override - public PrintWriter run() throws Exception { - PrintWriter out = getLogFileStream(); - for (String event : events) { - out.println(event); - } - return out; - }; - }); - - // flush and check the stream for errors - if (out.checkError()) { - // In theory, this count may NOT be accurate as part of the messages may have been successfully written. - // However, in practice, since client does buffering, either all of none would succeed. + boolean ret = writer.log(events); + if (!ret) { addDeferredCount(events.size()); - out.close(); - logWriter = null; - ostream = null; return false; } } catch (Throwable t) { @@ -177,40 +86,67 @@ public class HDFSAuditDestination extends AuditDestination { return false; } finally { logger.info("Flushing HDFS audit. Event Size:" + events.size()); - if (out != null) { - out.flush(); + if (writer != null) { + flush(); } } addSuccessCount(events.size()); return true; } + @Override - public void flush() { - logger.info("Flush called. name=" + getName()); - if (ostream != null) { - try { - synchronized (this) { - if (ostream != null) - // 1) PrinterWriter does not have bufferring of its own so - // we need to flush its underlying stream - // 2) HDFS flush() does not really flush all the way to disk. - ostream.hflush(); - logger.info("Flush HDFS audit logs completed....."); - } - } catch (IOException e) { - logger.error("Error on flushing log writer: " + e.getMessage() + - "\nException will be ignored. name=" + getName() + ", fileName=" + currentFileName); + synchronized public boolean logFile(final File file) { + logStatusIfRequired(); + //addTotalCount(events.size()); + + if (!initDone) { + //addDeferredCount(events.size()); + return false; + } + if (isStopped) { + //addDeferredCount(events.size()); + logError("log() called after stop was requested. name=" + getName()); + return false; + } + + try { + boolean ret = writer.logFile(file); + if (!ret) { + //addDeferredCount(events.size()); + return false; + } + } catch (Throwable t) { + //addDeferredCount(events.size()); + logError("Error writing to log file.", t); + return false; + } finally { + logger.info("Flushing HDFS audit. File:" + file.getAbsolutePath() + file.getName()); + if (writer != null) { + flush(); } } + //addSuccessCount(events.size()); + return true; + } + + @Override + public void flush() { + logger.info("Flush called. name=" + getName()); + writer.flush(); + } + + @Override + public void waitToComplete() { + waitToComplete(); } /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) - */ + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) + */ @Override public boolean log(Collection events) { if (isStopped) { @@ -232,14 +168,13 @@ public class HDFSAuditDestination extends AuditDestination { } } return logJSON(jsonList); - } /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#start() - */ + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ @Override public void start() { // Nothing to do here. 
We will open the file when the first log request @@ -247,134 +182,23 @@ public class HDFSAuditDestination extends AuditDestination { } @Override - synchronized public void stop() { - isStopped = true; - if (logWriter != null) { - try { - logWriter.flush(); - logWriter.close(); - } catch (Throwable t) { - logger.error("Error on closing log writter. Exception will be ignored. name=" - + getName() + ", fileName=" + currentFileName); - } - logWriter = null; - ostream = null; - } - logStatus(); - } - - // Helper methods in this class - synchronized private PrintWriter getLogFileStream() throws Exception { - closeFileIfNeeded(); - - // Either there are no open log file or the previous one has been rolled - // over - if (logWriter == null) { - Date currentTime = new Date(); - // Create a new file - String fileName = MiscUtil.replaceTokens(logFileNameFormat, - currentTime.getTime()); - String parentFolder = MiscUtil.replaceTokens(logFolder, - currentTime.getTime()); - Configuration conf = createConfiguration(); - - String fullPath = parentFolder + Path.SEPARATOR + fileName; - String defaultPath = fullPath; - URI uri = URI.create(fullPath); - FileSystem fileSystem = FileSystem.get(uri, conf); - - Path hdfPath = new Path(fullPath); - logger.info("Checking whether log file exists. hdfPath=" + fullPath + ", UGI=" + MiscUtil.getUGILoginUser()); - int i = 0; - while (fileSystem.exists(hdfPath)) { - i++; - int lastDot = defaultPath.lastIndexOf('.'); - String baseName = defaultPath.substring(0, lastDot); - String extension = defaultPath.substring(lastDot); - fullPath = baseName + "." + i + extension; - hdfPath = new Path(fullPath); - logger.info("Checking whether log file exists. hdfPath=" - + fullPath); - } - logger.info("Log file doesn't exists. Will create and use it. hdfPath=" - + fullPath); - // Create parent folders - createParents(hdfPath, fileSystem); - - // Create the file to write - logger.info("Creating new log file. hdfPath=" + fullPath); - ostream = fileSystem.create(hdfPath); - logWriter = new PrintWriter(ostream); - currentFileName = fullPath; - } - return logWriter; - } - - Configuration createConfiguration() { - Configuration conf = new Configuration(); - for (Map.Entry entry : configProps.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - // for ease of install config file may contain properties with empty value, skip those - if (StringUtils.isNotEmpty(value)) { - conf.set(key, value); - } - logger.info("Adding property to HDFS config: " + key + " => " + value); - } - - logger.info("Returning HDFS Filesystem Config: " + conf.toString()); - return conf; + public String getAuditFileType() { + String auditFileType = MiscUtil.getStringProperty(props, propPrefix + ".filetype", DEFAULT_AUDIT_FILE_TYPE); + return auditFileType; } - private void createParents(Path pathLogfile, FileSystem fileSystem) - throws Exception { - logger.info("Creating parent folder for " + pathLogfile); - Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null; - - if (parentPath != null && fileSystem != null - && !fileSystem.exists(parentPath)) { - fileSystem.mkdirs(parentPath); - } + @Override + synchronized public void stop() { + writer.stop(); + logStatus(); + isStopped = true; } - private void closeFileIfNeeded() throws FileNotFoundException, IOException { - if (logWriter == null) { - return; - } - if ( System.currentTimeMillis() > nextRollOverTime.getTime() ) { - logger.info("Closing file. Rolling over. 
name=" + getName() - + ", fileName=" + currentFileName); - try { - logWriter.flush(); - logWriter.close(); - } catch (Throwable t) { - logger.error("Error on closing log writter. Exception will be ignored. name=" - + getName() + ", fileName=" + currentFileName); - } - - logWriter = null; - ostream = null; - currentFileName = null; - - if (!rollOverByDuration) { - try { - if(StringUtils.isEmpty(rolloverPeriod) ) { - rolloverPeriod = rollingTimeUtil.convertRolloverSecondsToRolloverPeriod(fileRolloverSec); - } - nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod); - } catch ( Exception e) { - logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for hdfs audit file rollover...",e); - nextRollOverTime = rollOverByDuration(); - } - } else { - nextRollOverTime = rollOverByDuration(); - } - } - } - private Date rollOverByDuration() { - long rollOverTime = rollingTimeUtil.computeNextRollingTime(fileRolloverSec,nextRollOverTime); - return new Date(rollOverTime); + public Writer getWriter() throws Exception { + AuditWriterFactory auditWriterFactory = AuditWriterFactory.getInstance(); + auditWriterFactory.init(props, propPrefix, auditProviderName, auditConfigs); + return auditWriterFactory.getWriter(); } } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileCacheProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileCacheProvider.java index 314b130..37509b1 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileCacheProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileCacheProvider.java @@ -59,7 +59,7 @@ public class AuditFileCacheProvider extends BaseAuditHandler { boolean ret = false; if ( event != null) { fileSpooler.stashLogs(event); - if ( !fileSpooler.isSpoolingSuccessful()) { + if (fileSpooler.isSpoolingSuccessful()) { ret = true; } } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java index 4ce31dd..823a7f6 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java @@ -18,6 +18,7 @@ package org.apache.ranger.audit.provider; +import java.io.File; import java.util.Collection; import java.util.Properties; @@ -26,9 +27,9 @@ import org.apache.ranger.audit.model.AuditEventBase; public interface AuditHandler { boolean log(AuditEventBase event); boolean log(Collection events); - boolean logJSON(String event); boolean logJSON(Collection events); + boolean logFile(File file); void init(Properties prop); void init(Properties prop, String basePropertyName); @@ -42,5 +43,7 @@ public interface AuditHandler { */ String getName(); + String getAuditFileType(); + void flush(); } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 43107ba..8c890d9 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -38,6 +38,7 @@ import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider; import org.apache.ranger.audit.provider.solr.SolrAuditProvider; import org.apache.ranger.audit.queue.AuditAsyncQueue; import 
org.apache.ranger.audit.queue.AuditBatchQueue; +import org.apache.ranger.audit.queue.AuditFileQueue; import org.apache.ranger.audit.queue.AuditQueue; import org.apache.ranger.audit.queue.AuditSummaryQueue; @@ -62,6 +63,9 @@ public class AuditProviderFactory { public static final String AUDIT_DEST_BASE = "xasecure.audit.destination"; public static final String AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC = "xasecure.audit.shutdown.hook.max.wait.seconds"; public static final String AUDIT_IS_FILE_CACHE_PROVIDER_ENABLE_PROP = "xasecure.audit.provider.filecache.is.enabled"; + public static final String FILE_QUEUE_TYPE = "filequeue"; + public static final String DEFAULT_QUEUE_TYPE = "memoryqueue"; + public static final int AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT = 30; public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; @@ -451,7 +455,7 @@ public class AuditProviderFactory { } else if (providerName.equals("log4j")) { provider = new Log4JAuditDestination(); } else if (providerName.equals("batch")) { - provider = new AuditBatchQueue(consumer); + provider = getAuditProvider(props, propPrefix, consumer); } else if (providerName.equals("async")) { provider = new AuditAsyncQueue(consumer); } else { @@ -469,6 +473,31 @@ public class AuditProviderFactory { return provider; } + private AuditHandler getAuditProvider(Properties props, String propPrefix, + AuditHandler consumer) { + AuditHandler ret = null; + String queueType = MiscUtil.getStringProperty(props, propPrefix + "." + "queuetype", DEFAULT_QUEUE_TYPE); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> AuditProviderFactory.getAuditProvider() propPerfix= " + propPrefix + ", " + " queueType= " + queueType); + } + + if (FILE_QUEUE_TYPE.equalsIgnoreCase(queueType)) { + AuditFileQueue auditFileQueue = new AuditFileQueue(consumer); + String propPrefixFileQueue = propPrefix + "." + "filequeue"; + auditFileQueue.init(props, propPrefixFileQueue); + ret = new AuditBatchQueue(auditFileQueue); + } else { + ret = new AuditBatchQueue(consumer); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AuditProviderFactory.getAuditProvider()"); + } + + return ret; + } + private AuditHandler getDefaultProvider() { return new DummyAuditProvider(); } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java new file mode 100644 index 0000000..43e138d --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java @@ -0,0 +1,129 @@ +package org.apache.ranger.audit.provider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.utils.Writer; + +import java.util.Map; +import java.util.Properties; + +public class AuditWriterFactory { + private static final Log logger = LogFactory + .getLog(AuditWriterFactory.class); + //public static final String AUDIT_FILETYPE = "xasecure.audit.destination.filetype"; + public static final String AUDIT_FILETYPE_DEFAULT = "json"; + // public static final String AUDIT_FILEWRITER_IMPL = "xasecure.audit.destination.filewriter.impl"; + public static final String AUDIT_JSON_FILEWRITER_IMPL = "org.apache.ranger.audit.utils.JSONWriter"; + public static final String AUDIT_ORC_FILEWRITER_IMPL = "org.apache.ranger.audit.utils.ORCWriter"; + + + public Map auditConfigs = null; + public Properties props = null; + public String propPrefix = null; + public String auditProviderName = null; + public Writer writer = null; + private static volatile AuditWriterFactory me = null; + + + public static AuditWriterFactory getInstance() { + AuditWriterFactory auditWriter = me; + if (auditWriter == null) { + synchronized (AuditWriterFactory.class) { + auditWriter = me; + if (auditWriter == null) { + me = auditWriter = new AuditWriterFactory(); + } + } + } + return auditWriter; + } + + public void init(Properties props, String propPrefix, String auditProviderName, Map auditConfigs) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> AuditWriterFactory.init()"); + } + this.props = props; + this.propPrefix = propPrefix; + this.auditProviderName = auditProviderName; + this.auditConfigs = auditConfigs; + + String AUDIT_FILETYPE = propPrefix + ".filetype"; + String AUDIT_FILEWRITER_IMPL = propPrefix + ".filewriter.impl" ; + String auditFileType = MiscUtil.getStringProperty(props, AUDIT_FILETYPE, AUDIT_FILETYPE_DEFAULT); + String writerClass = MiscUtil.getStringProperty(props, AUDIT_FILEWRITER_IMPL); + + if (!StringUtils.isEmpty(writerClass)) { + writer = createWriter(writerClass); + } else { + writer = createWriter(getDefaultWriter(auditFileType)); + } + if ( writer != null) { + writer.init(props, propPrefix, auditProviderName, auditConfigs); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AuditWriterFactory.init() :" + writer.getClass().getName()); + } + } + + public Writer createWriter(String writerClass) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> AuditWriterFactory.createWriter()"); + } + Writer ret = null; + try { + Class cls = (Class) Class.forName(writerClass); + ret = cls.newInstance(); + } catch (Exception e) { + throw e; + } + if (logger.isDebugEnabled()) { + logger.debug("<== AuditWriterFactory.createWriter()"); + } + return ret; + } + + public String getDefaultWriter(String auditFileType) { + if (logger.isDebugEnabled()) { + logger.debug("==> AuditWriterFactory.getDefaultWriter()"); + } + String ret = null; + switch (auditFileType) { + case "orc": + ret = AUDIT_ORC_FILEWRITER_IMPL; + break; + case "json": + ret = AUDIT_JSON_FILEWRITER_IMPL; + break; + } + if (logger.isDebugEnabled()) { + logger.debug("<== AuditWriterFactory.getDefaultWriter() :" + ret); + } + return ret; + } + + public Writer getWriter(){ + return this.writer; + } +} + diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java index b095000..bd27fc2 100644 --- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java @@ -25,6 +25,7 @@ import org.apache.ranger.audit.model.AuthzAuditEvent; import com.google.gson.GsonBuilder; +import java.io.File; import java.util.*; import java.util.concurrent.atomic.AtomicLong; @@ -170,6 +171,16 @@ public abstract class BaseAuditHandler implements AuditHandler { return log(eventList); } + @Override + public String getAuditFileType() { + return null; + } + + public boolean logFile(File file) { + return logFile(file); + } + + public String getParentPath() { return parentPath; } @@ -459,5 +470,4 @@ public abstract class BaseAuditHandler implements AuditHandler { logFailedEventJSON(event, excp); } } - } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java index 05f882f..ba7faf8 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java @@ -17,6 +17,7 @@ */ package org.apache.ranger.audit.provider; +import java.io.File; import java.util.Collection; import java.util.Properties; @@ -103,4 +104,21 @@ public class DummyAuditProvider implements AuditHandler { return this.getClass().getName(); } + /* (non-Javadoc) + * @see org.apache.ranger.audit.provider.AuditProvider#getAuditFileType() + */ + @Override + public boolean logFile(File file) { + return logFile(file); + } + + /* (non-Javadoc) + * @see org.apache.ranger.audit.provider.AuditProvider#getAuditFileType() + */ + @Override + public String getAuditFileType() { + String defaultAuditFileType ="json"; + return defaultAuditFileType; + } + } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java index eff3824..5f0102b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java @@ -327,6 +327,20 @@ public class MiscUtil { return ret; } + public static String getStringProperty(Properties props, String propName, + String defValue) { + String ret = defValue; + + if (props != null && propName != null) { + String val = props.getProperty(propName); + if (val != null) { + ret = val; + } + } + + return ret; + } + public static boolean getBooleanProperty(Properties props, String propName, boolean defValue) { boolean ret = defValue; diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpool.java index 41513ba..35bda56 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpool.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpool.java @@ -56,10 +56,10 @@ public class AuditFileCacheProviderSpool implements Runnable { public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec"; public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename"; public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms"; + public static final String PROP_FILE_SPOOL_BATCH_SIZE = "filespool.buffer.size"; public static final String 
AUDIT_IS_FILE_CACHE_PROVIDER_ENABLE_PROP = "xasecure.audit.provider.filecache.is.enabled"; public static final String FILE_CACHE_PROVIDER_NAME = "AuditFileCacheProviderSpool"; - public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000; AuditHandler consumerProvider = null; @@ -84,6 +84,7 @@ public class AuditFileCacheProviderSpool implements Runnable { boolean closeFile = false; boolean isPending = false; long lastAttemptTime = 0; + long bufferSize = 1000; boolean initDone = false; PrintWriter logWriter = null; @@ -275,6 +276,10 @@ public class AuditFileCacheProviderSpool implements Runnable { + FILE_CACHE_PROVIDER_NAME, t); return false; } + + bufferSize = MiscUtil.getLongProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_BATCH_SIZE, bufferSize); + initDone = true; logger.debug("<== AuditFileCacheProviderSpool.init()"); @@ -824,7 +829,7 @@ public class AuditFileCacheProviderSpool implements Runnable { AuditEventBase event = MiscUtil.fromJson(line, AuthzAuditEvent.class); events.add(event); - if (events.size() == AUDIT_BATCH_SIZE_DEFAULT) { + if (events.size() == bufferSize) { boolean ret = sendEvent(events, currentConsumerIndexRecord, currLine); if (!ret) { diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueue.java new file mode 100644 index 0000000..ae36e85 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueue.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ranger.audit.queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.apache.ranger.audit.provider.BaseAuditHandler; + +import java.util.Collection; +import java.util.Properties; + +/* + AuditFileCacheProvider class does the work of stashing the audit logs into Local Filesystem before sending it to the AuditBatchQueue Consumer +*/ + +public class AuditFileQueue extends BaseAuditHandler { + private static final Log logger = LogFactory.getLog(AuditFileQueue.class); + + AuditFileQueueSpool fileSpooler = null; + AuditHandler consumer = null; + + static final String DEFAULT_NAME = "batch"; + + public AuditFileQueue(AuditHandler consumer) { + this.consumer = consumer; + } + + public void init(Properties prop, String basePropertyName) { + String propPrefix = "xasecure.audit.batch"; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + super.init(prop, propPrefix); + + //init AuditFileQueueSpooler thread to send Local logs to destination + fileSpooler = new AuditFileQueueSpool(consumer); + fileSpooler.init(prop,propPrefix); + } + + @Override + public boolean log(AuditEventBase event) { + boolean ret = false; + if ( event != null) { + fileSpooler.stashLogs(event); + if (fileSpooler.isSpoolingSuccessful()) { + ret = true; + } + } + return ret; + } + + @Override + public boolean log(Collection events) { + boolean ret = true; + if ( events != null) { + for (AuditEventBase event : events) { + ret = log(event); + } + } + return ret; + } + + @Override + public void start() { + // Start the consumer thread + if (consumer != null) { + consumer.start(); + } + if (fileSpooler != null) { + // start AuditFileSpool thread + fileSpooler.start(); + } + } + + @Override + public void stop() { + logger.info("Stop called. name=" + getName()); + if (consumer != null) { + consumer.stop(); + } + } + + @Override + public void waitToComplete() { + logger.info("waitToComplete called. name=" + getName()); + if ( consumer != null) { + consumer.waitToComplete(); + } + } + + @Override + public void waitToComplete(long timeout) { + logger.info("waitToComplete called. name=" + getName()); + if ( consumer != null) { + consumer.waitToComplete(timeout); + } + } + + @Override + public void flush() { + logger.info("waitToComplete. name=" + getName()); + if ( consumer != null) { + consumer.flush(); + } + } + + } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueueSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueueSpool.java new file mode 100644 index 0000000..5eed4a6 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileQueueSpool.java @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.MDC; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.AuditHandler; +import org.apache.ranger.audit.provider.MiscUtil; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * This class temporarily stores logs in the local file system before it dispatches each log file to the AuditBatchQueue consumer. + * It gets instantiated only when AuditFileCacheProvider is enabled (xasecure.audit.provider.filecache.is.enabled). + * When AuditFileCacheProvider is enabled, all the logs are stored in the local file system before being sent to the destination.
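+ *
+ * A minimal sketch of the spool-related settings this class reads (property names are taken from the PROP_FILE_SPOOL_* constants declared
+ * below; the <prefix> is illustrative and depends on how the owning queue was configured):
+ *
+ *   <prefix>.filespool.dir=/var/log/hadoop/hdfs/audit/spool
+ *   <prefix>.filespool.file.rollover.sec=3600
+ *   <prefix>.filespool.archive.max.files=100
+ *
+ * Depending on the audit file type reported by the consumer, each spooled file is either handed over as a whole file (json) or re-read
+ * line by line so the events can be written out in the destination's format (e.g. orc).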
+ */ + +public class AuditFileQueueSpool implements Runnable { + private static final Log logger = LogFactory.getLog(AuditFileQueueSpool.class); + + public enum SPOOL_FILE_STATUS { + pending, write_inprogress, read_inprogress, done + } + + public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir"; + public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format"; + public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir"; + public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files"; + public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix"; + public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec"; + public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename"; + public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms"; + public static final String PROP_FILE_SPOOL_BATCH_SIZE = "filespool.buffer.size"; + + public static final String FILE_QUEUE_PROVIDER_NAME = "AuditFileQueueSpool"; + public static final String DEFAULT_AUDIT_FILE_TYPE = "json"; + + AuditHandler consumerProvider = null; + + BlockingQueue indexQueue = new LinkedBlockingQueue(); + List indexRecords = new ArrayList(); + + // Folder and File attributes + File logFolder = null; + String logFileNameFormat = null; + File archiveFolder = null; + String fileNamePrefix = null; + String indexFileName = null; + File indexFile = null; + String indexDoneFileName = null; + String auditFileType = null; + File indexDoneFile = null; + int retryDestinationMS = 30 * 1000; // Default 30 seconds + int fileRolloverSec = 24 * 60 * 60; // In seconds + int maxArchiveFiles = 100; + int errorLogIntervalMS = 30 * 1000; // Every 30 seconds + long lastErrorLogMS = 0; + boolean isAuditFileCacheProviderEnabled = false; + boolean closeFile = false; + boolean isPending = false; + long lastAttemptTime = 0; + long bufferSize = 1000; + boolean initDone = false; + + PrintWriter logWriter = null; + AuditIndexRecord currentWriterIndexRecord = null; + AuditIndexRecord currentConsumerIndexRecord = null; + + BufferedReader logReader = null; + Thread destinationThread = null; + + boolean isWriting = true; + boolean isDrain = false; + boolean isDestDown = false; + boolean isSpoolingSuccessful = true; + + private Gson gson = null; + + public AuditFileQueueSpool(AuditHandler consumerProvider) { + this.consumerProvider = consumerProvider; + } + + public void init(Properties prop) { + init(prop, null); + } + + public boolean init(Properties props, String basePropertyName) { + logger.debug("==> AuditFileQueueSpool.init()"); + + if (initDone) { + logger.error("init() called more than once. queueProvider=" + + "" + ", consumerProvider=" + + consumerProvider.getName()); + return true; + } + String propPrefix = "xasecure.audit.filespool"; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + + try { + gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + .create(); + // Initial folder and file properties + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_LOCAL_DIR); + logFileNameFormat = MiscUtil.getStringProperty(props, + basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME); + String archiveFolderProp = MiscUtil.getStringProperty(props, + propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR); + fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "." 
+ + PROP_FILE_SPOOL_FILENAME_PREFIX); + indexFileName = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_INDEX_FILE); + retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS); + fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec); + maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles); + logger.info("retryDestinationMS=" + retryDestinationMS + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME); + logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + + if (logFolderProp == null || logFolderProp.isEmpty()) { + logger.fatal("Audit spool folder is not configured. Please set " + + propPrefix + + "." + + PROP_FILE_SPOOL_LOCAL_DIR + + ". queueName=" + FILE_QUEUE_PROVIDER_NAME); + return false; + } + logFolder = new File(logFolderProp); + if (!logFolder.isDirectory()) { + boolean result = logFolder.mkdirs(); + if (!logFolder.isDirectory() || !result) { + logger.fatal("File Spool folder not found and can't be created. folder=" + + logFolder.getAbsolutePath() + + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return false; + } + } + logger.info("logFolder=" + logFolder + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + + if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { + logFileNameFormat = "spool_" + "%app-type%" + "_" + + "%time:yyyyMMdd-HHmm.ss%.log"; + } + logger.info("logFileNameFormat=" + logFileNameFormat + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME); + + if (archiveFolderProp == null || archiveFolderProp.isEmpty()) { + archiveFolder = new File(logFolder, "archive"); + } else { + archiveFolder = new File(archiveFolderProp); + } + if (!archiveFolder.isDirectory()) { + boolean result = archiveFolder.mkdirs(); + if (!archiveFolder.isDirectory() || !result) { + logger.error("File Spool archive folder not found and can't be created. folder=" + + archiveFolder.getAbsolutePath() + + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return false; + } + } + logger.info("archiveFolder=" + archiveFolder + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + + if (indexFileName == null || indexFileName.isEmpty()) { + if (fileNamePrefix == null || fileNamePrefix.isEmpty()) { + fileNamePrefix = FILE_QUEUE_PROVIDER_NAME + "_" + + consumerProvider.getName(); + } + indexFileName = "index_" + fileNamePrefix + "_" + "%app-type%" + + ".json"; + indexFileName = MiscUtil.replaceTokens(indexFileName, + System.currentTimeMillis()); + } + + indexFile = new File(logFolder, indexFileName); + if (!indexFile.exists()) { + boolean ret = indexFile.createNewFile(); + if (!ret) { + logger.fatal("Error creating index file. fileName=" + + indexFile.getPath()); + return false; + } + } + logger.info("indexFile=" + indexFile + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + + int lastDot = indexFileName.lastIndexOf('.'); + if (lastDot < 0) { + lastDot = indexFileName.length() - 1; + } + indexDoneFileName = indexFileName.substring(0, lastDot) + + "_closed.json"; + indexDoneFile = new File(logFolder, indexDoneFileName); + if (!indexDoneFile.exists()) { + boolean ret = indexDoneFile.createNewFile(); + if (!ret) { + logger.fatal("Error creating index done file. 
fileName=" + + indexDoneFile.getPath()); + return false; + } + } + logger.info("indexDoneFile=" + indexDoneFile + ", queueName=" + + FILE_QUEUE_PROVIDER_NAME); + + // Load index file + loadIndexFile(); + for (AuditIndexRecord auditIndexRecord : indexRecords) { + if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) { + isPending = true; + } + if (auditIndexRecord.status + .equals(SPOOL_FILE_STATUS.write_inprogress)) { + currentWriterIndexRecord = auditIndexRecord; + logger.info("currentWriterIndexRecord=" + + currentWriterIndexRecord.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME); + } + if (auditIndexRecord.status + .equals(SPOOL_FILE_STATUS.read_inprogress)) { + indexQueue.add(auditIndexRecord); + } + } + printIndex(); + for (int i = 0; i < indexRecords.size(); i++) { + AuditIndexRecord auditIndexRecord = indexRecords.get(i); + if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) { + File consumerFile = new File(auditIndexRecord.filePath); + if (!consumerFile.exists()) { + logger.error("INIT: Consumer file=" + + consumerFile.getPath() + " not found."); + } else { + indexQueue.add(auditIndexRecord); + } + } + } + + auditFileType = consumerProvider.getAuditFileType(); + if (auditFileType == null) { + auditFileType = DEFAULT_AUDIT_FILE_TYPE; + } + + } catch (Throwable t) { + logger.fatal("Error initializing File Spooler. queue=" + + FILE_QUEUE_PROVIDER_NAME, t); + return false; + } + + bufferSize = MiscUtil.getLongProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_BATCH_SIZE, bufferSize); + + initDone = true; + + logger.debug("<== AuditFileQueueSpool.init()"); + return true; + } + + /** + * Start looking for outstanding logs and update status according. + */ + public void start() { + if (!initDone) { + logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return; + } + + logger.info("Starting writerThread, queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()); + + // Let's start the thread to read + destinationThread = new Thread(this, FILE_QUEUE_PROVIDER_NAME + "_" + + consumerProvider.getName() + "_destWriter"); + destinationThread.setDaemon(true); + destinationThread.start(); + } + + public void stop() { + if (!initDone) { + logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return; + } + logger.info("Stop called, queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + + isDrain = true; + flush(); + + PrintWriter out = getOpenLogFileStream(); + if (out != null) { + // If write is still going on, then let's give it enough time to + // complete + for (int i = 0; i < 3; i++) { + if (isWriting) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + continue; + } + try { + logger.info("Closing open file, queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()); + + out.flush(); + out.close(); + break; + } catch (Throwable t) { + logger.debug("Error closing spool out file.", t); + } + } + } + try { + if (destinationThread != null) { + destinationThread.interrupt(); + } + destinationThread = null; + } catch (Throwable e) { + // ignore + } + } + + public void flush() { + if (!initDone) { + logger.error("Cannot flush Audit File Spooler. Initilization not done. 
queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return; + } + PrintWriter out = getOpenLogFileStream(); + if (out != null) { + out.flush(); + } + } + + /** + * If any files are still not processed. Also, if the destination is not + * reachable + * + * @return + */ + public boolean isPending() { + if (!initDone) { + logError("isPending(): File Spooler not initialized. queueName=" + + FILE_QUEUE_PROVIDER_NAME); + return false; + } + + return isPending; + } + + /** + * Milliseconds from last attempt time + * + * @return + */ + public long getLastAttemptTimeDelta() { + if (lastAttemptTime == 0) { + return 0; + } + return System.currentTimeMillis() - lastAttemptTime; + } + + synchronized public void stashLogs(AuditEventBase event) { + + if (isDrain) { + // Stop has been called, so this method shouldn't be called + logger.error("stashLogs() is called after stop is called. event=" + + event); + return; + } + try { + isWriting = true; + PrintWriter logOut = getLogFileStream(); + // Convert event to json + String jsonStr = MiscUtil.stringify(event); + logOut.println(jsonStr); + logOut.flush(); + isPending = true; + isSpoolingSuccessful = true; + } catch (Throwable t) { + isSpoolingSuccessful = false; + logger.error("Error writing to file. event=" + event, t); + } finally { + isWriting = false; + } + + } + + synchronized public void stashLogs(Collection events) { + for (AuditEventBase event : events) { + stashLogs(event); + } + flush(); + } + + synchronized public void stashLogsString(String event) { + if (isDrain) { + // Stop has been called, so this method shouldn't be called + logger.error("stashLogs() is called after stop is called. event=" + + event); + return; + } + try { + isWriting = true; + PrintWriter logOut = getLogFileStream(); + logOut.println(event); + } catch (Exception ex) { + logger.error("Error writing to file. event=" + event, ex); + } finally { + isWriting = false; + } + + } + + synchronized public boolean isSpoolingSuccessful() { + return isSpoolingSuccessful; + } + + synchronized public void stashLogsString(Collection events) { + for (String event : events) { + stashLogsString(event); + } + flush(); + } + + /** + * This return the current file. If there are not current open output file, + * then it will return null + * + * @return + * @throws Exception + */ + synchronized private PrintWriter getOpenLogFileStream() { + return logWriter; + } + + /** + * @return + * @throws Exception + */ + synchronized private PrintWriter getLogFileStream() throws Exception { + closeFileIfNeeded(); + // Either there are no open log file or the previous one has been rolled + // over + if (currentWriterIndexRecord == null) { + Date currentTime = new Date(); + // Create a new file + String fileName = MiscUtil.replaceTokens(logFileNameFormat, + currentTime.getTime()); + String newFileName = fileName; + File outLogFile = null; + int i = 0; + while (true) { + outLogFile = new File(logFolder, newFileName); + File archiveLogFile = new File(archiveFolder, newFileName); + if (!outLogFile.exists() && !archiveLogFile.exists()) { + break; + } + i++; + int lastDot = fileName.lastIndexOf('.'); + String baseName = fileName.substring(0, lastDot); + String extension = fileName.substring(lastDot); + newFileName = baseName + "." + i + extension; + } + fileName = newFileName; + logger.info("Creating new file. 
queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", fileName=" + fileName); + // Open the file + logWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + outLogFile),"UTF-8"))); + + AuditIndexRecord tmpIndexRecord = new AuditIndexRecord(); + + tmpIndexRecord.id = MiscUtil.generateUniqueId(); + tmpIndexRecord.filePath = outLogFile.getPath(); + tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress; + tmpIndexRecord.fileCreateTime = currentTime; + tmpIndexRecord.lastAttempt = true; + currentWriterIndexRecord = tmpIndexRecord; + indexRecords.add(currentWriterIndexRecord); + saveIndexFile(); + + } else { + if (logWriter == null) { + // This means the process just started. We need to open the file + // in append mode. + logger.info("Opening existing file for append. queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", fileName=" + + currentWriterIndexRecord.filePath); + logWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + currentWriterIndexRecord.filePath, true),"UTF-8"))); + } + } + return logWriter; + } + + synchronized private void closeFileIfNeeded() throws FileNotFoundException, + IOException { + // Is there file open to write or there are no pending file, then close + // the active file + if (currentWriterIndexRecord != null) { + // Check whether the file needs to rolled + rollOverSpoolFileByTime(); + + if (closeFile) { + // Roll the file + if (logWriter != null) { + logWriter.flush(); + logWriter.close(); + logWriter = null; + closeFile = false; + } + currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending; + currentWriterIndexRecord.writeCompleteTime = new Date(); + saveIndexFile(); + logger.info("Adding file to queue. queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", fileName=" + + currentWriterIndexRecord.filePath); + indexQueue.add(currentWriterIndexRecord); + currentWriterIndexRecord = null; + } + } + } + + private void rollOverSpoolFileByTime() { + if (System.currentTimeMillis() + - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) { + closeFile = true; + logger.info("Closing file. Rolling over. queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", fileName=" + + currentWriterIndexRecord.filePath); + } + } + + /** + * Load the index file + * + * @throws IOException + */ + void loadIndexFile() throws IOException { + logger.info("Loading index file. fileName=" + indexFile.getPath()); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader(new FileInputStream(indexFile), "UTF-8")); + indexRecords.clear(); + String line; + while ((line = br.readLine()) != null) { + if (!line.isEmpty() && !line.startsWith("#")) { + AuditIndexRecord record = gson.fromJson(line, + AuditIndexRecord.class); + indexRecords.add(record); + } + } + } finally { + if (br!= null) { + br.close(); + } + } + } + + synchronized void printIndex() { + logger.info("INDEX printIndex() ==== START"); + Iterator iter = indexRecords.iterator(); + while (iter.hasNext()) { + AuditIndexRecord record = iter.next(); + logger.info("INDEX=" + record + ", isFileExist=" + + (new File(record.filePath).exists())); + } + logger.info("INDEX printIndex() ==== END"); + } + + synchronized void removeIndexRecord(AuditIndexRecord indexRecord) + throws FileNotFoundException, IOException { + Iterator iter = indexRecords.iterator(); + while (iter.hasNext()) { + AuditIndexRecord record = iter.next(); + if (record.id.equals(indexRecord.id)) { + logger.info("Removing file from index. 
file=" + record.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + + iter.remove(); + appendToDoneFile(record); + } + } + saveIndexFile(); + // If there are no more files in the index, then let's assume the + // destination is now available + if (indexRecords.size() == 0) { + isPending = false; + } + } + + synchronized void saveIndexFile() throws FileNotFoundException, IOException { + PrintWriter out = new PrintWriter(indexFile,"UTF-8"); + for (AuditIndexRecord auditIndexRecord : indexRecords) { + out.println(gson.toJson(auditIndexRecord)); + } + out.close(); + // printIndex(); + + } + + void appendToDoneFile(AuditIndexRecord indexRecord) + throws FileNotFoundException, IOException { + logger.info("Moving to done file. " + indexRecord.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()); + String line = gson.toJson(indexRecord); + PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + indexDoneFile, true),"UTF-8"))); + out.println(line); + out.flush(); + out.close(); + + // After Each file is read and audit events are pushed into pipe, we flush to reach the destination immediate. + consumerProvider.flush(); + + // Move to archive folder + File logFile = null; + File archiveFile = null; + try { + logFile = new File(indexRecord.filePath); + String fileName = logFile.getName(); + archiveFile = new File(archiveFolder, fileName); + logger.info("Moving logFile " + logFile + " to " + archiveFile); + boolean result = logFile.renameTo(archiveFile); + if (!result) { + logger.error("Error moving log file to archive folder. Unable to rename" + + logFile + " to archiveFile=" + archiveFile); + } + } catch (Throwable t) { + logger.error("Error moving log file to archive folder. logFile=" + + logFile + ", archiveFile=" + archiveFile, t); + } + + // After archiving the file flush the pipe + consumerProvider.flush(); + + archiveFile = null; + try { + // Remove old files + File[] logFiles = archiveFolder.listFiles(new FileFilter() { + public boolean accept(File pathname) { + return pathname.getName().toLowerCase().endsWith(".log"); + } + }); + + if (logFiles != null && logFiles.length > maxArchiveFiles) { + int filesToDelete = logFiles.length - maxArchiveFiles; + BufferedReader br = new BufferedReader(new FileReader( + indexDoneFile)); + try { + int filesDeletedCount = 0; + while ((line = br.readLine()) != null) { + if (!line.isEmpty() && !line.startsWith("#")) { + AuditIndexRecord record = gson.fromJson(line, + AuditIndexRecord.class); + logFile = new File(record.filePath); + String fileName = logFile.getName(); + archiveFile = new File(archiveFolder, fileName); + if (archiveFile.exists()) { + logger.info("Deleting archive file " + + archiveFile); + boolean ret = archiveFile.delete(); + if (!ret) { + logger.error("Error deleting archive file. archiveFile=" + + archiveFile); + } + filesDeletedCount++; + if (filesDeletedCount >= filesToDelete) { + logger.info("Deleted " + filesDeletedCount + + " files"); + break; + } + } + } + } + } finally { + br.close(); + } + } + } catch (Throwable t) { + logger.error("Error deleting older archive file. 
archiveFile=" + + archiveFile, t); + } + + } + + void logError(String msg) { + long currTimeMS = System.currentTimeMillis(); + if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { + logger.error(msg); + lastErrorLogMS = currTimeMS; + } + } + + class AuditIndexRecord { + String id; + String filePath; + int linePosition = 0; + SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress; + Date fileCreateTime; + Date writeCompleteTime; + Date doneCompleteTime; + Date lastSuccessTime; + Date lastFailedTime; + int failedAttemptCount = 0; + boolean lastAttempt = false; + + @Override + public String toString() { + return "AuditIndexRecord [id=" + id + ", filePath=" + filePath + + ", linePosition=" + linePosition + ", status=" + status + + ", fileCreateTime=" + fileCreateTime + + ", writeCompleteTime=" + writeCompleteTime + + ", doneCompleteTime=" + doneCompleteTime + + ", lastSuccessTime=" + lastSuccessTime + + ", lastFailedTime=" + lastFailedTime + + ", failedAttemptCount=" + failedAttemptCount + + ", lastAttempt=" + lastAttempt + "]"; + } + + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + try { + //This is done to clear the MDC context to avoid issue with Ranger Auditing for Knox + MDC.clear(); + runLogAudit(); + } catch (Throwable t) { + logger.fatal("Exited thread without abnormaly. queue=" + + consumerProvider.getName(), t); + } + } + + public void runLogAudit() { + // boolean isResumed = false; + while (true) { + try { + if (isDestDown) { + logger.info("Destination is down. sleeping for " + + retryDestinationMS + + " milli seconds. indexQueue=" + indexQueue.size() + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + Thread.sleep(retryDestinationMS); + } + // Let's pause between each iteration + if (currentConsumerIndexRecord == null) { + currentConsumerIndexRecord = indexQueue.poll( + retryDestinationMS, TimeUnit.MILLISECONDS); + } else { + Thread.sleep(retryDestinationMS); + } + + if (isDrain) { + // Need to exit + break; + } + if (currentConsumerIndexRecord == null) { + closeFileIfNeeded(); + continue; + } + + boolean isRemoveIndex = false; + File consumerFile = new File( + currentConsumerIndexRecord.filePath); + if (!consumerFile.exists()) { + logger.error("Consumer file=" + consumerFile.getPath() + + " not found."); + printIndex(); + isRemoveIndex = true; + } else { + // Let's open the file to write + BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream( + currentConsumerIndexRecord.filePath),"UTF-8")); + try { + if (auditFileType.equalsIgnoreCase(DEFAULT_AUDIT_FILE_TYPE)) { + // if Audit File format is JSON each audit file in the Local Spool Location will be copied + // to HDFS location as JSON + File srcFile = new File(currentConsumerIndexRecord.filePath); + logFile(srcFile); + } else { + // If Audit File format is ORC, each records in audit files in the Local Spool Location will be + // read and converted into ORC format and pushed into an ORC file. + logEvent(br); + } + logger.info("Done reading file. file=" + + currentConsumerIndexRecord.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + // The entire file is read + currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done; + currentConsumerIndexRecord.doneCompleteTime = new Date(); + currentConsumerIndexRecord.lastAttempt = true; + + isRemoveIndex = true; + } catch (Exception ex) { + isDestDown = true; + logError("Destination down. 
queueName=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()); + lastAttemptTime = System.currentTimeMillis(); + // Update the index file + currentConsumerIndexRecord.lastFailedTime = new Date(); + currentConsumerIndexRecord.failedAttemptCount++; + currentConsumerIndexRecord.lastAttempt = false; + saveIndexFile(); + } finally { + br.close(); + } + } + if (isRemoveIndex) { + // Remove this entry from index + removeIndexRecord(currentConsumerIndexRecord); + currentConsumerIndexRecord = null; + closeFileIfNeeded(); + } + } catch (InterruptedException e) { + logger.info("Caught exception in consumer thread. Shutdown might be in progress"); + } catch (Throwable t) { + logger.error("Exception in destination writing thread.", t); + } + } + logger.info("Exiting file spooler. provider=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + } + + private void logEvent(BufferedReader br) throws Exception { + String line; + int currLine = 0; + int startLine = currentConsumerIndexRecord.linePosition; + List events = new ArrayList<>(); + while ((line = br.readLine()) != null) { + currLine++; + if (currLine < startLine) { + continue; + } + AuditEventBase event = MiscUtil.fromJson(line, AuthzAuditEvent.class); + events.add(event); + + if (events.size() == bufferSize) { + boolean ret = sendEvent(events, + currentConsumerIndexRecord, currLine); + if (!ret) { + throw new Exception("Destination down"); + } + events.clear(); + } + } + if (events.size() > 0) { + boolean ret = sendEvent(events, + currentConsumerIndexRecord, currLine); + if (!ret) { + throw new Exception("Destination down"); + } + events.clear(); + } + } + + private boolean sendEvent(List events, AuditIndexRecord indexRecord, + int currLine) { + boolean ret = true; + try { + ret = consumerProvider.log(events); + if (!ret) { + // Need to log error after fixed interval + logError("Error sending logs to consumer. provider=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()); + } else { + // Update index and save + indexRecord.linePosition = currLine; + indexRecord.status = SPOOL_FILE_STATUS.read_inprogress; + indexRecord.lastSuccessTime = new Date(); + indexRecord.lastAttempt = true; + saveIndexFile(); + + if (isDestDown) { + isDestDown = false; + logger.info("Destination up now. " + indexRecord.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + } + } + } catch (Throwable t) { + logger.error("Error while sending logs to consumer. provider=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName() + ", log=" + events, t); + } + + return ret; + } + + private void logFile(File file) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> AuditFileQueueSpool.logFile()"); + } + int currLine = 0; + int startLine = currentConsumerIndexRecord.linePosition; + + if (currLine < startLine) { + currLine++; + } + + boolean ret = sendFile(file,currentConsumerIndexRecord, currLine); + if (!ret) { + throw new Exception("Destination down"); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AuditFileQueueSpool.logFile()"); + } + } + + private boolean sendFile(File file, AuditIndexRecord indexRecord, + int currLine) { + boolean ret = true; + if (logger.isDebugEnabled()) { + logger.debug("==> AuditFileQueueSpool.sendFile()"); + } + + try { + ret = consumerProvider.logFile(file); + if (!ret) { + // Need to log error after fixed interval + logError("Error sending log file to consumer. 
provider=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName()+ ", logFile=" + file.getName()); + } else { + // Update index and save + indexRecord.linePosition = currLine; + indexRecord.status = SPOOL_FILE_STATUS.read_inprogress; + indexRecord.lastSuccessTime = new Date(); + indexRecord.lastAttempt = true; + saveIndexFile(); + + if (isDestDown) { + isDestDown = false; + logger.info("Destination up now. " + indexRecord.filePath + + ", queueName=" + FILE_QUEUE_PROVIDER_NAME + + ", consumer=" + consumerProvider.getName()); + } + } + } catch (Throwable t) { + logger.error("Error sending log file to consumer. provider=" + + FILE_QUEUE_PROVIDER_NAME + ", consumer=" + + consumerProvider.getName() + ", logFile=" + file.getName(), t); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AuditFileQueueSpool.sendFile() " + ret ); + } + return ret; + } +} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractAuditWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractAuditWriter.java new file mode 100644 index 0000000..9932c4e --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractAuditWriter.java @@ -0,0 +1,352 @@ +package org.apache.ranger.audit.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.ranger.audit.provider.MiscUtil; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.Date; +import java.util.Map; +import java.util.Properties; + +/** + * This is Abstract class to have common properties of Ranger Audit HDFS Destination Writer. 
+ */ +public abstract class AbstractAuditWriter implements Writer { + private static final Log logger = LogFactory + .getLog(AbstractAuditWriter.class); + + public static final String PROP_FILESYSTEM_DIR = "dir"; + public static final String PROP_FILESYSTEM_SUBDIR = "subdir"; + public static final String PROP_FILESYSTEM_FILE_NAME_FORMAT = "filename.format"; + public static final String PROP_FILESYSTEM_FILE_ROLLOVER = "file.rollover.sec"; + public static final String PROP_FILESYSTEM_ROLLOVER_PERIOD = "file.rollover.period"; + public static final String PROP_FILESYSTEM_FILE_EXTENSION = ".log"; + public Configuration conf = null; + public FileSystem fileSystem = null; + public Map auditConfigs = null; + public Path auditPath = null; + public PrintWriter logWriter = null; + public RollingTimeUtil rollingTimeUtil = null; + public String auditProviderName = null; + public String fullPath = null; + public String parentFolder = null; + public String currentFileName = null; + public String logFileNameFormat = null; + public String logFolder = null; + public String fileExtension = null; + public String rolloverPeriod = null; + public String fileSystemScheme = null; + public Date nextRollOverTime = null; + public int fileRolloverSec = 24 * 60 * 60; // In seconds + public boolean rollOverByDuration = false; + public volatile FSDataOutputStream ostream = null; // output stream wrapped in logWriter + + + @Override + public void init(Properties props, String propPrefix, String auditProviderName, Map auditConfigs) { + // Initialize properties for this class + // Initial folder and file properties + logger.info("==> AbstractAuditWriter.init()"); + this.auditProviderName = auditProviderName; + this.auditConfigs = auditConfigs; + + initAuditWriterBase(props,propPrefix); + + logger.info("<== AbstractAuditWriter.init()"); + } + + public void createFileSystemFolders() throws Exception { + + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.createFileSystemFolders()"); + } + // Create a new file + Date currentTime = new Date(); + String fileName = MiscUtil.replaceTokens(logFileNameFormat, currentTime.getTime()); + parentFolder = MiscUtil.replaceTokens(logFolder, currentTime.getTime()); + fullPath = parentFolder + Path.SEPARATOR + fileName; + String defaultPath = fullPath; + + conf = createConfiguration(); + URI uri = URI.create(fullPath); + fileSystem = FileSystem.get(uri, conf); + auditPath = new Path(fullPath); + fileSystemScheme = getFileSystemScheme(); + logger.info("Checking whether log file exists. "+ fileSystemScheme + "Path= " + fullPath + ", UGI=" + MiscUtil.getUGILoginUser()); + int i = 0; + while (fileSystem.exists(auditPath)) { + i++; + int lastDot = defaultPath.lastIndexOf('.'); + String baseName = defaultPath.substring(0, lastDot); + String extension = defaultPath.substring(lastDot); + fullPath = baseName + "." + i + extension; + auditPath = new Path(fullPath); + logger.info("Checking whether log file exists. "+ fileSystemScheme + "Path= " + fullPath); + } + logger.info("Log file doesn't exists. Will create and use it. 
"+ fileSystemScheme + "Path= " + fullPath); + + // Create parent folders + createParents(auditPath, fileSystem); + + currentFileName = fullPath; + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.createFileSystemFolders()"); + } + } + + public Configuration createConfiguration() { + Configuration conf = new Configuration(); + for (Map.Entry entry : auditConfigs.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + // for ease of install config file may contain properties with empty value, skip those + if (StringUtils.isNotEmpty(value)) { + conf.set(key, value); + } + logger.info("Adding property to "+ fileSystemScheme + " + config: " + key + " => " + value); + } + + logger.info("Returning " + fileSystemScheme + "Filesystem Config: " + conf.toString()); + return conf; + } + + public void createParents(Path pathLogfile, FileSystem fileSystem) + throws Exception { + logger.info("Creating parent folder for " + pathLogfile); + Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null; + + if (parentPath != null && fileSystem != null + && !fileSystem.exists(parentPath)) { + fileSystem.mkdirs(parentPath); + } + } + + public void initAuditWriterBase(Properties props, String propPrefix) { + + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.initAuditWriter()"); + } + + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_DIR); + if (StringUtils.isEmpty(logFolderProp)) { + logger.fatal("File destination folder is not configured. Please set " + + propPrefix + "." + + PROP_FILESYSTEM_DIR + ". name=" + + auditProviderName); + return; + } + + String logSubFolder = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_SUBDIR); + if (StringUtils.isEmpty(logSubFolder)) { + logSubFolder = "%app-type%/%time:yyyyMMdd%"; + } + + logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_FILE_NAME_FORMAT); + fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." + PROP_FILESYSTEM_FILE_ROLLOVER, fileRolloverSec); + + if (StringUtils.isEmpty(fileExtension)) { + setFileExtension(PROP_FILESYSTEM_FILE_EXTENSION); + } + + if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { + logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + fileExtension; + } + + logFolder = logFolderProp + "/" + logSubFolder; + + logger.info("logFolder=" + logFolder + ", destName=" + auditProviderName); + logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="+ auditProviderName); + logger.info("config=" + auditConfigs.toString()); + + rolloverPeriod = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_ROLLOVER_PERIOD); + rollingTimeUtil = RollingTimeUtil.getInstance(); + + //file.rollover.period is used for rolling over. If it could compute the next roll over time using file.rollover.period + //it fall back to use file.rollover.sec for find next rollover time. If still couldn't find default will be 1day window + //for rollover. 
+ if(StringUtils.isEmpty(rolloverPeriod) ) { + rolloverPeriod = rollingTimeUtil.convertRolloverSecondsToRolloverPeriod(fileRolloverSec); + } + + try { + nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod); + } catch ( Exception e) { + logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for "+ fileSystemScheme + " audit file rollover...", e); + rollOverByDuration = true; + nextRollOverTime = rollOverByDuration(); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.initAuditWriter()"); + } + + } + + public void closeFileIfNeeded() throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.closeFileIfNeeded()"); + } + + if (logWriter == null) { + return; + } + + if ( System.currentTimeMillis() > nextRollOverTime.getTime() ) { + logger.info("Closing file. Rolling over. name=" + auditProviderName + + ", fileName=" + currentFileName); + try { + logWriter.flush(); + logWriter.close(); + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + auditProviderName + ", fileName=" + currentFileName); + } + + logWriter = null; + ostream = null; + currentFileName = null; + + if (!rollOverByDuration) { + try { + if(StringUtils.isEmpty(rolloverPeriod) ) { + rolloverPeriod = rollingTimeUtil.convertRolloverSecondsToRolloverPeriod(fileRolloverSec); + } + nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod); + } catch ( Exception e) { + logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for " + fileSystemScheme + " audit file rollover...", e); + nextRollOverTime = rollOverByDuration(); + } + } else { + nextRollOverTime = rollOverByDuration(); + } + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.closeFileIfNeeded()"); + } + } + + public Date rollOverByDuration() { + long rollOverTime = rollingTimeUtil.computeNextRollingTime(fileRolloverSec,nextRollOverTime); + return new Date(rollOverTime); + } + + public PrintWriter createWriter() throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.createWriter()"); + } + + if (logWriter == null) { + // Create the file to write + createFileSystemFolders(); + logger.info("Creating new log file. auditPath=" + fullPath); + ostream = fileSystem.create(auditPath); + logWriter = new PrintWriter(ostream); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.createWriter()"); + } + + return logWriter; + } + + public void closeWriter() { + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.closeWriter()"); + } + + logWriter = null; + ostream = null; + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.closeWriter()"); + } + } + + @Override + public void flush() { + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.flush()"); + } + if (ostream != null) { + try { + synchronized (this) { + if (ostream != null) + // 1) PrinterWriter does not have bufferring of its own so + // we need to flush its underlying stream + // 2) HDFS flush() does not really flush all the way to disk. + ostream.hflush(); + logger.info("Flush " + fileSystemScheme + " audit logs completed....."); + } + } catch (IOException e) { + logger.error("Error on flushing log writer: " + e.getMessage() + + "\nException will be ignored. 
name=" + auditProviderName + ", fileName=" + currentFileName); + } + } + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.flush()"); + } + } + + public boolean logFileToHDFS(File file) throws Exception { + boolean ret = false; + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractAuditWriter.logFileToHDFS()"); + } + + if (logWriter == null) { + // Create the file to write + createFileSystemFolders(); + logger.info("Copying the Audit File" + file.getName() + " to HDFS Path" + fullPath); + Path destPath = new Path(fullPath); + ret = FileUtil.copy(file,fileSystem,destPath,false,conf); + } + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractAuditWriter.logFileToHDFS()"); + } + return ret; + } + + public String getFileSystemScheme() { + String ret = null; + ret = logFolder.substring(0, (logFolder.indexOf(":"))); + ret = ret.toUpperCase(); + return ret; + } + + public void setFileExtension(String fileExtension) { + this.fileExtension = fileExtension; + } +} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/JSONWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/JSONWriter.java new file mode 100644 index 0000000..54bb458 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/JSONWriter.java @@ -0,0 +1,173 @@ +package org.apache.ranger.audit.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.provider.MiscUtil; + +import java.io.File; +import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +/** + * Writes the Ranger audit to HDFS as JSON text + */ +public class JSONWriter extends AbstractAuditWriter { + + private static final Log logger = LogFactory.getLog(JSONWriter.class); + + protected String JSON_FILE_EXTENSION = ".json"; + + public void init(Properties props, String propPrefix, String auditProviderName, Map auditConfigs) { + if (logger.isDebugEnabled()) { + logger.debug("==> JSONWriter.init()"); + } + init(); + super.init(props,propPrefix,auditProviderName,auditConfigs); + if (logger.isDebugEnabled()) { + logger.debug("<== JSONWriter.init()"); + } + } + + public void init() { + setFileExtension(JSON_FILE_EXTENSION); + } + + synchronized public boolean logJSON(final Collection events) throws Exception { + boolean ret = false; + PrintWriter out = null; + try { + if (logger.isDebugEnabled()) { + logger.debug("UGI=" + MiscUtil.getUGILoginUser() + + ". 
Will write to HDFS file=" + currentFileName); + } + out = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction() { + @Override + public PrintWriter run() throws Exception { + PrintWriter out = getLogFileStream(); + for (String event : events) { + out.println(event); + } + return out; + }; + }); + // flush and check the stream for errors + if (out.checkError()) { + // In theory, this count may NOT be accurate as part of the messages may have been successfully written. + // However, in practice, since client does buffering, either all of none would succeed. + out.close(); + closeWriter(); + return ret; + } + } catch (Exception e) { + if (out != null) { + out.close(); + } + closeWriter(); + return ret; + } finally { + ret = true; + logger.info("Flushing HDFS audit. Event Size:" + events.size()); + if (out != null) { + out.flush(); + } + } + + return ret; + } + + @Override + public boolean log(Collection events) throws Exception { + return logJSON(events); + } + + synchronized public boolean logAsFile(final File file) throws Exception { + boolean ret = false; + if (logger.isDebugEnabled()) { + logger.debug("UGI=" + MiscUtil.getUGILoginUser() + + ". Will write to HDFS file=" + currentFileName); + } + Boolean retVal = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + boolean ret = logFileToHDFS(file); + return Boolean.valueOf(ret); + }; + }); + ret = retVal.booleanValue(); + logger.info("Flushing HDFS audit File :" + file.getAbsolutePath() + file.getName()); + return ret; + } + + @Override + public boolean logFile(File file) throws Exception { + return logAsFile(file); + } + + synchronized public PrintWriter getLogFileStream() throws Exception { + closeFileIfNeeded(); + // Either there are no open log file or the previous one has been rolled + // over + PrintWriter logWriter = createWriter(); + return logWriter; + } + + + public void flush() { + if (logger.isDebugEnabled()) { + logger.debug("==> JSONWriter.flush()"); + } + logger.info("Flush called. name=" + auditProviderName); + super.flush(); + if (logger.isDebugEnabled()) { + logger.debug("<== JSONWriter.flush()"); + } + } + + @Override + public void start() { + // nothing to start + } + + @Override + synchronized public void stop() { + if (logger.isDebugEnabled()) { + logger.debug("==> JSONWriter.stop()"); + } + if (logWriter != null) { + try { + logWriter.flush(); + logWriter.close(); + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + auditProviderName + ", fileName=" + currentFileName); + } + logWriter = null; + ostream = null; + } + if (logger.isDebugEnabled()) { + logger.debug("<== JSONWriter.stop()"); + } + } +} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java new file mode 100644 index 0000000..c2b1ea6 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java @@ -0,0 +1,454 @@ +package org.apache.ranger.audit.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.gson.annotations.SerializedName; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcFile.WriterOptions; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.model.EnumRepositoryType; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Map; +import java.util.HashMap; +import java.text.Format; +import java.text.SimpleDateFormat; + +public class ORCFileUtil { + + private static final Log logger = LogFactory.getLog(ORCFileUtil.class); + + private static volatile ORCFileUtil me = null; + protected CompressionKind defaultCompression = CompressionKind.SNAPPY; + protected CompressionKind compressionKind = CompressionKind.NONE; + protected TypeDescription schema = null; + protected VectorizedRowBatch batch = null; + protected String auditSchema = null; + protected String dateFormat = "yyyy-MM-dd HH:mm:ss"; + + protected ArrayList schemaFields = new ArrayList<>(); + protected Map vectorizedRowBatchMap = new HashMap<>(); + protected int orcBufferSize; + protected long orcStripeSize; + + public static ORCFileUtil getInstance() { + ORCFileUtil orcFileUtil = me; + if (orcFileUtil == null) { + synchronized (ORCFileUtil.class) { + orcFileUtil = me; + if (orcFileUtil == null) { + me = orcFileUtil = new ORCFileUtil(); + } + } + } + return orcFileUtil; + } + + public void init(int orcBufferSize, long orcStripeSize, String compression) throws Exception{ + if (logger.isDebugEnabled()) { + logger.debug("==> ORCFileUtil.init()"); + } + this.orcBufferSize = orcBufferSize; + this.orcStripeSize = orcStripeSize; + this.compressionKind = getORCCompression(compression); + initORCAuditSchema(); + if (logger.isDebugEnabled()) { + logger.debug("<== ORCFileUtil.init() : orcBufferSize: " + orcBufferSize + " stripeSize: " + orcStripeSize + + " compression: " + compression); + } + } + + public Writer createWriter(Configuration conf, FileSystem fs, String path) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> ORCFileUtil.createWriter()"); + } + Writer ret = null; + WriterOptions writeOptions = OrcFile.writerOptions(conf) + .fileSystem(fs) + .setSchema(schema) + .bufferSize(orcBufferSize) + .stripeSize(orcStripeSize) + .compress(compressionKind); + + ret = OrcFile.createWriter(new Path(path), writeOptions); + if (logger.isDebugEnabled()) { + logger.debug("<== ORCFileUtil.createWriter()"); + } + return ret; + } + + public void close(Writer writer) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> ORCFileUtil.close()"); + } + + writer.close(); + + if (logger.isDebugEnabled()) { + logger.debug("<== ORCFileUtil.close()"); + } + } + + public void log(Writer 
writer, Collection events) throws Exception { + int eventBatchSize = events.size(); + + if (logger.isDebugEnabled()) { + logger.debug("==> ORCFileUtil.log() : EventSize: " + eventBatchSize + "ORC bufferSize:" + orcBufferSize ); + } + + //increase the batch size according to event size, so it can accomodate all the events. + if (eventBatchSize > orcBufferSize) { + batch = schema.createRowBatch(orcBufferSize); + } + + try { + for(AuthzAuditEvent event : events) { + int row = batch.size++; + for (int j=0;j ORCWriter.initORCAuditSchema()"); + } + auditSchema = getAuditSchema(); + Map schemaFieldTypeMap = getSchemaFieldTypeMap(); + schema = TypeDescription.fromString(auditSchema); + batch = schema.createRowBatch(orcBufferSize); + buildVectorRowBatch(schemaFieldTypeMap); + if (logger.isDebugEnabled()) { + logger.debug("<== ORCWriter.initORCAuditSchema()"); + } + } + + protected Map getSchemaFieldTypeMap() { + Map ret = new HashMap<>(); + + int index1 = auditSchema.indexOf("<"); + int index2 = auditSchema.indexOf(">"); + String subAuditSchema = auditSchema.substring(index1+1,index2); + String[] fields = subAuditSchema.split(","); + schemaFields = new ArrayList<>(); + + for (String field: fields) { + String[] flds = field.split(":"); + schemaFields.add(flds[0]); + ret.put(flds[0],flds[1]); + } + return ret; + } + + protected void buildVectorRowBatch(Map schemaFieldTypeMap) throws Exception { + int i = 0; + for (i=0;i cls = fld.getType(); + Object value = fld.get(event); + + ret.setField(fieldName); + ret.setType(cls.getName()); + ret.setValue(value); + } catch (Exception e){ + logger.error("Error while writing into ORC File:", e); + } + return ret; + } + + protected ColumnVector getColumnVectorType(String fieldType) throws Exception { + ColumnVector ret = null; + fieldType = fieldType.toLowerCase(); + switch(fieldType) { + case "int" : + case "bigint": + case "date": + case "boolean": + ret = new LongColumnVector(); + break; + case "string": + case "varchar": + case "char": + case "binary": + ret = new BytesColumnVector(); + break; + case "decimal": + ret = new DecimalColumnVector(10,5); + break; + case "double": + case "float": + ret = new DoubleColumnVector(); + break; + case "array": + case "map": + case "uniontype": + case "struct": + throw new Exception("Unsuppoted field Type"); + } + return ret; + } + + protected Long castLongObject(Object object) { + Long ret = 0l; + try { + if (object instanceof Long) + ret = ((Long) object).longValue(); + else if (object instanceof Integer) { + ret = ((Integer) object).longValue(); + } else if (object instanceof String) { + ret = Long.valueOf((String) object); + } + } catch (Exception e) { + logger.error("Error while writing into ORC File:", e); + } + return ret; + } + + protected String castStringObject(Object object) { + String ret = null; + try { + if (object instanceof String) + ret = (String) object; + else if (object instanceof Date) { + ret = (getDateString((Date) object)); + } + } catch (Exception e) { + logger.error("Error while writing into ORC File:", e); + } + return ret; + } + + protected String getAuditSchema() { + if (logger.isDebugEnabled()) { + logger.debug("==> ORCWriter.getAuditSchema()"); + } + String ret = null; + String fieldStr = "struct<"; + StringBuilder sb = new StringBuilder(fieldStr); + + Class auditEventClass = AuthzAuditEvent.class; + for(Field fld: auditEventClass.getDeclaredFields()) { + if (fld.isAnnotationPresent(SerializedName.class)) { + String field = fld.getName(); + String fieldType = 
getShortFieldType(fld.getType().getName()); + if (fieldType == null) { + continue; + } + fieldStr = field + ":" + fieldType + ","; + sb.append(fieldStr); + } + } + fieldStr = sb.toString(); + if (fieldStr.endsWith(",")) { + fieldStr = fieldStr.substring(0, fieldStr.length() - 1); + } + ret = fieldStr + ">"; + + if (logger.isDebugEnabled()) { + logger.debug("<== ORCWriter.getAuditSchema() AuditSchema: " + ret); + } + return ret; + } + + protected String getShortFieldType(String type){ + String ret = null; + switch(type) { + case "java.lang.String": + ret = "string"; + break; + case "int": + ret = "int"; + break; + case "short": + ret = "string"; + break; + case "java.util.Date": + ret = "string"; + break; + case "long": + ret = "bigint"; + break; + default: + ret = null; + } + return ret; + } + + class SchemaInfo { + String field = null; + String type = null; + Object value = null; + + public String getField() { + return field; + } + + public void setField(String field) { + this.field = field; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + } + + protected CompressionKind getORCCompression(String compression) { + CompressionKind ret; + if (compression == null) { + compression = defaultCompression.name().toLowerCase(); + } + switch(compression) { + case "snappy": + ret = CompressionKind.SNAPPY; + break; + case "lzo": + ret = CompressionKind.LZO; + break; + case "zlip": + ret = CompressionKind.ZLIB; + break; + case "none": + ret = CompressionKind.NONE; + break; + default: + ret = defaultCompression; + break; + } + return ret; + } + + public static void main(String[] args) throws Exception { + ORCFileUtil auditOrcFileUtil = new ORCFileUtil(); + auditOrcFileUtil.init(10000,100000L,"snappy"); + try { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + Writer write = auditOrcFileUtil.createWriter(conf, fs, "/tmp/test.orc"); + Collection events = getTestEvent(); + auditOrcFileUtil.log(write, events); + write.close(); + } catch (Exception e){ + e.printStackTrace(); + } + } + + protected static Collection getTestEvent() { + Collection events = new ArrayList<>(); + for (int idx=0;idx<20;idx++) { + AuthzAuditEvent event = new AuthzAuditEvent(); + event.setEventId(Integer.toString(idx)); + event.setClientIP("127.0.0.1"); + event.setAccessResult((short) 1); + event.setAclEnforcer("ranger-acl"); + event.setRepositoryName("hdfsdev"); + event.setRepositoryType(EnumRepositoryType.HDFS); + event.setResourcePath("/tmp/test-audit.log" +idx+idx+1); + event.setResourceType("file"); + event.setAccessType("read"); + event.setEventTime(new Date()); + event.setResultReason(Integer.toString(1)); + events.add(event); + } + return events; + } +} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCWriter.java new file mode 100644 index 0000000..19a928c --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCWriter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.utils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.orc.Writer; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; + +import java.io.File; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +/** + * This class writes the Ranger audits to HDFS as ORC files + * Refer README.TXT for enabling ORCWriter. + */ +public class ORCWriter extends AbstractAuditWriter { + private static final Log logger = LogFactory + .getLog(ORCWriter.class); + + protected static final String ORC_FILE_EXTENSION = ".orc"; + protected volatile ORCFileUtil orcFileUtil = null; + protected Writer orcLogWriter = null; + protected String fileType = "orc"; + protected String compression = null; + protected int orcBufferSize = 0; + protected int defaultbufferSize = 100000; + protected long orcStripeSize = 0; + protected long defaultStripeSize = 100000L; + + + @Override + public void init(Properties props, String propPrefix, String auditProviderName, Map auditConfigs) { + if (logger.isDebugEnabled()) { + logger.debug("==> ORCWriter.init()"); + } + init(props,propPrefix); + super.init(props, propPrefix, auditProviderName, auditConfigs); + if (logger.isDebugEnabled()) { + logger.debug("<== ORCWriter.init()"); + } + } + + synchronized public boolean logAuditAsORC(final Collection events) throws Exception { + boolean ret = false; + Writer out = null; + try { + if (logger.isDebugEnabled()) { + logger.debug("UGI=" + MiscUtil.getUGILoginUser() + + ". Will write to HDFS file=" + currentFileName); + } + + out = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction() { + @Override + public Writer run() throws Exception { + Writer out = getORCFileWrite(); + orcFileUtil.log(out,events); + return out; + }; + }); + } catch (Exception e) { + orcLogWriter = null; + logger.error("Error while writing into ORC FileWriter", e); + throw e; + } finally { + logger.info("Flushing HDFS audit in ORC Format. Event Size:" + events.size()); + if (out != null) { + try { + //flush and close the ORC batch file + orcFileUtil.close(out); + ret = true; + } catch (Exception e) { + logger.error("Error while closing the ORC FileWriter", e); + throw e; + } + orcLogWriter = null; + } + } + return ret; + } + + @Override + public void flush() { + //For HDFSAuditDestionation with ORC format each file is flush immediately after writing the ORC batch. + //So nothing to flush + } + + @Override + public boolean log(Collection events) throws Exception { + return logAsORC(events); + } + + @Override + public void start() { + // Nothing to do here. 
We will open the file when the first log request + // comes + } + + @Override + synchronized public void stop() { + if (orcLogWriter != null) { + try { + orcFileUtil.close(orcLogWriter); + } catch (Throwable t) { + logger.error("Error on closing log ORC Writer. Exception will be ignored. name=" + + auditProviderName + ", fileName=" + currentFileName); + } + orcLogWriter = null; + } + } + + @Override + public boolean logFile(File file) throws Exception { + return false; + } + + // Creates ORC Write file + synchronized protected Writer getORCFileWrite() throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("==> ORCWriter.getORCFileWrite()"); + } + if (orcLogWriter == null) { + // Create the file to write + createFileSystemFolders(); + logger.info("Creating new log file. hdfPath=" + fullPath); + orcLogWriter = orcFileUtil.createWriter(conf, fileSystem, fullPath); + currentFileName = fullPath; + } + if (logger.isDebugEnabled()) { + logger.debug("<== ORCWriter.getORCFileWrite()"); + } + return orcLogWriter; + } + + public boolean logAsORC(Collection events) throws Exception { + boolean ret = false; + Collection authzAuditEvents = getAuthzAuditEvents(events); + ret = logAuditAsORC(authzAuditEvents); + return ret; + } + + public Collection getAuthzAuditEvents(Collection events) throws Exception { + Collection ret = new ArrayList<>(); + for (String event : events) { + try { + AuthzAuditEvent authzAuditEvent = MiscUtil.fromJson(event, AuthzAuditEvent.class); + ret.add(authzAuditEvent); + } catch (Exception e) { + logger.error("Error converting to From JSON to AuthzAuditEvent=" + event); + throw e; + } + } + return ret; + } + + public void init(Properties props, String propPrefix) { + compression = MiscUtil.getStringProperty(props, propPrefix + "." + fileType +".compression"); + orcBufferSize = MiscUtil.getIntProperty(props, propPrefix + "." + fileType +".buffersize",defaultbufferSize); + orcStripeSize = MiscUtil.getLongProperty(props, propPrefix + "." + fileType +".stripesize",defaultStripeSize); + setFileExtension(ORC_FILE_EXTENSION); + try { + orcFileUtil = ORCFileUtil.getInstance(); + orcFileUtil.init(orcBufferSize, orcStripeSize, compression); + } catch ( Exception e) { + logger.error("Error while doing ORCWriter.init() ", e); + } + } +} diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/Writer.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/Writer.java new file mode 100644 index 0000000..6f7c3d2 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/Writer.java @@ -0,0 +1,41 @@ +package org.apache.ranger.audit.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.File; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +/* Writer Interface to writer Ranger Audits to Hdfs */ + +public interface Writer { + void init(Properties prop, String propPrefix, String auditProviderName, Map auditConfigs); + + boolean log(Collection events) throws Exception; + + boolean logFile(File file) throws Exception; + + void start(); + + void flush(); + + void stop(); +} diff --git a/pom.xml b/pom.xml index 589cd6a..530b37a 100644 --- a/pom.xml +++ b/pom.xml @@ -162,7 +162,7 @@ 2.7.1 1.3 1.1.3 - 2.1.0 + 2.3.0 3.1.0-incubating 4.5.3 4.4.6 diff --git a/src/main/assembly/hbase-agent.xml b/src/main/assembly/hbase-agent.xml index 3ebc334..5c27238 100644 --- a/src/main/assembly/hbase-agent.xml +++ b/src/main/assembly/hbase-agent.xml @@ -54,6 +54,7 @@ org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version} org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.noggit:noggit:jar:${noggit.version} + org.apache.hive:hive-exec:jar:${hive.version} org.apache.solr:solr-solrj false diff --git a/src/main/assembly/hdfs-agent.xml b/src/main/assembly/hdfs-agent.xml index 5279a9a..fd95508 100644 --- a/src/main/assembly/hdfs-agent.xml +++ b/src/main/assembly/hdfs-agent.xml @@ -86,6 +86,7 @@ org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version} org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.noggit:noggit:jar:${noggit.version} + org.apache.hive:hive-exec:jar:${hive.version} org.apache.solr:solr-solrj false diff --git a/src/main/assembly/hive-agent.xml b/src/main/assembly/hive-agent.xml index ca65c80..6c736b0 100644 --- a/src/main/assembly/hive-agent.xml +++ b/src/main/assembly/hive-agent.xml @@ -54,6 +54,7 @@ org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version} org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.noggit:noggit:jar:${noggit.version} + org.apache.hive:hive-exec:jar:${hive.version} org.apache.solr:solr-solrj false diff --git a/src/main/assembly/knox-agent.xml b/src/main/assembly/knox-agent.xml index 8357d49..39341a6 100644 --- a/src/main/assembly/knox-agent.xml +++ b/src/main/assembly/knox-agent.xml @@ -61,6 +61,7 @@ org.apache.htrace:htrace-core:jar:${htrace-core.version} org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version} org.codehaus.jackson:jackson-mapper-asl:jar:${codehaus.jackson.version} + org.apache.hive:hive-exec:jar:${hive.version} org.apache.solr:solr-solrj false diff --git a/src/main/assembly/plugin-atlas.xml b/src/main/assembly/plugin-atlas.xml index fd98811..31a78f2 100644 --- a/src/main/assembly/plugin-atlas.xml +++ b/src/main/assembly/plugin-atlas.xml @@ -57,6 +57,7 @@ org.apache.httpcomponents:httpmime:jar:${httpcomponents.httpmime.version} org.apache.hadoop:hadoop-hdfs:jar:${hadoop.version} org.apache.solr:solr-solrj + org.apache.hive:hive-exec:jar:${hive.version} false diff --git a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml index 95855d9..9fcadec 100644 --- a/src/main/assembly/plugin-kafka.xml +++ b/src/main/assembly/plugin-kafka.xml @@ -69,6 +69,7 @@ org.apache.httpcomponents:httpmime:jar:${httpcomponents.httpmime.version} + org.apache.hive:hive-exec:jar:${hive.version} org.noggit:noggit:jar:${noggit.version} org.codehaus.jackson:jackson-core-asl org.codehaus.jackson:jackson-jaxrs diff --git a/src/main/assembly/plugin-kms.xml b/src/main/assembly/plugin-kms.xml index 6d15f2a..226ed04 100755 --- 
a/src/main/assembly/plugin-kms.xml +++ b/src/main/assembly/plugin-kms.xml @@ -62,6 +62,7 @@ org.noggit:noggit:jar:${noggit.version} org.apache.zookeeper:zookeeper:jar:${zookeeper.version} org.apache.solr:solr-solrj + org.apache.hive:hive-exec:jar:${hive.version} diff --git a/src/main/assembly/plugin-solr.xml b/src/main/assembly/plugin-solr.xml index de30bfb..38fd8ef 100644 --- a/src/main/assembly/plugin-solr.xml +++ b/src/main/assembly/plugin-solr.xml @@ -52,6 +52,7 @@ org.codehaus.jackson:jackson-jaxrs org.codehaus.jackson:jackson-mapper-asl org.codehaus.jackson:jackson-xc + org.apache.hive:hive-exec:jar:${hive.version} diff --git a/src/main/assembly/plugin-sqoop.xml b/src/main/assembly/plugin-sqoop.xml index d2bd69a..ba7a4a9 100644 --- a/src/main/assembly/plugin-sqoop.xml +++ b/src/main/assembly/plugin-sqoop.xml @@ -56,6 +56,7 @@ org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.noggit:noggit:jar:${noggit.version} org.apache.solr:solr-solrj + org.apache.hive:hive-exec:jar:${hive.version} diff --git a/src/main/assembly/plugin-yarn.xml b/src/main/assembly/plugin-yarn.xml index c6a48e8..617d382 100644 --- a/src/main/assembly/plugin-yarn.xml +++ b/src/main/assembly/plugin-yarn.xml @@ -56,6 +56,7 @@ org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.noggit:noggit:jar:${noggit.version} org.apache.solr:solr-solrj + org.apache.hive:hive-exec:jar:${hive.version} diff --git a/src/main/assembly/storm-agent.xml b/src/main/assembly/storm-agent.xml index 64224ec..c94df91 100644 --- a/src/main/assembly/storm-agent.xml +++ b/src/main/assembly/storm-agent.xml @@ -61,6 +61,7 @@ org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version} org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version} org.apache.httpcomponents:httpmime:jar:${httpcomponents.httpmime.version} + org.apache.hive:hive-exec:jar:${hive.version} org.noggit:noggit:jar:${noggit.version} com.google.protobuf:protobuf-java:jar:${protobuf-java.version} org.apache.hadoop:hadoop-hdfs:jar:${hadoop.version} -- 2.6.4 (Apple Git-63)
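
Note (illustrative, not part of the patch): the new org.apache.ranger.audit.utils.Writer interface introduced above is the contract that JSONWriter and ORCWriter implement and that the HDFS audit destination drives through init/log/logFile/start/flush/stop. The following minimal sketch shows another conforming implementation that simply prints each already-serialized JSON audit event to stdout. The generic type parameters (Map<String, String>, Collection<String>) are assumed, since the interface text above elides them, and the class name ConsoleAuditWriter is hypothetical and used only for illustration.

    package org.apache.ranger.audit.utils;

    import java.io.File;
    import java.util.Collection;
    import java.util.Map;
    import java.util.Properties;

    /* Minimal illustrative Writer: prints each serialized audit event to stdout. */
    public class ConsoleAuditWriter implements Writer {

        @Override
        public void init(Properties props, String propPrefix, String auditProviderName,
                         Map<String, String> auditConfigs) {
            // nothing to configure for this sketch
        }

        @Override
        public boolean log(Collection<String> events) {
            for (String event : events) {
                System.out.println(event);   // each event is one JSON line
            }
            return true;                     // true signals a successful write to the caller
        }

        @Override
        public boolean logFile(File file) {
            return false;                    // whole-file hand-off is not supported in this sketch
        }

        @Override
        public void start() { /* nothing to start */ }

        @Override
        public void flush() {
            System.out.flush();              // push any buffered output to the console
        }

        @Override
        public void stop() { /* nothing to close */ }
    }

A real implementation would, like JSONWriter and ORCWriter in this patch, extend AbstractAuditWriter to inherit the filesystem folder creation and rollover handling; the sketch above only demonstrates the interface contract itself.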