### Eclipse Workspace Patch 1.0 #P log4j-flume-ng Index: src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java =================================================================== --- src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (revision 1609586) +++ src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (working copy) @@ -181,7 +181,8 @@ @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries, @PluginElement("FlumeEventFactory") final FlumeEventFactory factory, @PluginElement("Layout") Layout layout, - @PluginElement("Filter") final Filter filter) { + @PluginElement("Filter") final Filter filter, + @PluginAttribute("maxEventsCountInDB") final String maxEventsCountInDB) { final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) : (agents == null || agents.length == 0) && properties != null && properties.length > 0; @@ -218,7 +219,8 @@ final int retries = Integers.parseInt(agentRetries, 0); final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY); - + final int maxEventsInDB = Integers.parseInt(maxEventsCountInDB, -1); + if (layout == null) { final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID, @@ -250,7 +252,7 @@ agents = new Agent[] {Agent.createAgent(null, null)}; } manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, - connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir); + connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir, maxEventsInDB); break; default: LOGGER.debug("No manager type specified. Defaulting to AVRO"); Index: src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java =================================================================== --- src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (revision 1609586) +++ src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (working copy) @@ -121,7 +121,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); avroLogger.addAppender(avroAppender); avroLogger.setLevel(Level.ALL); @@ -150,7 +150,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, "ReqCtx_", null, "true", - "1", null, null, null, null); + "1", null, null, null, null, null); avroAppender.start(); final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger"); Assert.assertNotNull(eventLogger); @@ -190,7 +190,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); avroLogger.addAppender(avroAppender); avroLogger.setLevel(Level.ALL); @@ -224,7 +224,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "10", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); avroLogger.addAppender(avroAppender); avroLogger.setLevel(Level.ALL); @@ -258,7 +258,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); avroLogger.addAppender(avroAppender); avroLogger.setLevel(Level.ALL); @@ -288,7 +288,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); Assert.assertTrue("Appender Not started", avroAppender.isStarted()); avroLogger.addAppender(avroAppender); @@ -336,7 +336,7 @@ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", - null, null, null, null); + null, null, null, null, null); avroAppender.start(); avroLogger.addAppender(avroAppender); avroLogger.setLevel(Level.ALL); Index: src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderMaxEventsTest.java =================================================================== --- src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderMaxEventsTest.java (revision 0) +++ src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderMaxEventsTest.java (working copy) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.flume.appender; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.junit.Assert; + +import org.apache.logging.log4j.EventLogger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.LoggingException; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.apache.logging.log4j.message.StructuredDataMessage; +import org.apache.logging.log4j.status.StatusLogger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + */ +public class FlumePersistentAppenderMaxEventsTest { + private static final String CONFIG = "persistent_max_events.xml"; + private static LoggerContext ctx; + + @BeforeClass + public static void setupClass() { + final File file = new File("target/persistent_max_events"); + if (!deleteFiles(file)) { + System.err.println("Warning - unable to delete target/persistent_max_events. Test errors may occur"); + } + } + + @AfterClass + public static void cleanupClass() { + StatusLogger.getLogger().reset(); + } + + @Before + public void setUp() throws Exception { + + final File file = new File("target/persistent_max_events"); + final boolean result = deleteFiles(file); + + System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG); + ctx = (LoggerContext) LogManager.getContext(false); + ctx.reconfigure(); + } + + @After + public void teardown() throws Exception { + ctx.stop(); + System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY); + ctx.reconfigure(); + final File file = new File("target/persistent_max_events"); + final boolean result = deleteFiles(file); + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null); + for (final ObjectName name : names) { + try { + server.unregisterMBean(name); + } catch (final Exception ex) { + System.out.println("Unable to unregister " + name.toString()); + } + } + } + + + + @Test + public void testMultiple() throws InterruptedException, IOException { + boolean exceptionOccured = false; + try { + for (int i = 0; i < 6; ++i) { + final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + EventLogger.logEvent(msg); + } + } catch (LoggingException e) { + exceptionOccured = true; + } + Assert.assertTrue("Did not throw LoggingException", exceptionOccured); + } + + + + + private static boolean deleteFiles(final File file) { + boolean result = true; + if (file.isDirectory()) { + + final File[] files = file.listFiles(); + for (final File child : files) { + result &= deleteFiles(child); + } + + } else if (!file.exists()) { + return true; + } + + return result &= file.delete(); + } + +} \ No newline at end of file Index: src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java =================================================================== --- src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (revision 1609586) +++ src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (working copy) @@ -93,6 +93,8 @@ private final int delay; private final int lockTimeoutRetryCount; + + private final int maxEventsInDB; private final ExecutorService threadPool; @@ -117,7 +119,7 @@ final int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, final int delay, final Database database, final Environment environment, final SecretKey secretKey, - final int lockTimeoutRetryCount) { + final int lockTimeoutRetryCount, final int maxEventsInDB) { super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); this.delay = delay; this.database = database; @@ -129,6 +131,7 @@ this.secretKey = secretKey; this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory()); this.lockTimeoutRetryCount = lockTimeoutRetryCount; + this.maxEventsInDB = maxEventsInDB; } @@ -150,7 +153,7 @@ final Property[] properties, int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, final int delay, final int lockTimeoutRetryCount, - final String dataDir) { + final String dataDir, final int maxEventsInDB) { if (agents == null || agents.length == 0) { throw new IllegalArgumentException("At least one agent is required"); } @@ -172,7 +175,7 @@ sb.append(']'); sb.append(' ').append(dataDirectory); return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, - connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties)); + connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties, maxEventsInDB)); } @Override @@ -199,6 +202,9 @@ cipher.init(Cipher.ENCRYPT_MODE, secretKey); eventData = cipher.doFinal(eventData); } + if (maxEventsInDB != -1 && dbCount.get() >= maxEventsInDB) { + throw new LoggingException("Database has reached the max limit of events: " + dbCount.get()); + } final Future future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); boolean interrupted = false; @@ -354,6 +360,7 @@ private final int delay; private final int lockTimeoutRetryCount; private final Property[] properties; + private final int maxEventsInDB; /** * Constructor. @@ -364,7 +371,8 @@ */ public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, final int delay, - final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { + final int lockTimeoutRetryCount, final String dataDir, final Property[] properties, + final int maxEventsInDB) { this.name = name; this.agents = agents; this.batchSize = batchSize; @@ -375,6 +383,7 @@ this.delay = delay; this.lockTimeoutRetryCount = lockTimeoutRetryCount; this.properties = properties; + this.maxEventsInDB = maxEventsInDB; } } @@ -471,7 +480,7 @@ } return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey, - data.lockTimeoutRetryCount); + data.lockTimeoutRetryCount, data.maxEventsInDB); } } Index: src/test/resources/persistent_max_events.xml =================================================================== --- src/test/resources/persistent_max_events.xml (revision 0) +++ src/test/resources/persistent_max_events.xml (working copy) @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file Index: src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java =================================================================== --- src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (revision 1609586) +++ src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (working copy) @@ -76,7 +76,7 @@ @BeforeClass public static void setupClass() { // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString()); - final File file = new File("target/file-channel"); + final File file = new File("target/persistent"); if (!deleteFiles(file)) { System.err.println("Warning - unable to delete target/file-channel. Test errors may occur"); } @@ -114,7 +114,7 @@ ctx.reconfigure(); primary.stop(); alternate.stop(); - final File file = new File("target/file-channel"); + final File file = new File("target/persistent"); final boolean result = deleteFiles(file); final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);