Index: src/test/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorderTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorderTest.java	(revision 1350308)
+++ src/test/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorderTest.java	(working copy)
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.core.stats;
 
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jackrabbit.api.stats.RepositoryStatistics;
@@ -24,9 +25,25 @@
 
 public class TimeSeriesRecorderTest extends TestCase {
 
-    public void testCounter() {
-        TimeSeriesRecorder recorder = new TimeSeriesRecorder(
-                RepositoryStatistics.Type.SESSION_READ_COUNTER);
+    public void testTimeSeries() {
+        checkCounterSingle(new TimeSeriesRecorder(
+                RepositoryStatistics.Type.SESSION_READ_COUNTER));
+        checkCounterIncremental(new TimeSeriesRecorder(
+                RepositoryStatistics.Type.SESSION_COUNT));
+    }
+
+    public void testDelegatingTimeSeries() {
+        final AtomicLong counter = new AtomicLong(0);
+        StatisticProvider c = new StatisticProvider() {
+            public AtomicLong getCounter() {
+                return counter;
+            }
+        };
+        TimeSeriesRecorder recorder = new DelegatingTimeSeriesRecorder(c);
+        checkCounterIncremental(recorder);
+    }
+
+    private void checkCounterSingle(TimeSeriesRecorder recorder) {
         AtomicLong counter = recorder.getCounter();
 
         // initial values
@@ -111,6 +128,102 @@
         assertValues(recorder.getValuePerWeek(), 13);
     }
 
+    private void checkCounterIncremental(TimeSeriesRecorder recorder) {
+        AtomicLong counter = recorder.getCounter();
+
+        // initial values
+        assertValues(recorder.getValuePerSecond());
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // no changes in first second
+        recorder.recordOneSecond();
+        assertValues(recorder.getValuePerSecond());
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // one increment in second
+        counter.incrementAndGet();
+        recorder.recordOneSecond();
+        assertValues(recorder.getValuePerSecond(), 1);
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // two increments in second
+        counter.incrementAndGet();
+        counter.incrementAndGet();
+        recorder.recordOneSecond();
+        assertValues(recorder.getValuePerSecond(), 3, 1);
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // no changes in a second
+        recorder.recordOneSecond();
+        assertValues(recorder.getValuePerSecond(), 3, 3, 1);
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // ten increments in a second
+        counter.addAndGet(10);
+        long value = counter.get();
+        recorder.recordOneSecond();
+        assertValues(recorder.getValuePerSecond(), value, 3, 3, 1);
+        assertValues(recorder.getValuePerMinute());
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // one minute
+        for (int i = 0; i < 60; i++) {
+            recorder.recordOneSecond();
+        }
+        long avgM = (1 + 3 + 3 + value * 57) / 60;
+        assertValues(recorder.getValuePerSecond(), newArray(value, 60));
+        assertValues(recorder.getValuePerMinute(), avgM);
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // second minute
+        for (int i = 0; i < 60; i++) {
+            recorder.recordOneSecond();
+        }
+        assertValues(recorder.getValuePerSecond(), newArray(value, 60));
+        assertValues(recorder.getValuePerMinute(), value, avgM);
+        assertValues(recorder.getValuePerHour());
+        assertValues(recorder.getValuePerWeek());
+
+        // one hour
+        for (int i = 0; i < 60 * 60; i++) {
+            recorder.recordOneSecond();
+        }
+        long avgH = (avgM + 59 * value) / 60;
+        assertValues(recorder.getValuePerSecond(), newArray(value, 60));
+        assertValues(recorder.getValuePerMinute(), newArray(value, 60));
+        assertValues(recorder.getValuePerHour(), avgH);
+        assertValues(recorder.getValuePerWeek());
+
+        // one week
+        for (int i = 0; i < 7 * 24 * 60 * 60; i++) {
+            recorder.recordOneSecond();
+        }
+        int hoursInAWeek = 24 * 7;
+        long avgW = (avgH + value * (hoursInAWeek - 1)) / hoursInAWeek;
+        assertValues(recorder.getValuePerSecond(), newArray(value, 60));
+        assertValues(recorder.getValuePerMinute(), newArray(value, 60));
+        assertValues(recorder.getValuePerHour(), newArray(value, hoursInAWeek));
+        assertValues(recorder.getValuePerWeek(), avgW);
+    }
+
+    private static long[] newArray(long v, int size) {
+        long[] arr = new long[size];
+        Arrays.fill(arr, v);
+        return arr;
+    }
+
     private void assertValues(long[] values, long... expected) {
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], values[values.length - i - 1]);
Index: src/main/java/org/apache/jackrabbit/core/stats/StatisticProvider.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/stats/StatisticProvider.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/core/stats/StatisticProvider.java	(revision 0)
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Provides a counter info that can be aggregated into a time series by the
+ * {@link DelegatingTimeSeriesRecorder}.
+ * 
+ */
+public interface StatisticProvider {
+    AtomicLong getCounter();
+}
Index: src/main/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorder.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorder.java	(revision 1350308)
+++ src/main/java/org/apache/jackrabbit/core/stats/TimeSeriesRecorder.java	(working copy)
@@ -84,9 +84,9 @@
      */
     public synchronized void recordOneSecond() {
         if (resetValueEachSecond) {
-            valuePerSecond[seconds++] = counter.getAndSet(0);
+            valuePerSecond[seconds++] = getCounter().getAndSet(0);
         } else {
-            valuePerSecond[seconds++] = counter.get();
+            valuePerSecond[seconds++] = getCounter().get();
         }
         if (seconds == valuePerSecond.length) {
             seconds = 0;
Index: src/main/java/org/apache/jackrabbit/core/stats/RepositoryStatisticsImpl.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/stats/RepositoryStatisticsImpl.java	(revision 1350308)
+++ src/main/java/org/apache/jackrabbit/core/stats/RepositoryStatisticsImpl.java	(working copy)
@@ -106,6 +106,14 @@
         return recorder;
     }
 
+    public synchronized void createDelegatingRecorder(String type,
+            DelegatingTimeSeriesRecorder recorder) {
+        TimeSeriesRecorder old = recorders.get(type);
+        if (old == null || !(old instanceof DelegatingTimeSeriesRecorder)) {
+            recorders.put(type, recorder);
+        }
+    }
+
     private synchronized void recordOneSecond() {
         for (TimeSeriesRecorder recorder : recorders.values()) {
             recorder.recordOneSecond();
Index: src/main/java/org/apache/jackrabbit/core/stats/DelegatingTimeSeriesRecorder.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/stats/DelegatingTimeSeriesRecorder.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/core/stats/DelegatingTimeSeriesRecorder.java	(revision 0)
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Wraps an external counter provided by a {@link StatisticProvider} into a
+ * {@link TimeSeriesRecorder} to create a time series that tracks the evolution
+ * of the counter over a time period.
+ * 
+ * An instance of DelegatingTimeSeriesRecorder will act as a read-only
+ * TimeSeriesRecorder so that it does not modify the source counter.
+ * 
+ */
+public class DelegatingTimeSeriesRecorder extends TimeSeriesRecorder {
+
+    private final StatisticProvider provider;
+
+    public DelegatingTimeSeriesRecorder(StatisticProvider provider) {
+        // otherwise we run into the situation where the counter gets reset
+        super(false);
+        this.provider = provider;
+    }
+
+    @Override
+    public AtomicLong getCounter() {
+        return provider.getCounter();
+    }
+
+}

