diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml index 1c00fbc..f8193e8 100644 --- a/log4j-core/pom.xml +++ b/log4j-core/pom.xml @@ -114,6 +114,12 @@ kafka-clients true + + + org.zeromq + jeromq + true + org.apache.commons diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java new file mode 100644 index 0000000..90ed8f5 --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.status.StatusLogger; +import org.apache.logging.log4j.util.PropertiesUtil; +import org.apache.logging.log4j.util.Strings; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; + +/** + * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. + *

+ * Requires the JeroMQ jar (LGPL as of 0.3.5) + *

+ */ +// TODO +// Some methods are synchronized because a ZMQ.Socket is not thread-safe +// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be +// some issue on threads owning certain resources as opposed to others. +@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true) +public final class JeroMqAppender extends AbstractAppender { + + // Per ZMQ docs, there should usually only be one ZMQ context per process. + private static volatile ZMQ.Context context; + + private static Logger logger; + + // ZMQ sockets are not thread safe. + private static ZMQ.Socket publisher; + + private static final long serialVersionUID = 1L; + + private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName(); + + static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; + + static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; + + static { + logger = StatusLogger.getLogger(); + final PropertiesUtil managerProps = PropertiesUtil.getProperties(); + final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); + final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); + final String simpleName = SIMPLE_NAME; + logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString()); + logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads); + context = ZMQ.context(ioThreads); + logger.trace("{} created ZMQ context {}", simpleName, context); + if (enableShutdownHook) { + final Thread hook = new Thread(simpleName + "-shutdown") { + @Override + public void run() { + shutdown(); + } + }; + logger.trace("{} adding shutdown hook {}", simpleName, hook); + Runtime.getRuntime().addShutdownHook(hook); + } + } + + // The ZMQ.Socket class has other set methods that we do not cover because + // they throw unsupported operation exceptions. + @PluginFactory + public static JeroMqAppender createAppender( + // @formatter:off + @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, + @PluginElement("Layout") Layout layout, + @PluginElement("Filters") final Filter filter, + @PluginElement("Properties") final Property[] properties, + // Super attributes + @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, + // ZMQ attributes; defaults picked from zmq.Options. + @PluginAttribute(value="affinity", defaultLong=0) final long affinity, + @PluginAttribute(value="backlog", defaultLong=100) final long backlog, + @PluginAttribute(value="delayAttachOnConnect", defaultBoolean=false) final boolean delayAttachOnConnect, + @PluginAttribute(value="identity") final byte[] identity, + @PluginAttribute(value="ipv4Only", defaultBoolean=true) final boolean ipv4Only, + @PluginAttribute(value="linger", defaultLong=-1) final long linger, + @PluginAttribute(value="maxMsgSize", defaultLong=-1) final long maxMsgSize, + @PluginAttribute(value="rcvHwm", defaultLong=1000) final long rcvHwm, + @PluginAttribute(value="receiveBufferSize", defaultLong=0) final long receiveBufferSize, + @PluginAttribute(value="receiveTimeOut", defaultLong=-1) final int receiveTimeOut, + @PluginAttribute(value="reconnectIVL", defaultLong=100) final long reconnectIVL, + @PluginAttribute(value="reconnectIVLMax", defaultLong=0) final long reconnectIVLMax, + @PluginAttribute(value="sendBufferSize", defaultLong=0) final long sendBufferSize, + @PluginAttribute(value="sendTimeOut", defaultLong=-1) final int sendTimeOut, + @PluginAttribute(value="sndHwm", defaultLong=1000) final long sndHwm, + @PluginAttribute(value="tcpKeepAlive", defaultInt=-1) final int tcpKeepAlive, + @PluginAttribute(value="tcpKeepAliveCount", defaultLong=-1) final long tcpKeepAliveCount, + @PluginAttribute(value="tcpKeepAliveIdle", defaultLong=-1) final long tcpKeepAliveIdle, + @PluginAttribute(value="tcpKeepAliveInterval", defaultLong=-1) final long tcpKeepAliveInterval, + @PluginAttribute(value="xpubVerbose", defaultBoolean=false) final boolean xpubVerbose + // @formatter:on + ) { + if (layout == null) { + layout = PatternLayout.createDefaultLayout(); + } + List endpoints; + if (properties == null) { + endpoints = new ArrayList<>(0); + } else { + endpoints = new ArrayList<>(properties.length); + for (final Property property : properties) { + if ("endpoint".equalsIgnoreCase(property.getName())) { + final String value = property.getValue(); + if (Strings.isNotEmpty(value)) { + endpoints.add(value); + } + } + } + } + logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", + name, filter, layout, ignoreExceptions, endpoints); + return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, + delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, + reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount, + tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); + } + + static ZMQ.Context getContext() { + return context; + } + + private static ZMQ.Socket getPublisher() { + return publisher; + } + + private static ZMQ.Socket newPublisher() { + logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context); + final Socket socketPub = context.socket(ZMQ.PUB); + logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub); + return socketPub; + } + + static void shutdown() { + if (context != null) { + logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context); + context.term(); + context = null; + } + } + + private final long affinity; + private final long backlog; + private final boolean delayAttachOnConnect; + private final List endpoints; + private final byte[] identity; + private final int ioThreads = 1; + private final boolean ipv4Only; + private final long linger; + private final long maxMsgSize; + private final long rcvHwm; + private final long receiveBufferSize; + private final int receiveTimeOut; + private final long reconnectIVL; + private final long reconnectIVLMax; + private final long sendBufferSize; + private int sendRcFalse; + private int sendRcTrue; + private final int sendTimeOut; + private final long sndHwm; + private final int tcpKeepAlive; + private final long tcpKeepAliveCount; + private final long tcpKeepAliveIdle; + private final long tcpKeepAliveInterval; + private final boolean xpubVerbose; + + private JeroMqAppender(final String name, final Filter filter, final Layout layout, + final boolean ignoreExceptions, final List endpoints, final long affinity, final long backlog, + final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, + final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, + final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, + final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose) { + super(name, filter, layout, ignoreExceptions); + this.endpoints = endpoints; + this.affinity = affinity; + this.backlog = backlog; + this.delayAttachOnConnect = delayAttachOnConnect; + this.identity = identity; + this.ipv4Only = ipv4Only; + this.linger = linger; + this.maxMsgSize = maxMsgSize; + this.rcvHwm = rcvHwm; + this.receiveBufferSize = receiveBufferSize; + this.receiveTimeOut = receiveTimeOut; + this.reconnectIVL = reconnectIVL; + this.reconnectIVLMax = reconnectIVLMax; + this.sendBufferSize = sendBufferSize; + this.sendTimeOut = sendTimeOut; + this.sndHwm = sndHWM; + this.tcpKeepAlive = tcpKeepAlive; + this.tcpKeepAliveCount = tcpKeepAliveCount; + this.tcpKeepAliveIdle = tcpKeepAliveIdle; + this.tcpKeepAliveInterval = tcpKeepAliveInterval; + this.xpubVerbose = xpubVerbose; + } + + @Override + public synchronized void append(final LogEvent event) { + final String formattedMessage = event.getMessage().getFormattedMessage(); + if (getPublisher().send(formattedMessage, 0)) { + sendRcTrue++; + } else { + sendRcFalse++; + logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, + formattedMessage); + } + } + + // not public, handy for testing + int getSendRcFalse() { + return sendRcFalse; + } + + // not public, handy for testing + int getSendRcTrue() { + return sendRcTrue; + } + + // not public, handy for testing + void resetSendRcs() { + sendRcTrue = sendRcFalse = 0; + } + + @Override + public synchronized void start() { + super.start(); + publisher = newPublisher(); + final String name = getName(); + final String prefix = "JeroMQ Appender"; + logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString()); + logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads); + // + final ZMQ.Socket socketPub = getPublisher(); + logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, + socketPub.getClass().getName(), socketPub); + logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity); + socketPub.setAffinity(affinity); + logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog); + socketPub.setBacklog(backlog); + logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect); + socketPub.setDelayAttachOnConnect(delayAttachOnConnect); + if (identity != null) { + logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity)); + socketPub.setIdentity(identity); + } + logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only); + socketPub.setIPv4Only(ipv4Only); + logger.trace("{} {} publisher setLinger({})", prefix, name, linger); + socketPub.setLinger(linger); + logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize); + socketPub.setMaxMsgSize(maxMsgSize); + logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm); + socketPub.setRcvHWM(rcvHwm); + logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize); + socketPub.setReceiveBufferSize(receiveBufferSize); + logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut); + socketPub.setReceiveTimeOut(receiveTimeOut); + logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL); + socketPub.setReconnectIVL(reconnectIVL); + logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax); + socketPub.setReconnectIVLMax(reconnectIVLMax); + logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize); + socketPub.setSendBufferSize(sendBufferSize); + logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut); + socketPub.setSendTimeOut(sendTimeOut); + logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm); + socketPub.setSndHWM(sndHwm); + logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive); + socketPub.setTCPKeepAlive(tcpKeepAlive); + logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount); + socketPub.setTCPKeepAliveCount(tcpKeepAliveCount); + logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle); + socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle); + logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval); + socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval); + logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose); + socketPub.setXpubVerbose(xpubVerbose); + // + if (logger.isDebugEnabled()) { + logger.debug( + "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, " + + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, " + + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}", + name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(), + socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(), + socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(), + socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(), + socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(), + socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(), + socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(), + socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting()); + } + for (final String endpoint : endpoints) { + logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint); + socketPub.bind(endpoint); + } + } + + @Override + public synchronized void stop() { + super.stop(); + final Socket socketPub = getPublisher(); + if (socketPub != null) { + logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub); + socketPub.close(); + publisher = null; + } + } + + @Override + public String toString() { + return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]"; + } + +} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java new file mode 100644 index 0000000..041419c --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.junit.LoggerContextRule; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +public class JeroMqAppenderTest { + + @AfterClass + public static void tearDownClass() { + // JeroMqAppender.shutdown(); + } + + @ClassRule + public static LoggerContextRule ctx = new LoggerContextRule("JeroMqAppenderTest.xml"); + + @Test(timeout = 10000) + public void testAppenderLifeCycle() throws Exception { + // do nothing to make sure the appender starts and stops without + // locking up resources. + Assert.assertNotNull(JeroMqAppender.getContext()); + } + + @Test(timeout = 10000) + public void testClientServer() throws Exception { + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + final int expectedReceiveCount = 2; + final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", expectedReceiveCount); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future> future = executor.submit(client); + Thread.sleep(100); + final Logger logger = ctx.getLogger(getClass().getName()); + appender.resetSendRcs(); + logger.info("Hello"); + logger.info("Again"); + final List list = future.get(); + Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + Assert.assertEquals("Hello", list.get(0)); + Assert.assertEquals("Again", list.get(1)); + } finally { + executor.shutdown(); + } + } + + @Test(timeout = 10000) + public void testMultiThreadedServer() throws Exception { + final int nThreads = 10; + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + final int expectedReceiveCount = 2 * nThreads; + final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", + expectedReceiveCount); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future> future = executor.submit(client); + Thread.sleep(100); + final Logger logger = ctx.getLogger(getClass().getName()); + appender.resetSendRcs(); + final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads); + for (int i = 0; i < 10.; i++) { + fixedThreadPool.submit(new Runnable() { + @Override + public void run() { + logger.info("Hello"); + logger.info("Again"); + } + }); + } + final List list = future.get(); + Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + int hello = 0; + int again = 0; + for (final String string : list) { + switch (string) { + case "Hello": + hello++; + break; + case "Again": + again++; + break; + default: + Assert.fail("Unexpected message: " + string); + } + } + Assert.assertEquals(nThreads, hello); + Assert.assertEquals(nThreads, again); + } finally { + executor.shutdown(); + } + } + + @Test(timeout = 10000) + public void testServerOnly() throws Exception { + final Logger logger = ctx.getLogger(getClass().getName()); + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + appender.resetSendRcs(); + logger.info("Hello"); + logger.info("Again"); + Assert.assertEquals(2, appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + } +} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java new file mode 100644 index 0000000..ddd06ab --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.zeromq.ZMQ; + +class JeroMqTestClient implements Callable> { + + private final ZMQ.Context context; + + private final String endpoint; + private final List messages; + private final int receiveCount; + + JeroMqTestClient(final ZMQ.Context context, final String endpoint, final int receiveCount) { + super(); + this.context = context; + this.endpoint = endpoint; + this.receiveCount = receiveCount; + this.messages = new ArrayList<>(receiveCount); + } + + @Override + public List call() throws Exception { + try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)) { + subscriber.connect(endpoint); + subscriber.subscribe(new byte[0]); + for (int messageNum = 0; messageNum < receiveCount + && !Thread.currentThread().isInterrupted(); messageNum++) { + // Use trim to remove the tailing '0' character + messages.add(subscriber.recvStr(0).trim()); + } + } + return messages; + } +} \ No newline at end of file diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java index 65836d6..006d70c 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java @@ -117,6 +117,17 @@ } /** + * Gets a named Appender for this LoggerContext. + * @param The target Appender class + * @param name the name of the Appender to look up. + * @param cls The target Appender class + * @return the named Appender or {@code null} if it wasn't defined in the configuration. + */ + public T getAppender(final String name, Class cls) { + return cls.cast(getConfiguration().getAppenders().get(name)); + } + + /** * Gets a named Appender or throws an exception for this LoggerContext. * @param name the name of the Appender to look up. * @return the named Appender. @@ -129,6 +140,20 @@ } /** + * Gets a named Appender or throws an exception for this LoggerContext. + * @param The target Appender class + * @param name the name of the Appender to look up. + * @param cls The target Appender class + * @return the named Appender. + * @throws AssertionError if the Appender doesn't exist. + */ + public T getRequiredAppender(final String name, Class cls) { + final T appender = getAppender(name, cls); + assertNotNull("Appender named " + name + " was null.", appender); + return appender; + } + + /** * Gets a named ListAppender or throws an exception for this LoggerContext. * @param name the name of the ListAppender to look up. * @return the named ListAppender. diff --git a/log4j-core/src/test/resources/JeroMqAppenderTest.xml b/log4j-core/src/test/resources/JeroMqAppenderTest.xml new file mode 100644 index 0000000..119fc00 --- /dev/null +++ b/log4j-core/src/test/resources/JeroMqAppenderTest.xml @@ -0,0 +1,30 @@ + + + + + + tcp://*:5556 + ipc://info-topic + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 019b089..fbbb657 100644 --- a/pom.xml +++ b/pom.xml @@ -565,6 +565,11 @@ 0.8.2.1
+ org.zeromq + jeromq + 0.3.5 + + javax.servlet servlet-api 2.5 diff --git a/src/site/site.xml b/src/site/site.xml index 3a2940d..7c82498 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -131,6 +131,7 @@ + diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml index a99ad88..85fe3ad 100644 --- a/src/site/xdoc/manual/appenders.xml +++ b/src/site/xdoc/manual/appenders.xml @@ -3288,6 +3288,167 @@ ]]> + + +

