### Eclipse Workspace Patch 1.0
#P log4j-flume-ng
Index: src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
===================================================================
--- src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (revision 1520406)
+++ src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (working copy)
@@ -40,6 +40,7 @@
     private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
     private static final int DEFAULT_MAX_DELAY = 60000;
+    private static final int DEFAULT_LOCK_TIMEOUT_RETRIES = 10;
 
     private final AbstractFlumeManager manager;
 
@@ -149,6 +150,7 @@
      * @param eventPrefix The prefix to add to event key names.
      * @param compressBody If true the event body will be compressed.
      * @param batchSize Number of events to include in a batch. Defaults to 1.
+     * @param lockTimeoutRetries The number of times to retry if a LockConflictException occurs in Berkeley DB. Defaults to 10.
      * @param factory The factory to use to create Flume events.
      * @param layout The layout to format the event.
      * @param filter A Filter to filter events.
@@ -174,6 +176,7 @@
                                               @PluginAttribute("eventPrefix") final String eventPrefix,
                                               @PluginAttribute("compress") final String compressBody,
                                               @PluginAttribute("batchSize") final String batchSize,
+                                              @PluginAttribute("lockTimeoutRetries") final String lockTORetries,
                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
                                               @PluginElement("Layout") Layout layout,
                                               @PluginElement("Filters") final Filter filter) {
@@ -212,6 +215,7 @@
         final int reqTimeout = Integers.parseInt(requestTimeout, 0);
         final int retries = Integers.parseInt(agentRetries, 0);
         final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
+        final int lockTimeoutRetries = Integers.parseInt(lockTORetries, DEFAULT_LOCK_TIMEOUT_RETRIES);
 
         if (layout == null) {
             layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
@@ -242,7 +246,7 @@
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
-                    connectTimeout, reqTimeout, delay, dataDir);
+                    connectTimeout, reqTimeout, delay, dataDir, lockTimeoutRetries);
                 break;
             default:
Defaulting to AVRO"); Index: src/test/resources/persistent.xml =================================================================== --- src/test/resources/persistent.xml (revision 1520147) +++ src/test/resources/persistent.xml (working copy) @@ -2,7 +2,7 @@ + batchsize="100" maxDelay="500" lockTimeoutRetries="20"> Index: src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java =================================================================== --- src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (revision 1520147) +++ src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (working copy) @@ -53,6 +53,7 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.StatsConfig; @@ -111,13 +112,15 @@ protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, final int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, final int delay, final Database database, - final Environment environment, final SecretKey secretKey) { + final Environment environment, final SecretKey secretKey, + final int lockTimeoutRetries) { super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); this.delay = delay; this.database = database; this.environment = environment; dbCount.set(database.count()); - this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount); + this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, + lockTimeoutRetries); this.worker.start(); this.secretKey = secretKey; this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory()); @@ -140,7 +143,7 @@ public static FlumePersistentManager getManager(final String name, final Agent[] agents, final Property[] properties, int batchSize, final int retries, final int connectionTimeout, final int requestTimeout, - final int delay, final String dataDir) { + final int delay, final String dataDir, final int lockTimeoutRetries) { if (agents == null || agents.length == 0) { throw new IllegalArgumentException("At least one agent is required"); } @@ -162,7 +165,7 @@ sb.append("]"); sb.append(" ").append(dataDirectory); return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, - connectionTimeout, requestTimeout, delay, dataDir, properties)); + connectionTimeout, requestTimeout, delay, dataDir, properties, lockTimeoutRetries)); } @Override @@ -304,6 +307,7 @@ private final int requestTimeout; private final int delay; private final Property[] properties; + private final int lockTimeoutRetries; /** * Constructor. 
@@ -314,7 +318,7 @@
          */
         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                            final int connectionTimeout, final int requestTimeout, final int delay,
-                           final String dataDir, final Property[] properties) {
+                           final String dataDir, final Property[] properties, final int lockTimeoutRetries) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
@@ -324,6 +328,7 @@
             this.requestTimeout = requestTimeout;
             this.delay = delay;
             this.properties = properties;
+            this.lockTimeoutRetries = lockTimeoutRetries;
         }
     }
 
@@ -410,7 +415,8 @@
                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
             }
             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
