Index: src/Appender/AsyncAppender.cs =================================================================== --- src/Appender/AsyncAppender.cs (revision 0) +++ src/Appender/AsyncAppender.cs (revision 0) @@ -0,0 +1,408 @@ +using System; +using System.Threading; +using log4net.Core; + +namespace log4net.Appender +{ + /// + /// A forwarding appender that queues the events first. And then forward + /// to the attached appenders asynchronously. + /// + public class AsyncAppender : ForwardingAppender + { + /// + /// The default event queue capacity. + /// + /// + public const int DEFAULT_QUEUE_CAPACITY = 8192; //64K buffer + + /// + /// The default event batch size + /// + /// + public const int DEFAULT_BATCH_SIZE = 256; + + /// + /// The default batch wait time. + /// + /// + public static readonly TimeSpan DEFAULT_BATCH_WAIT_TIMEOUT = TimeSpan.FromSeconds(5); + + /// + /// The minimal accaptable queue capacity. + /// + public const int MIN_QUEUE_CAPACITY = 32; + + /// + /// The minimal allowed queue capacity to batch size ratio. + /// + public const int MIN_CAPACITY_TO_BATCH_RATIO = 4; + + private FixFlags _fixFlags = FixFlags.All; + + private int _queueCapacity = DEFAULT_QUEUE_CAPACITY; + + private readonly object _queueLock = new object(); + private LoggingEvent[] _queue; + private int _head = 0, _tail = 0; + private int _size = 0; + private TimeSpan _batchWait = DEFAULT_BATCH_WAIT_TIMEOUT; + private int _batchSize = DEFAULT_BATCH_SIZE; + private bool _isShutDown = false; + + private Thread _asyncThread; + + #region Public Properties + + /// + /// Gets or sets a the fields that will be fixed in the event + /// + /// + /// The event fields that will be fixed before the event is queued. + /// + /// + /// + /// The logging event needs to have certain thread specific values + /// captured before it can be queue for async logging. See + /// for details. + /// + /// + /// + virtual public FixFlags Fix + { + get { return _fixFlags; } + set { _fixFlags = value; } + } + + private int _tempBatchSize = DEFAULT_BATCH_SIZE; + /// + /// Gets and sets the number of events to forward to the attached + /// appenders in batch. + /// + public int ForwardingBatchSize + { + get { return _tempBatchSize; } + set { _tempBatchSize = value; } + } + + /// + /// Gets and sets the seconds to wait for the events to fill the batch + /// size. will strat to forward to the + /// attached appenders after the given time lapse regardless if the + /// number of events has reached the required batch size . + /// + public double BatchWaitTimeout + { + get { return _batchWait.TotalSeconds; } + set { _batchWait = TimeSpan.FromSeconds(value); } + } + + private int _tempQueueCapacity = DEFAULT_QUEUE_CAPACITY; + /// + /// Gets and sets the event queue capacity. Once the this limit + /// is reached. Events will be discarded. + /// + public int QueueCapacity + { + get { return _tempQueueCapacity; } + set { _tempQueueCapacity = value; } + } + + #endregion + + #region Method Overrides + + /// + /// + /// Initialize the appender based on the options set + /// + /// + /// + /// + /// + /// + /// + /// This is part of the delayed object + /// activation scheme. The method must + /// be called on this object after the configuration properties have + /// been set. Until is called this + /// object is in an undefined state and must not be used. + /// + /// + /// + /// + /// + /// If any of the configuration properties are modified then + /// must be called again. + /// + /// + /// + /// + /// + public override void ActivateOptions() + { + lock (this) + { + base.ActivateOptions(); + lock (_queueLock) + try + { + if (_tempQueueCapacity < MIN_QUEUE_CAPACITY) + { + _tempQueueCapacity = MIN_QUEUE_CAPACITY; + } + + // This is important to avoid unnecessary pulse in Append + if (_tempBatchSize <= 1) _tempBatchSize = 0; + + if (_tempBatchSize > 1 && (_queueCapacity/_tempBatchSize) < MIN_CAPACITY_TO_BATCH_RATIO) + { + _tempBatchSize = _queueCapacity/MIN_CAPACITY_TO_BATCH_RATIO; + } + + if (_queue == null) + { + _queue = new LoggingEvent[_tempQueueCapacity]; + } + else if (_tempQueueCapacity != _queueCapacity) + { + while (_tempQueueCapacity < _size) Monitor.Wait(_queueLock); + + LoggingEvent[] newQueue = new LoggingEvent[_tempQueueCapacity]; + for (int i = 0; i < _size; i++) + { + newQueue[i] = _queue[(_head + i)%_queueCapacity]; + } + _queue = newQueue; + _head = 0; + _tail = _size % _tempQueueCapacity; + } + _queueCapacity = _tempQueueCapacity; + _batchSize = _tempBatchSize; + } + finally + { + Monitor.Pulse(_queueLock); + } + + if (_asyncThread == null && !_isShutDown) + { + _asyncThread = new Thread(AsyncAppend); + _asyncThread.IsBackground = true; + _asyncThread.Name = typeof (AsyncAppender).FullName; + _asyncThread.Start(); + } + } + } + + /// + /// Always return true; + /// + /// Always return true. + protected override bool PreAppendCheck() + { + return true; + } + + /// + /// Add the to the queue. If queue is + /// full, error is reported and it waits for the queue to be available. + /// + /// Event to log. + protected override void Append(LoggingEvent loggingEvent) + { + if (_fixFlags != FixFlags.None) loggingEvent.Fix = _fixFlags; + + lock (_queueLock) + { + if (_size >= _queueCapacity) + { + ErrorHandler.Error("AsyncAppender: Queue is full."); + Monitor.Wait(_queueLock); + } + if (_size == 0) Monitor.Pulse(_queueLock); + Enqueue(loggingEvent); + if (_size >= _batchSize) Monitor.Pulse(_queueLock); + } + } + + /// + /// Add all the events in to the + /// queue. If queue doesn't have enough free capacity to hold all + /// those events, error is reported and it waits for the queue to + /// be available. + /// + /// Evnets to log. + protected override void Append(LoggingEvent[] loggingEvents) + { + if (_fixFlags != FixFlags.None) + { + foreach (LoggingEvent loggingEvent in loggingEvents) + { + loggingEvent.Fix = _fixFlags; + } + } + lock (_queueLock) + { + if (_size + loggingEvents.Length > _queueCapacity) + { + ErrorHandler.Error("AsyncAppender: Queue is full."); + Monitor.Wait(_queueLock); + } + // Must pulse before we add to queue so wakeup is guaranteed + // and only when queue is empty to avoid unnecessary wakeup. + if (_size == 0) Monitor.Pulse(_queueLock); + try + { + foreach (LoggingEvent loggingEvent in loggingEvents) + { + Enqueue(loggingEvent); + } + } + finally + { + if (_size >= _batchSize) Monitor.Pulse(_queueLock); + } + } + } + + /// + /// Inform and wait for the asynchronous thread to end. + /// + protected override void OnClose() + { + Thread worker; + + // Tell async thread we are shutting down + lock(_queueLock) + { + _isShutDown = true; + Monitor.Pulse(_queueLock); + worker = _asyncThread; + } + + if (worker != null) + { + // Wait for the async thread to exit. + Monitor.Wait(this); + worker.Join(); + } + } + + #endregion + + #region Private Methods + + private void AsyncAppend() + { + // this is to handle the case when the batch size changes after + // worker thread started. + while (_batchSize > 1 ? AsyncAppendBatch(_batchSize) : AsyncAppendSingle()) + { + } + + // Async thread exiting, let's close all attached appenders. + lock (this) + { + base.OnClose(); + Monitor.Pulse(this); + } + } + + private bool AsyncAppendSingle() + { + // start the looping to wait for the events in the queue and send + // then to the attached appenders when available. + for (; ; ) + { + LoggingEvent loggingEvent; + lock (_queueLock) + { + if (_batchSize > 1) return true; + while (_size == 0) + { + if (_isShutDown) return false; + // Nothing to log, let's wait for it + try { Monitor.Wait(_queueLock); } + catch(Exception e) + { + ErrorHandler.Error("AsyncAppender: async thread exception " + e.Message, e); + } + } + loggingEvent = Dequeue(); + + // Wake up one waiting appender if any. + Monitor.Pulse(_queueLock); + } + base.Append(loggingEvent); + } + } + + private bool AsyncAppendBatch(int count) + { + for (; ; ) + { + LoggingEvent[] loggingEvents; + lock (_queueLock) + { + if (_batchSize <= 1) return true; + while (_size == 0) + { + if (_isShutDown) return false; + // Nothing to log, let's wait for it + try { Monitor.Wait(_queueLock); } + catch (Exception e) + { + ErrorHandler.Error("AsyncAppender: async thread exception " + e.Message, e); + } + } + DateTime now = DateTime.Now; + DateTime releaseTime = now + _batchWait; + while (!_isShutDown && _size < count && releaseTime > now) + { + // Let's wait for the queue to be filled with required + // batch size. But not later then releaseTime. + try { Monitor.Wait(_queueLock, releaseTime - now); } + catch (Exception e) + { + ErrorHandler.Error("AsyncAppender: async thread exception " + e.Message, e); + } + + now = DateTime.Now; + } + + // wake up all appending thread so they can try again as + // soon as we release the lock. + Monitor.PulseAll(_queueLock); + + int batch = _size < count ? _size : count; + loggingEvents = new LoggingEvent[batch]; + for (int i = 0; i < batch; i++) + { + loggingEvents[i] = Dequeue(); + } + } + base.Append(loggingEvents); + } + } + + private LoggingEvent Dequeue() + { + LoggingEvent loggingEvent = _queue[_head]; + _queue[_head++] = null; + if (_head == _queueCapacity) _head = 0; + _size--; + return loggingEvent; + } + + private void Enqueue(LoggingEvent loggingEvent) + { + _queue[_tail++] = loggingEvent; + if (_tail == _queueCapacity) _tail = 0; + _size++; + } + + #endregion + + } +} Index: src/Appender/BufferingAppenderSkeleton.cs =================================================================== --- src/Appender/BufferingAppenderSkeleton.cs (revision 727418) +++ src/Appender/BufferingAppenderSkeleton.cs (working copy) @@ -516,6 +516,39 @@ } } + /// + /// This method is called by the method. + /// + ///the array of logging events + /// + /// + /// When buffering is disabled and not in lossy mode, the bulk array of events + /// are forwarded directly to for better + /// performance. Otherwise, delegate the call to base class. + /// + /// + /// + protected override void Append(LoggingEvent[] loggingEvents) + { + if (m_bufferSize > 1 || m_lossy) + { + base.Append(loggingEvents); + } + else + { + if (m_eventMustBeFixed) + { + // Derive class expects fixed events + foreach (LoggingEvent loggingEvent in loggingEvents) + { + loggingEvent.Fix = this.Fix; + } + } + + SendBuffer(loggingEvents); + } + } + #endregion Override implementation of AppenderSkeleton #region Protected Instance Methods Index: src/log4net.vs2005.csproj =================================================================== --- src/log4net.vs2005.csproj (revision 727418) +++ src/log4net.vs2005.csproj (working copy) @@ -95,6 +95,7 @@ Code + Code Index: tests/src/Appender/AsyncAppenderTest.cs =================================================================== --- tests/src/Appender/AsyncAppenderTest.cs (revision 0) +++ tests/src/Appender/AsyncAppenderTest.cs (revision 0) @@ -0,0 +1,416 @@ +using System; +using System.Threading; +using log4net.Appender; +using log4net.Core; +using NUnit.Framework; + +namespace log4net.Tests.Appender +{ + /// + /// Test cases for + /// + [TestFixture] + public class AsyncAppenderTest + { + private AsyncAppender _asyncAppender; + private MockAppender _mockAppender; + private volatile bool _isReleased; + + [SetUp] public void Setup() + { + _mockAppender = new MockAppender(); + _asyncAppender = new AsyncAppender(); + _asyncAppender.AddAppender(_mockAppender); + } + + [TearDown] public void TearDown() + { + _asyncAppender.Close(); + } + + [Test] public void FixProperty() + { + Assert.AreEqual(FixFlags.All, _asyncAppender.Fix); + FixFlags fix = FixFlags.Partial; + _asyncAppender.Fix = fix; + Assert.AreEqual(fix, _asyncAppender.Fix); + } + + [Test] public void SingleEventMustBeFixed() + { + LoggingEvent e = new LoggingEvent(null, null, new LoggingEventData(), FixFlags.None); + Assert.AreEqual(FixFlags.None, e.Fix); + + _asyncAppender.Fix = FixFlags.Partial; + _asyncAppender.ActivateOptions(); + _asyncAppender.DoAppend(e); + _asyncAppender.Close(); + Assert.AreEqual(FixFlags.Partial, e.Fix); + } + + [Test] + public void MultiEventMustBeFixed() + { + FixFlags fix = FixFlags.None; + LoggingEvent[] events = + new LoggingEvent[] + { + new LoggingEvent(null, null, new LoggingEventData(), FixFlags.None), + new LoggingEvent(null, null, new LoggingEventData(), FixFlags.None), + }; + _asyncAppender.Fix = FixFlags.Partial; + _asyncAppender.ActivateOptions(); + _asyncAppender.DoAppend(events); + _asyncAppender.Close(); + Assert.AreEqual(FixFlags.Partial, events[0].Fix); + Assert.AreEqual(FixFlags.Partial, events[1].Fix); + } + + [Test] public void QueueCapacityProperty() + { + int capacity = AsyncAppender.MIN_QUEUE_CAPACITY*2; + _asyncAppender.QueueCapacity = capacity; + Assert.AreEqual(capacity, _asyncAppender.QueueCapacity); + _asyncAppender.ActivateOptions(); + Assert.AreEqual(capacity, _asyncAppender.QueueCapacity); + } + + [Test] public void QueueCapacityNotLowerThenMinimal() + { + int capacity = AsyncAppender.MIN_QUEUE_CAPACITY - 1; + _asyncAppender.QueueCapacity = capacity; + Assert.AreEqual(capacity, _asyncAppender.QueueCapacity); + _asyncAppender.ActivateOptions(); + Assert.AreEqual(AsyncAppender.MIN_QUEUE_CAPACITY, _asyncAppender.QueueCapacity); + } + + [Test] public void ForwardingBatchSizeProperty() + { + int size = AsyncAppender.DEFAULT_BATCH_SIZE - 1; + _asyncAppender.ForwardingBatchSize = size; + Assert.AreEqual(size, _asyncAppender.ForwardingBatchSize); + _asyncAppender.ActivateOptions(); + Assert.AreEqual(size, _asyncAppender.ForwardingBatchSize); + } + + [Test] public void ForwardingBatchSizeNotLowerThenMinimal() + { + int size = _asyncAppender.QueueCapacity / 2; + _asyncAppender.ForwardingBatchSize = size; + Assert.AreEqual(size, _asyncAppender.ForwardingBatchSize); + _asyncAppender.ActivateOptions(); + Assert.AreEqual( + _asyncAppender.QueueCapacity / AsyncAppender.MIN_CAPACITY_TO_BATCH_RATIO, + _asyncAppender.ForwardingBatchSize); + } + + [Test] public void BatchWaitTimeProperty() + { + Assert.AreEqual(AsyncAppender.DEFAULT_BATCH_WAIT_TIMEOUT.TotalSeconds, _asyncAppender.BatchWaitTimeout); + double t = AsyncAppender.DEFAULT_BATCH_WAIT_TIMEOUT.TotalSeconds * 2; + _asyncAppender.BatchWaitTimeout = t; + Assert.AreEqual(t, _asyncAppender.BatchWaitTimeout); + } + + [Test] public void SingleEventInBatchModeShouldReleaseAfterTimeOut() + { + LoggingEvent e = new LoggingEvent(new LoggingEventData()); + _asyncAppender.BatchWaitTimeout = 1; + _isReleased = false; + _mockAppender.OnAppend += + delegate(LoggingEvent e1) + { + Assert.AreEqual(e, e1); + _isReleased = true; + }; + _asyncAppender.ActivateOptions(); + _asyncAppender.DoAppend(e); + Thread.Sleep(500); + Assert.IsFalse(_isReleased, "Should not release the event in batch mode before timeout."); + Thread.Sleep(700); + Assert.IsTrue(_isReleased, "Should have released the event in batch mode after timeout."); + } + + [Test] public void MultipleEventInBatchModeShouldReleaseAfterTimeOut() + { + int count = 3; + LoggingEvent[] e = new LoggingEvent[count]; + for (int i = 0; i < count; i++) + { + e[i] = new LoggingEvent(new LoggingEventData()); + } + _asyncAppender.BatchWaitTimeout = 1; + _isReleased = false; + _mockAppender.OnBulkAppend += + delegate(LoggingEvent[] loggingEvents) + { + Assert.AreEqual(count, loggingEvents.Length); + for(int i=0; iCode +