Index: log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java	(revision 3)
+++ log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java	(revision )
@@ -16,26 +16,7 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-
-import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.*;
 import org.apache.flume.Event;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.LoggingException;
@@ -47,17 +28,15 @@
 import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
 import org.apache.logging.log4j.core.helpers.Strings;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.CursorConfig;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Manager that persists data to Berkeley DB before passing it on to Flume.
@@ -83,7 +62,7 @@
 
     private final Database database;
 
-    private final Environment environment;
+    private volatile Environment environment;
 
     private final WriterThread worker;
@@ -115,17 +94,17 @@
      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
-                                    final int batchSize, final int retries, final int connectionTimeout,
-                                    final int requestTimeout, final int delay, final Database database,
-                                    final Environment environment, final SecretKey secretKey,
-                                    final int lockTimeoutRetryCount) {
+                                     final int batchSize, final int retries, final int connectionTimeout,
+                                     final int requestTimeout, final int delay, final Database database,
+                                     final Environment environment, final SecretKey secretKey,
+                                     final int lockTimeoutRetryCount) {
         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
         this.delay = delay;
         this.database = database;
         this.environment = environment;
         dbCount.set(database.count());
         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
-            lockTimeoutRetryCount);
+                lockTimeoutRetryCount);
         this.worker.start();
         this.secretKey = secretKey;
         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
@@ -171,10 +150,59 @@
         }
         sb.append("]");
         sb.append(" ").append(dataDirectory);
-        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+
+        FlumePersistentManager flumePersistentManager = getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
-            connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
+                connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
+        return flumePersistentManager;
+    }
+
+    /**
+     * Closes the given manager and creates a replacement once its Berkeley DB environment has become invalid.
+     * @param flumePersistentManager the manager whose database environment is no longer valid.
+     * @return a new FlumePersistentManager built for the same agents and data directory.
+     */
+    public static FlumePersistentManager reCreateFlumePersistentManager(FlumePersistentManager flumePersistentManager, final String name, final Agent[] agents,
+                                                                        final Property[] properties, int batchSize, final int retries,
+                                                                        final int connectionTimeout, final int requestTimeout,
+                                                                        final int delay, final int lockTimeoutRetryCount,
+                                                                        final String dataDir) {
+        LOGGER.error("Database environment is invalid. Closing and reopening the database handle in FlumePersistentManager.");
+        FlumePersistentManager flumePersistentManagerNew = null;
+        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
+        final StringBuilder sb = new StringBuilder("FlumePersistent[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(",");
+            }
+            sb.append(agent.getHost()).append(":").append(agent.getPort());
+            first = false;
+        }
+        sb.append("]");
+        sb.append(" ").append(dataDirectory);
+        flumePersistentManager.release();
+        flumePersistentManagerNew = getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+                connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
+        LOGGER.debug("Created a new instance of FlumePersistentManager; returning it to the appender.");
+
+        return flumePersistentManagerNew;
+    }
+
+    /**
+     * Verify that the Berkeley DB environment backing the given manager is still valid.
+     * @param flumePersistentManager the flume persistent manager to check.
+     * @return false if the database environment is not valid, true otherwise.
+     */
+    public static boolean verifyDatabaseEnvironment(FlumePersistentManager flumePersistentManager) {
+        LOGGER.debug("Entering FlumePersistentManager.verifyDatabaseEnvironment");
+        // Check whether the Berkeley DB environment is valid. If it is not, the database connection
+        // has to be closed and reopened.
+        if (!flumePersistentManager.getEnvironment().isValid()) {
+            LOGGER.error("Database environment is invalid. The connection must be closed and reopened.");
+            return false;
+        }
+
+        return true;
+
+    }
+
     @Override
     public void send(final Event event) {
         if (worker.isShutdown()) {
@@ -200,7 +228,7 @@
                 eventData = cipher.doFinal(eventData);
             }
             final Future future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
+                    gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
             boolean interrupted = false;
             int count = 0;
             do {
@@ -301,7 +329,8 @@
                     }
                     exception = null;
                     break;
-                } catch (final LockConflictException lce) {
+                }
+                catch (final LockConflictException lce) {
                     exception = lce;
                     // Fall through and retry.
                 } catch (final Exception ex) {
@@ -404,7 +433,6 @@
             }
             try {
-
                 final File dir = new File(data.dataDir);
                 FileUtils.mkdir(dir, true);
                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
@@ -445,7 +473,7 @@
                         LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                     } catch (final Exception ex) {
                         LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
-                            cl.getName());
+                                cl.getName());
                     }
                     break;
                 }
@@ -461,8 +489,8 @@
                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
             }
             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
-                data.lockTimeoutRetryCount);
+                    data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
+                    data.lockTimeoutRetryCount);
         }
     }
@@ -811,12 +839,12 @@
         public DaemonThreadFactory() {
             final SecurityManager securityManager = System.getSecurityManager();
             group = (securityManager != null) ? securityManager.getThreadGroup() :
-                Thread.currentThread().getThreadGroup();
+                    Thread.currentThread().getThreadGroup();
             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
         }
 
         @Override
-        public Thread newThread(final Runnable r) {
+        public Thread newThread(final Runnable r) {
             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
             thread.setDaemon(true);
             if (thread.getPriority() != Thread.NORM_PRIORITY) {
@@ -847,4 +875,21 @@
                 wait(timeout);
             }
         }
+
+    /**
+     * This method is not exposed through the public API and is used primarily for unit testing.
+     * @param environment The Environment to set on the FlumePersistentManager.
+     */
+    public void setEnvironment(final Environment environment) {
+        this.environment = environment;
+    }
+
+    /**
+     * This method is not exposed through the public API and is used primarily for unit testing.
+     * @return The Environment used by the FlumePersistentManager.
+     */
+    public Environment getEnvironment() {
+        return this.environment;
+    }
+
-}
+}
\ No newline at end of file
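The appender-side change that drives these two helpers is not part of this patch (the FlumeAppender hunk further down contains only the removed side of that file), so the intended call sequence has to be inferred from the tests that follow, which swap the manager through getManager()/setManager(). A minimal sketch of that usage, assuming the caller still holds the configuration values the manager was originally created with (name, agents, properties, timeouts, dataDir) in fields, might look like this:

    // Sketch only: verify the Berkeley DB environment before sending, and rebuild the manager
    // when it has gone bad. The surrounding fields are assumed and are not part of this patch.
    private FlumePersistentManager ensureValidManager(final FlumePersistentManager current) {
        if (FlumePersistentManager.verifyDatabaseEnvironment(current)) {
            return current; // environment is still valid, keep using the same manager
        }
        // The environment is invalid: release the old manager and create a replacement
        // with the same configuration that was used to build it in the first place.
        return FlumePersistentManager.reCreateFlumePersistentManager(current, name, agents, properties,
                batchSize, retries, connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir);
    }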
Index: log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java	(revision 1)
+++ log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java	(revision )
@@ -35,6 +35,7 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.sleepycat.je.Environment;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
@@ -62,6 +63,9 @@
 
 import com.google.common.base.Preconditions;
 
+import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.createStrictMock;
+
 /**
  *
  */
@@ -73,6 +77,8 @@
     private EventCollector primary;
     private EventCollector alternate;
 
+    private Environment environmentMock;
+
     @BeforeClass
     public static void setupClass() {
         // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
@@ -463,4 +469,85 @@
         }
         return ports;
     }
+
+    @Test
+    public void testLog4EventForEnvironmentFailure() throws InterruptedException, IOException {
+
+        final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
+        final org.apache.logging.log4j.core.Logger eventLogger = (org.apache.logging.log4j.core.Logger) LogManager.getLogger("EventLogger");
+        Map appenders = eventLogger.getAppenders();
+        FlumeAppender flumeAppender = (FlumeAppender) appenders.get("eventLogger");
+        FlumePersistentManager flumePersistentManager = (FlumePersistentManager) flumeAppender.getManager();
+        environmentMock = createStrictMock(Environment.class);
+        flumePersistentManager.setEnvironment(environmentMock);
+        flumeAppender.setManager(flumePersistentManager);
+        expect(environmentMock.isValid())
+                .andReturn(false);
+        expectLastCall().times(1);
+        expect(environmentMock.cleanLog())
+                .andReturn(0);
+        expectLastCall().times(1);
+        environmentMock.close();
+        expectLastCall().times(1);
+        replay(environmentMock);
+        EventLogger.logEvent(msg);
+        verify(environmentMock);
+        final Event event = primary.poll();
+        Assert.assertNotNull(event);
+        final String body = getBody(event);
+        Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
+                body.endsWith("Test Log4j"));
+    }
Received: " + body, + body.endsWith("Test Log4j")); + } + + + @Test + public void testMultipleEventsForEnvironmentFailure() throws InterruptedException, IOException { + + for (int i = 0; i < 10; ++i) { + final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + if (i==5) { + final org.apache.logging.log4j.core.Logger eventLogger = (org.apache.logging.log4j.core.Logger) LogManager.getLogger("EventLogger"); + Map appenders = eventLogger.getAppenders(); + FlumeAppender flumeAppender = (FlumeAppender) appenders.get("eventLogger"); + FlumePersistentManager flumePersistentManager = (FlumePersistentManager) flumeAppender.getManager(); + environmentMock = createStrictMock(Environment.class); + flumePersistentManager.setEnvironment(environmentMock); + flumeAppender.setManager(flumePersistentManager); + expect(environmentMock.isValid()) + .andReturn(false); + expectLastCall().times(1); + expect(environmentMock.cleanLog()) + .andReturn(0); + expectLastCall().times(1); + environmentMock.close(); + expectLastCall().times(1); + replay(environmentMock); + EventLogger.logEvent(msg); + verify(environmentMock); + + } else { + EventLogger.logEvent(msg); + } + + + } + final boolean[] fields = new boolean[10]; + for (int i = 0; i < 10; ++i) { + final Event event = primary.poll(); + Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event); + final String value = event.getHeaders().get("counter"); + Assert.assertNotNull("Missing counter", value); + final int counter = Integer.parseInt(value); + if (fields[counter]) { + Assert.fail("Duplicate event"); + } else { + fields[counter] = true; + } + } + for (int i = 0; i < 10; ++i) { + Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]); + } + } + } \ No newline at end of file Index: log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (revision 3) +++ log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (revision 3) @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. 
Index: log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java	(revision 3)
+++ log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java	(revision 3)
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.io.Serializable;
-import java.util.Locale;
-
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.helpers.Booleans;
-import org.apache.logging.log4j.core.helpers.Integers;
-import org.apache.logging.log4j.core.layout.RFC5424Layout;
-
-/**
- * An Appender that uses the Avro protocol to route events to Flume.
- */
-@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
-public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
-
-    private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
-    private static final int DEFAULT_MAX_DELAY = 60000;
-
-    private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
-
-    private final AbstractFlumeManager manager;
-
-    private final String mdcIncludes;
-    private final String mdcExcludes;
-    private final String mdcRequired;
-
-    private final String eventPrefix;
-
-    private final String mdcPrefix;
-
-    private final boolean compressBody;
-
-    private final FlumeEventFactory factory;
-
-    /**
-     * Which Manager will be used by the appender instance.
-     */
-    private enum ManagerType {
-        AVRO, EMBEDDED, PERSISTENT;
-
-        public static ManagerType getType(final String type) {
-            return valueOf(type.toUpperCase(Locale.US));
-        }
-    }
-
-    private FlumeAppender(final String name, final Filter filter, final Layout layout,
-                          final boolean ignoreExceptions, final String includes, final String excludes,
-                          final String required, final String mdcPrefix, final String eventPrefix,
-                          final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
-        super(name, filter, layout, ignoreExceptions);
-        this.manager = manager;
-        this.mdcIncludes = includes;
-        this.mdcExcludes = excludes;
-        this.mdcRequired = required;
-        this.eventPrefix = eventPrefix;
-        this.mdcPrefix = mdcPrefix;
-        this.compressBody = compress;
-        this.factory = factory == null ? this : factory;
-    }
-
-    /**
-     * Publish the event.
-     * @param event The LogEvent.
-     */
-    @Override
-    public void append(final LogEvent event) {
-        final String name = event.getLoggerName();
-        if (name != null) {
-            for (final String pkg : EXCLUDED_PACKAGES) {
-                if (name.startsWith(pkg)) {
-                    return;
-                }
-            }
-        }
-        final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
-            eventPrefix, compressBody);
-        flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
-        manager.send(flumeEvent);
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-        manager.release();
-    }
-
-    /**
-     * Create a Flume event.
-     * @param event The Log4j LogEvent.
-     * @param includes comma separated list of mdc elements to include.
-     * @param excludes comma separated list of mdc elements to exclude.
-     * @param required comma separated list of mdc elements that must be present with a value.
-     * @param mdcPrefix The prefix to add to MDC key names.
-     * @param eventPrefix The prefix to add to event fields.
-     * @param compress If true the body will be compressed.
-     * @return A Flume Event.
-     */
-    @Override
-    public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
-                                  final String required, final String mdcPrefix, final String eventPrefix,
-                                  final boolean compress) {
-        return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
-            eventPrefix, compressBody);
-    }
-
-    /**
-     * Create a Flume Avro Appender.
-     * @param agents An array of Agents.
-     * @param properties Properties to pass to the embedded agent.
-     * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
-     * Note: The embedded attribute is deprecated in favor of specifying the type attribute.
-     * @param type Avro (default), Embedded, or Persistent.
-     * @param dataDir The directory where the Flume FileChannel should write its data.
-     * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
-     *                          1000.
-     * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
-     * @param agentRetries The number of times to retry an agent before failing to the next agent.
-     * @param maxDelay The maximum number of seconds to wait for a complete batch.
-     * @param name The name of the Appender.
-     * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
-     *               they are propagated to the caller.
-     * @param excludes A comma separated list of MDC elements to exclude.
-     * @param includes A comma separated list of MDC elements to include.
-     * @param required A comma separated list of MDC elements that are required.
-     * @param mdcPrefix The prefix to add to MDC key names.
-     * @param eventPrefix The prefix to add to event key names.
-     * @param compressBody If true the event body will be compressed.
-     * @param batchSize Number of events to include in a batch. Defaults to 1.
-     * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
-     * @param factory The factory to use to create Flume events.
-     * @param layout The layout to format the event.
-     * @param filter A Filter to filter events.
-     *
-     * @return A Flume Avro Appender.
-     */
-    @PluginFactory
-    public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
-                                               @PluginElement("Properties") final Property[] properties,
-                                               @PluginAttribute("embedded") final String embedded,
-                                               @PluginAttribute("type") final String type,
-                                               @PluginAttribute("dataDir") final String dataDir,
-                                               @PluginAttribute("connectTimeout") final String connectionTimeout,
-                                               @PluginAttribute("requestTimeout") final String requestTimeout,
-                                               @PluginAttribute("agentRetries") final String agentRetries,
-                                               @PluginAttribute("maxDelay") final String maxDelay,
-                                               @PluginAttribute("name") final String name,
-                                               @PluginAttribute("ignoreExceptions") final String ignore,
-                                               @PluginAttribute("mdcExcludes") final String excludes,
-                                               @PluginAttribute("mdcIncludes") final String includes,
-                                               @PluginAttribute("mdcRequired") final String required,
-                                               @PluginAttribute("mdcPrefix") final String mdcPrefix,
-                                               @PluginAttribute("eventPrefix") final String eventPrefix,
-                                               @PluginAttribute("compress") final String compressBody,
-                                               @PluginAttribute("batchSize") final String batchSize,
-                                               @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
-                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
-                                               @PluginElement("Layout") Layout layout,
-                                               @PluginElement("Filters") final Filter filter) {
-
-        final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
-            (agents == null || agents.length == 0) && properties != null && properties.length > 0;
-        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
-        final boolean compress = Booleans.parseBoolean(compressBody, true);
-        ManagerType managerType;
-        if (type != null) {
-            if (embed && embedded != null) {
-                try {
-                    managerType = ManagerType.getType(type);
-                    LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
Using type " + type); - } catch (final Exception ex) { - LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + - " is invalid."); - managerType = ManagerType.EMBEDDED; - } - } else { - try { - managerType = ManagerType.getType(type); - } catch (final Exception ex) { - LOGGER.warn("Type " + type + " is invalid."); - managerType = ManagerType.EMBEDDED; - } - } - } else if (embed) { - managerType = ManagerType.EMBEDDED; - } else { - managerType = ManagerType.AVRO; - } - - final int batchCount = Integers.parseInt(batchSize, 1); - final int connectTimeout = Integers.parseInt(connectionTimeout, 0); - final int reqTimeout = Integers.parseInt(requestTimeout, 0); - final int retries = Integers.parseInt(agentRetries, 0); - final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); - final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY ); - - if (layout == null) { - layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, - null, null, null, null, excludes, includes, required, null, null, null, null); - } - - if (name == null) { - LOGGER.error("No name provided for Appender"); - return null; - } - - AbstractFlumeManager manager; - - switch (managerType) { - case EMBEDDED: - manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); - break; - case AVRO: - if (agents == null || agents.length == 0) { - LOGGER.debug("No agents provided, using defaults"); - agents = new Agent[] {Agent.createAgent(null, null)}; - } - manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); - break; - case PERSISTENT: - if (agents == null || agents.length == 0) { - LOGGER.debug("No agents provided, using defaults"); - agents = new Agent[] {Agent.createAgent(null, null)}; - } - manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, - connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir); - break; - default: - LOGGER.debug("No manager type specified. Defaulting to AVRO"); - if (agents == null || agents.length == 0) { - LOGGER.debug("No agents provided, using defaults"); - agents = new Agent[] {Agent.createAgent(null, null)}; - } - manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); - } - - if (manager == null) { - return null; - } - - return new FlumeAppender(name, filter, layout, ignoreExceptions, includes, - excludes, required, mdcPrefix, eventPrefix, compress, factory, manager); - } -} Index: log4j-flume-ng/pom.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- log4j-flume-ng/pom.xml (revision 6) +++ log4j-flume-ng/pom.xml (revision ) @@ -61,6 +61,12 @@ test + org.easymock + easymock + 3.1 + test + + org.apache.flume flume-ng-sdk