+                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
+                data.lockTimeoutRetries);
         }
     }
 
@@ -426,10 +432,11 @@
         private final SecretKey secretKey;
         private final int batchSize;
         private final AtomicLong dbCounter;
+        private final int lockTimeoutRetries;
 
         public WriterThread(final Database database, final Environment environment,
                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
-                            final SecretKey secretKey, final AtomicLong dbCount) {
+                            final SecretKey secretKey, final AtomicLong dbCount, int lockTimeoutRetries) {
             this.database = database;
             this.environment = environment;
             this.manager = manager;
@@ -438,6 +445,7 @@
             this.secretKey = secretKey;
             this.setDaemon(true);
             this.dbCounter = dbCount;
+            this.lockTimeoutRetries = lockTimeoutRetries;
         }
 
         public void shutdown() {
@@ -466,7 +474,7 @@
                 final DatabaseEntry data = new DatabaseEntry();
                 gate.close();
 
-                OperationStatus status;
+                OperationStatus status = null;
                 if (batchSize > 1) {
                     try {
                         errors = sendBatch(key, data);
@@ -477,7 +485,24 @@
                     Transaction txn = environment.beginTransaction(null, null);
                     Cursor cursor = database.openCursor(txn, null);
                     try {
-                        status = cursor.getFirst(key, data, LockMode.RMW);
+                        int retryIndex = 0;
+                        while (retryIndex < lockTimeoutRetries) {
+                            try {
+                                status = cursor.getFirst(key, data, LockMode.RMW);
+                                break;
+                            } catch (LockConflictException e) {
+                                LOGGER.warn("Error reading database", e);
+                                retryIndex++;
+                                try {
+                                    Thread.sleep(500);
+                                } catch (Exception e1) {
+                                }
+                            }
+                        }
+                        if (status == null) {
+                            errors = true;
+                        }
+
                         while (status == OperationStatus.SUCCESS) {
                             final SimpleEvent event = createEvent(data);
                             if (event != null) {
@@ -494,7 +519,24 @@
                                     LOGGER.error("Unable to delete event", ex);
                                 }
                             }
-                            status = cursor.getNext(key, data, LockMode.RMW);
+                            status = null;
+                            retryIndex = 0;
+                            while (retryIndex < lockTimeoutRetries) {
+                                try {
+                                    status = cursor.getNext(key, data, LockMode.RMW);
+                                    break;
+                                } catch (LockConflictException e) {
+                                    LOGGER.warn("Error reading database", e);
+                                    retryIndex++;
+                                    try {
+                                        Thread.sleep(500);
+                                    } catch (Exception e1) {
+                                    }
+                                }
+                            }
+                            if (status == null) {
+                                errors = true;
+                            }
                         }
                         if (cursor != null) {
                             cursor.close();
@@ -553,10 +595,26 @@
         private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
             boolean errors = false;
-            OperationStatus status;
+            OperationStatus status = null;
             Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
             try {
-                status = cursor.getFirst(key, data, null);
+                int retryIndex = 0;
+                while (retryIndex < lockTimeoutRetries) {
+                    try {
+                        status = cursor.getFirst(key, data, null);
+                        break;
+                    } catch (LockConflictException e) {
database", e); + retryIndex++; + try { + Thread.sleep(500); + } catch (Exception e1) { + } + } + } + if (status == null) { + errors = true; + } final BatchEvent batch = new BatchEvent(); for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { @@ -564,7 +622,24 @@ if (event != null) { batch.addEvent(event); } - status = cursor.getNext(key, data, null); + status = null; + retryIndex = 0; + while (retryIndex < lockTimeoutRetries) { + try { + status = cursor.getNext(key, data, null); + break; + } catch (LockConflictException e) { + LOGGER.warn("Error reading database", e); + retryIndex++; + try { + Thread.sleep(500); + } catch (Exception e1) { + } + } + } + if (status == null) { + errors = true; + } } try { manager.send(batch); Index: src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java =================================================================== --- src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (revision 1520147) +++ src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (working copy) @@ -233,6 +233,178 @@ Assert.assertTrue("Channel contained event, but not expected message. Received: " + body, body.endsWith("This is a test message")); } + + @Test + public void testMultipleConcurrent() throws InterruptedException, + IOException { + + final int eventsCount = 10000; + + Thread writer1 = new Thread() { + public void run() { + for (int i = 0; i < (eventsCount / 4); ++i) { + final StructuredDataMessage msg = new StructuredDataMessage( + "Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + EventLogger.logEvent(msg); + } + } + }; + writer1.start(); + + Thread writer2 = new Thread() { + public void run() { + for (int i = (eventsCount / 4); i < (eventsCount / 2); ++i) { + final StructuredDataMessage msg = new StructuredDataMessage( + "Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + EventLogger.logEvent(msg); + } + } + }; + writer2.start(); + + Thread writer3 = new Thread() { + public void run() { + for (int i = (eventsCount / 2); i < (3 * eventsCount / 4); ++i) { + final StructuredDataMessage msg = new StructuredDataMessage( + "Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + EventLogger.logEvent(msg); + } + } + }; + writer3.start(); + + Thread writer4 = new Thread() { + public void run() { + for (int i = (3 * eventsCount / 4); i < eventsCount; ++i) { + final StructuredDataMessage msg = new StructuredDataMessage( + "Test", "Test Multiple " + i, "Test"); + msg.put("counter", Integer.toString(i)); + EventLogger.logEvent(msg); + } + } + }; + writer4.start(); + + final boolean[] fields = new boolean[eventsCount]; + Thread reader1 = new Thread() { + public void run() { + + for (int i = 0; i < eventsCount / 4; ++i) { + Event event = primary.poll(); + while (event == null) { + event = primary.poll(); + } + + Assert.assertNotNull("Received " + i + " events. 
Event " + + (i + 1) + " is null", event); + final String value = event.getHeaders().get("counter"); + Assert.assertNotNull("Missing counter", value); + final int counter = Integer.parseInt(value); + if (fields[counter]) { + Assert.fail("Duplicate event"); + } else { + fields[counter] = true; + } + + } + } + }; + reader1.start(); + + Thread reader2 = new Thread() { + public void run() { + + for (int i = eventsCount / 4; i < eventsCount / 2; ++i) { + Event event = primary.poll(); + while (event == null) { + event = primary.poll(); + } + + Assert.assertNotNull("Received " + i + " events. Event " + + (i + 1) + " is null", event); + final String value = event.getHeaders().get("counter"); + Assert.assertNotNull("Missing counter", value); + final int counter = Integer.parseInt(value); + if (fields[counter]) { + Assert.fail("Duplicate event"); + } else { + fields[counter] = true; + } + + } + } + }; + reader2.start(); + + Thread reader3 = new Thread() { + public void run() { + + for (int i = eventsCount / 2; i < (eventsCount * 3 / 4); ++i) { + Event event = primary.poll(); + while (event == null) { + event = primary.poll(); + } + + Assert.assertNotNull("Received " + i + " events. Event " + + (i + 1) + " is null", event); + final String value = event.getHeaders().get("counter"); + Assert.assertNotNull("Missing counter", value); + final int counter = Integer.parseInt(value); + if (fields[counter]) { + Assert.fail("Duplicate event"); + } else { + fields[counter] = true; + } + + } + } + }; + reader3.start(); + + Thread reader4 = new Thread() { + public void run() { + + for (int i = (eventsCount * 3 / 4); i < eventsCount; ++i) { + Event event = primary.poll(); + while (event == null) { + event = primary.poll(); + } + + Assert.assertNotNull("Received " + i + " events. Event " + + (i + 1) + " is null", event); + final String value = event.getHeaders().get("counter"); + Assert.assertNotNull("Missing counter", value); + final int counter = Integer.parseInt(value); + if (fields[counter]) { + Assert.fail("Duplicate event"); + } else { + fields[counter] = true; + } + + } + } + }; + reader4.start(); + + writer1.join(); + writer2.join(); + writer3.join(); + writer4.join(); + reader1.join(); + reader2.join(); + reader3.join(); + reader4.join(); + + for (int i = 0; i < eventsCount; ++i) { + Assert.assertTrue( + "Channel contained event, but not expected message " + i, + fields[i]); + } + } + /* @Test public void testPerformance() throws Exception { @@ -354,4 +526,4 @@ } return ports; } -} \ No newline at end of file +}