+ The ZeroMQ appender uses the JeroMQ library to send log + events to one or more endpoints. +

+

+ This is a simple JeroMQ configuration: +

+

+
+  
+        
+      tcp://*:5556
+      ipc://info-topic
+    
+  
+  
+    
+      
+    
+  
+]]>
+

+ The table below describes all options. Please consult the JeroMQ and ZeroMQ documentation for details. +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
JeroMQ Parameters
Parameter NameTypeDescription
nameStringThe name of the Appender.
LayoutLayoutThe Layout of the Appender.
FiltersFilterThe Filters of the Appender.
PropertyPropertyOne or more Property elements, named endpoint.
ignoreExceptionsbooleanIf true, exceptions will be logged and suppressed. If false errors will be logged and then passed to the application.
affinitylongThe ZMQ_AFFINITY option. Defaults to 0.
backloglongThe ZMQ_BACKLOG option. Defaults to 100.
delayAttachOnConnectbooleanThe ZMQ_DELAY_ATTACH_ON_CONNECT option. Defaults to false.
identitybyte[]The ZMQ_IDENTITY option. Defaults to none.
ipv4OnlybooleanThe ZMQ_IPV4ONLY option. Defaults to true.
lingerlongThe ZMQ_LINGER option. Defaults to -1.
maxMsgSizelongThe ZMQ_MAXMSGSIZE option. Defaults to -1.
rcvHwmlongThe ZMQ_RCVHWM option. Defaults to 1000.
receiveBufferSizelongThe ZMQ_RCVBUF option. Defaults to 0.
receiveTimeOutintThe ZMQ_RCVTIMEO option. Defaults to -1.
reconnectIVLlongThe ZMQ_RECONNECT_IVL option. Defaults to 100.
reconnectIVLMaxlongThe ZMQ_RECONNECT_IVL_MAX option. Defaults to 0.
sendBufferSizelongThe ZMQ_SNDBUF option. Defaults to 0.
sendTimeOutintThe ZMQ_SNDTIMEO option. Defaults to -1.
sndHwmlongThe ZMQ_SNDHWM option. Defaults to 1000.
tcpKeepAliveintThe ZMQ_TCP_KEEPALIVE option. Defaults to -1.
tcpKeepAliveCountlongThe ZMQ_TCP_KEEPALIVE_CNT option. Defaults to -1.
tcpKeepAliveIdlelongThe ZMQ_TCP_KEEPALIVE_IDLE option. Defaults to -1.
tcpKeepAliveIntervallongThe ZMQ_TCP_KEEPALIVE_INTVL option. Defaults to -1.
xpubVerbosebooleanThe ZMQ_XPUB_VERBOSE option. Defaults to false.
+ + diff --git a/src/site/xdoc/runtime-dependencies.xml b/src/site/xdoc/runtime-dependencies.xml index bbee62a..ace4141 100644 --- a/src/site/xdoc/runtime-dependencies.xml +++ b/src/site/xdoc/runtime-dependencies.xml @@ -100,6 +100,14 @@ In addition, XZ requires XZ for Java. + + ZeroMQ Appender + + The ZeroMQ appender uses the JeroMQ library which is + licensed under the terms of the GNU Lesser General Public License (LGPL). For details see the + files COPYING and COPYING.LESSER included with the JeroMQ distribution. + +