diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java index ccda263..95e13e6 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.core.config.Node; import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.jctools.queues.MpscArrayQueue; @@ -18,17 +19,20 @@ @Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE) public class JCToolsBlockingQueueFactory implements BlockingQueueFactory { - private JCToolsBlockingQueueFactory() { + + private final WaitStrategy waitStrategy; + private JCToolsBlockingQueueFactory(WaitStrategy waitStrategy) { + this.waitStrategy = waitStrategy; } @Override public BlockingQueue create(final int capacity) { - return new MpscBlockingQueue<>(capacity); + return new MpscBlockingQueue(capacity, waitStrategy); } @PluginFactory - public static JCToolsBlockingQueueFactory createFactory() { - return new JCToolsBlockingQueueFactory<>(); + public static JCToolsBlockingQueueFactory createFactory(@PluginAttribute(value = "WaitStrategy", defaultString = "PARK") WaitStrategy waitStrategy) { + return new JCToolsBlockingQueueFactory<>(waitStrategy); } /** @@ -36,11 +40,14 @@ */ private static final class MpscBlockingQueue extends MpscArrayQueue implements BlockingQueue { - MpscBlockingQueue(final int capacity) { - super(capacity); - } + private org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory.WaitStrategy waitStrategy; - @Override + public MpscBlockingQueue(final int capacity, final org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) { + super(capacity); + this.waitStrategy = waitStrategy; + } + + @Override public int drainTo(final Collection c) { return drainTo(c, capacity()); } @@ -49,29 +56,59 @@ public int drainTo(final Collection c, final int maxElements) { return drain(new Consumer() { @Override - public void accept(E arg0) { - c.add(arg0); + public void accept(E e) { + c.add(e); } }, maxElements); } @Override public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return offer(e); + int idleCounter = 0; + long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); + do { + if (offer(e)) { + return true; + } else if(System.nanoTime() - timeoutNanos > 0){ + return false; + } + idleCounter = waitStrategy.idle(idleCounter); + } while(!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); } @Override public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return poll(); + int idleCounter = 0; + long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); + do { + E result = poll(); + if (result != null) { + return result; + } else if(System.nanoTime() - timeoutNanos > 0){ + return null; + } + idleCounter = waitStrategy.idle(idleCounter); + } while(!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); } @Override public void put(final E e) throws InterruptedException { - while (!relaxedOffer(e)) { - LockSupport.parkNanos(1L); - } + int idleCounter = 0; + do { + if (offer(e)) { + return; + } + idleCounter = waitStrategy.idle(idleCounter); + } while(!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); + } + + @Override + public boolean offer(final E e){ + //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full. + return offerIfBelowThreshold(e, capacity() - 32); } @Override @@ -81,13 +118,66 @@ @Override public E take() throws InterruptedException { - for (; ; ) { - final E result = poll(); + int idleCounter = 100; + do { + final E result = relaxedPoll(); if (result != null) { return result; } - LockSupport.parkNanos(1L); - } + idleCounter = waitStrategy.idle(idleCounter); + } while(!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); } + } + + + public static enum WaitStrategy { + SPIN(new Idle(){ + @Override + public int idle(int idleCounter) { + return idleCounter + 1; + } + }), + YIELD(new Idle(){ + @Override + public int idle(int idleCounter) { + Thread.yield(); + return idleCounter + 1; + } + }), + PARK(new Idle(){ + @Override + public int idle(int idleCounter) { + LockSupport.parkNanos(1L); + return idleCounter + 1; + } + }), + PROGRESSIVE(new Idle(){ + @Override + public int idle(int idleCounter) { + if(idleCounter > 200){ + LockSupport.parkNanos(1L); + } else if(idleCounter > 100){ + Thread.yield(); + } + return idleCounter + 1; + } + }); + + private final Idle idle; + private int idle(int idleCounter){ + return idle.idle(idleCounter); + } + + private WaitStrategy(Idle idle){ + this.idle = idle; + } + } + + private static interface Idle { + int idle(int idleCounter); + } + } + diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java new file mode 100644 index 0000000..9194847 --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.perftest.ResponseTimeTest.PrintingAsyncQueueFullPolicy; + +import com.lmax.disruptor.collections.Histogram; + +public abstract class AbstractRunQueue implements IPerfTestRunner { + + abstract BlockingQueue createQueue(int capacity); + + private static final String STOP = "STOP_TEST"; + private volatile boolean stopped = false; + private final BlockingQueue queue = createQueue(256 * 1024); + private final Thread backGroundThread; + + + AbstractRunQueue(){ + backGroundThread = new Thread(new Runnable() { + @Override + public void run() { + for(;;){ + try { + if(queue.take() == STOP) break; + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + }); + backGroundThread.start(); + } + + @Override + public void runThroughputTest(final int lines, final Histogram histogram) { + } + + + @Override + public void runLatencyTest(final int samples, final Histogram histogram, + final long nanoTimeCost, final int threadCount) { + } + + + @Override + public final void shutdown() { + stopped = true; + try { + queue.put(STOP); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + @Override + public final void log(final String finalMessage) { + if(stopped) return; + if(!queue.offer(finalMessage)){ + PrintingAsyncQueueFullPolicy.ringbufferFull.incrementAndGet(); + try { + queue.put(finalMessage); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } +} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java index c09c428..93f0f2e 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java @@ -156,6 +156,7 @@ final int COUNT = (1000 * 1000) / threadCount; runLatencyTest(logger, TEST_DURATION_MILLIS, COUNT, loadMessagesPerSec, idleStrategy, serviceTmHistograms, responseTmHistograms, threadCount); + logger.shutdown(); final long end = System.currentTimeMillis(); // ... and report the results diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java new file mode 100644 index 0000000..7b6d0bc --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.DisruptorBlockingQueueFactory; + +import com.conversantmedia.util.concurrent.SpinPolicy; + +public class RunConversant extends AbstractRunQueue { + + @Override + BlockingQueue createQueue(int capacity) { + return DisruptorBlockingQueueFactory.createFactory(SpinPolicy.SPINNING).create(capacity); + } +} diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java new file mode 100644 index 0000000..602c4e6 --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory; +import org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory.WaitStrategy; + +public class RunJCTools extends AbstractRunQueue { + + @Override + BlockingQueue createQueue(int capacity) { + return JCToolsBlockingQueueFactory.createFactory(WaitStrategy.SPIN).create(capacity); + } + + + + +} diff --git a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml index 26cd946..372c8bd 100644 --- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml +++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml @@ -21,7 +21,7 @@ - + diff --git a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml index 3bf3c98..c2f6de0 100644 --- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml +++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml @@ -21,7 +21,7 @@ - +