Index: lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
===================================================================
--- lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (revision 1590432)
+++ lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (working copy)
@@ -40,7 +40,7 @@
final MergeScheduler mergeScheduler = new SerialMergeScheduler() {
@Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
- if (!mayMerge.get() && writer.getNextMerge() != null) {
+ if (!mayMerge.get() && writer.getAndPromoteNextPendingMerge() != null) {
throw new AssertionError();
}
super.merge(writer, trigger, newMergesFound);
Index: lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java
===================================================================
--- lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (revision 1590432)
+++ lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (working copy)
@@ -767,11 +767,6 @@
tmp.setSegmentsPerTier(Math.min(5, tmp.getSegmentsPerTier()));
tmp.setNoCFSRatio(1.0);
}
- MergeScheduler ms = w.getConfig().getMergeScheduler();
- if (ms instanceof ConcurrentMergeScheduler) {
- // wtf... shouldnt it be even lower since its 1 by default?!?!
- ((ConcurrentMergeScheduler) ms).setMaxMergesAndThreads(3, 2);
- }
}
/** Checks some basic behaviour of an AttributeImpl
Index: lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
===================================================================
--- lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (revision 1590432)
+++ lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (working copy)
@@ -865,9 +865,7 @@
} else if (rarely(r)) {
int maxThreadCount = TestUtil.nextInt(random(), 1, 4);
int maxMergeCount = TestUtil.nextInt(random(), maxThreadCount, maxThreadCount + 4);
- ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
- cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
- c.setMergeScheduler(cms);
+ c.setMergeScheduler(new ConcurrentMergeScheduler(maxMergeCount, maxThreadCount));
}
if (r.nextBoolean()) {
if (rarely(r)) {
Index: lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (revision 1590432)
+++ lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (working copy)
@@ -275,10 +275,10 @@
System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads);
}
- ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
+ ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(maxMergeCount, maxMergeThreads) {
@Override
- protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
try {
// Stall all incoming merges until we see
// maxMergeCount:
@@ -297,7 +297,7 @@
// Then sleep a bit to give a chance for the bug
// (too many pending merges) to appear:
Thread.sleep(20);
- super.doMerge(merge);
+ super.doMerge(writer, merge);
} finally {
runningMergeCount.decrementAndGet();
}
@@ -308,7 +308,6 @@
}
}
};
- cms.setMaxMergesAndThreads(maxMergeCount, maxMergeThreads);
iwc.setMergeScheduler(cms);
iwc.setMaxBufferedDocs(2);
@@ -334,13 +333,13 @@
long totMergedBytes;
public TrackingCMS() {
- setMaxMergesAndThreads(5, 5);
+ super(5, 5);
}
@Override
- public void doMerge(MergePolicy.OneMerge merge) throws IOException {
+ public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
totMergedBytes += merge.totalBytesSize();
- super.doMerge(merge);
+ super.doMerge(writer, merge);
}
}
Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (revision 1590432)
+++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (working copy)
@@ -1758,17 +1758,16 @@
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
final MergeScheduler ms = iwc.getMergeScheduler();
if (ms instanceof ConcurrentMergeScheduler) {
- final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() {
+ final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms;
+ final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler(cms.getMaxMergeCount(), cms.getMaxThreadCount()) {
@Override
- protected void handleMergeException(Throwable exc) {
+ protected void handleMergeException(MergePolicy.OneMerge merge, Throwable exc) {
// suppress only FakeIOException:
if (!(exc instanceof FakeIOException)) {
- super.handleMergeException(exc);
+ super.handleMergeException(merge, exc);
}
}
};
- final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms;
- suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount());
suppressFakeIOE.setMergeThreadPriority(cms.getMergeThreadPriority());
iwc.setMergeScheduler(suppressFakeIOE);
}
Index: lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (revision 1590432)
+++ lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (working copy)
@@ -54,7 +54,7 @@
// context, including ones from Object. So just filter out Object. If in
// the future MergeScheduler will extend a different class than Object,
// this will need to change.
- if (m.getDeclaringClass() != Object.class) {
+ if (m.getDeclaringClass() != Object.class && (Modifier.isFinal(m.getModifiers()) == false)) {
assertTrue(m + " is not overridden !", m.getDeclaringClass() == NoMergeScheduler.class);
}
}
Index: lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (revision 0)
+++ lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (working copy)
@@ -0,0 +1,106 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.codecs.lucene41.Lucene41PostingsFormat;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+public class TestSerialMergeScheduler extends LuceneTestCase {
+
+ // Just counts total and in-flight merges, asserting that
+ // at most 1 merge runs at once:
+ private class MergeCountingIndexWriter extends IndexWriter {
+
+ public final AtomicInteger totalMergeCount = new AtomicInteger();
+
+ private final AtomicInteger runningMergeCount = new AtomicInteger();
+
+ public MergeCountingIndexWriter(Directory dir, IndexWriterConfig iwc) throws IOException {
+ super(dir, iwc);
+ }
+
+ @Override
+ public void merge(MergePolicy.OneMerge merge) throws IOException {
+ totalMergeCount.incrementAndGet();
+ int count = runningMergeCount.incrementAndGet();
+ assertTrue(count + " merges running", count <= 1);
+ try {
+ super.merge(merge);
+ } finally {
+ runningMergeCount.decrementAndGet();
+ }
+ }
+ }
+
+ public void testOnlyOneMergeAtOnce() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+ iwc.setMaxBufferedDocs(2);
+ iwc.setMergeScheduler(new SerialMergeScheduler());
+ TieredMergePolicy tmp = new TieredMergePolicy();
+ tmp.setSegmentsPerTier(2.0);
+ iwc.setMergePolicy(tmp);
+
+ final MergeCountingIndexWriter writer = new MergeCountingIndexWriter(dir, iwc);
+
+ final CountDownLatch startingGun = new CountDownLatch(1);
+
+ Thread[] threads = new Thread[4];
+ for(int i=0;i Specify the max number of threads that may run at
- * once, and the maximum number of simultaneous merges
- * with {@link #setMaxMergesAndThreads}.
If the number of merges exceeds the max number of threads
* then the largest merges are paused until one of the smaller
@@ -44,10 +46,10 @@
*/
public class ConcurrentMergeScheduler extends MergeScheduler {
- private int mergeThreadPriority = -1;
+ private volatile int mergeThreadPriority = -1;
/** List of currently active {@link MergeThread}s. */
- protected List This merge scheduler allows only one merge to run at a
+ * time, and if a new merge needs to kick off while
+ * one is already running, the thread will block by default
+ * until the first merge completes. */
+
public class SerialMergeScheduler extends MergeScheduler {
/** Sole constructor. */
public SerialMergeScheduler() {
+ super(1);
}
- /** Just do the merges in sequence. We do this
- * "synchronized" so that even if the application is using
- * multiple threads, only one merge may run at a time. */
+ /** Just do the merges in sequence, only allowing one
+ * incoming indexing thread to be merging at once. */
@Override
- synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
+ public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
- while(true) {
- MergePolicy.OneMerge merge = writer.getNextMerge();
- if (merge == null)
+ while (true) {
+
+ MergePolicy.OneMerge merge = getNextMerge(writer);
+ if (merge == null) {
+ if (verbose()) {
+ message("no more merges pending; now return");
+ }
break;
- writer.merge(merge);
+ }
+
+ if (verbose()) {
+ message("run merge=" + writer.segString(merge.segments));
+ }
+
+ try {
+ writer.merge(merge);
+ } finally {
+ mergeFinished();
+ }
}
}
Index: lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (revision 1590432)
+++ lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (working copy)
@@ -32,8 +32,9 @@
/** The single instance of {@link NoMergeScheduler} */
public static final MergeScheduler INSTANCE = new NoMergeScheduler();
+ // prevent instantiation
private NoMergeScheduler() {
- // prevent instantiation
+ super(0);
}
@Override
@@ -46,5 +47,4 @@
public MergeScheduler clone() {
return this;
}
-
}
Index: lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (revision 1590432)
+++ lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (working copy)
@@ -19,21 +19,32 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util.InfoStream;
+
/** Expert: {@link IndexWriter} uses an instance
- * implementing this interface to execute the merges
+ * of this to execute the merges
* selected by a {@link MergePolicy}. The default
* MergeScheduler is {@link ConcurrentMergeScheduler}. Implementers of sub-classes should make sure that {@link #clone()}
* returns an independent instance able to work with any {@link IndexWriter}
* instance.maxThreadCount merges at a time.
- * @param maxThreadCount the max # simultaneous merge threads that should
- * be running at once. This must be <= maxMergeCount
- */
- public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount) {
+ /** Creates this.
+ *
+ * @param maxMergeCount Maximum number of merges before
+ * incoming segment-producing threads are forcefully stalled.
+ * @param maxThreadCount Maximum merge threads that can
+ * run concurrently; this must be <=
+ * maxMergeCount. */
+ public ConcurrentMergeScheduler(int maxMergeCount, int maxThreadCount) {
+ super(maxMergeCount);
if (maxThreadCount < 1) {
throw new IllegalArgumentException("maxThreadCount should be at least 1");
}
@@ -110,26 +102,18 @@
throw new IllegalArgumentException("maxThreadCount should be <= maxMergeCount (= " + maxMergeCount + ")");
}
this.maxThreadCount = maxThreadCount;
- this.maxMergeCount = maxMergeCount;
}
- /** Returns {@code maxThreadCount}.
- *
- * @see #setMaxMergesAndThreads(int, int) */
+ /** Returns {@code maxThreadCount}. */
public int getMaxThreadCount() {
return maxThreadCount;
}
- /** See {@link #setMaxMergesAndThreads}. */
- public int getMaxMergeCount() {
- return maxMergeCount;
- }
-
/** Return the priority that merge threads run at. By
* default the priority is 1 plus the priority of (ie,
* slightly higher priority than) the first thread that
* calls merge. */
- public synchronized int getMergeThreadPriority() {
+ public int getMergeThreadPriority() {
initMergeThreadPriority();
return mergeThreadPriority;
}
@@ -140,27 +124,21 @@
* set this any higher than
* Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
* room to set relative priority among threads. */
- public synchronized void setMergeThreadPriority(int pri) {
- if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
+ public void setMergeThreadPriority(int pri) {
+ if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
+ }
mergeThreadPriority = pri;
updateMergeThreads();
}
- /** Sorts {@link MergeThread}s; larger merges come first. */
- protected static final ComparatorIndexWriter without committing
* any changes that have occurred since the last commit
@@ -3600,9 +3606,11 @@
}
}
}
+
} catch (OutOfMemoryError oom) {
handleOOM(oom, "merge");
}
+
if (merge.info != null && !merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs");
Index: lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 1590432)
+++ lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy)
@@ -19,25 +19,45 @@
import java.io.IOException;
-/** A {@link MergeScheduler} that simply does each merge
- * sequentially, using the current thread. */
+/** A {@link MergeScheduler} that does each merge
+ * using the current (indexing) thread.
+ *
+ *
+ * if (verbose()) {
+ * message("your message");
+ * }
+ *
+ */
+ protected boolean verbose() {
+ return infoStream != null && infoStream.isEnabled("MS");
+ }
+
+ /**
+ * Outputs the given message - this method assumes {@link #verbose()} was
+ * called and returned true.
+ */
+ protected void message(String message) {
+ infoStream.message("MS", message);
+ }
+
+ /** Subclass calls this to get the next merge. If there
+ * are more than {@code maxMergeCount} merges running then this
+ * method will call {@link #maybeStall} to stall (by
+ * default) until merges catch up. Be sure
+ * to call {@link #mergeFinished} once the merge is
+ * done. */
+ protected MergePolicy.OneMerge getNextMerge(IndexWriter writer) {
+ if (permits.tryAcquire() || (writer.hasPendingMerges() && maybeStall(writer))) {
+ MergePolicy.OneMerge merge = null;
+ try {
+ merge = writer.getAndPromoteNextPendingMerge();
+ } finally {
+ if (merge == null) {
+ permits.release();
+ }
+ }
+
+ return merge;
+ } else {
+ return null;
+ }
+ }
+
+ /** Called from {@link #getNextMerge} when there are too
+ * many merges. The default implementation
+ * stalls the incoming (segment-creating) thread as a
+ * simple but effective denial-of-service protection.
+ * Return true if the the thread may now execute a
+ * merge, or false if the thread should just return
+ * without merging. */
+ protected boolean maybeStall(IndexWriter writer) {
+ if (verbose()) {
+ message("too many merges (" + writer.getRunningMergeCount() + " vs max=" + getMaxMergeCount() + "); stalling current thread...");
+ }
+ long start = System.currentTimeMillis();
+ boolean acquired = false;
+ boolean success = false;
+ try {
+ permits.acquire();
+ acquired = true;
+ if (verbose()) {
+ message("stalled for " + (System.currentTimeMillis()-start) + " msec");
+ }
+ success = true;
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ } finally {
+ if (acquired && success == false) {
+ // Hit an exception in verbose() or message():
+ permits.release();
+ }
+ }
+
+ return true;
+ }
+
+ /** Subclass must call this after finishing each merge. */
+ protected void mergeFinished() {
+ permits.release();
+ }
+
@Override
public MergeScheduler clone() {
try {
- return (MergeScheduler) super.clone();
+ MergeScheduler clone = (MergeScheduler) super.clone();
+ clone.infoStream = null;
+ clone.permits = new Semaphore(clone.maxMergeCount, true);
+ return clone;
} catch (CloneNotSupportedException e) {
throw new Error(e);
}
Index: lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
===================================================================
--- lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 1590432)
+++ lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy)
@@ -110,19 +110,15 @@
"org.apache.lucene.index.ConcurrentMergeScheduler");
if (mergeScheduler.equals(NoMergeScheduler.class.getName())) {
iwConf.setMergeScheduler(NoMergeScheduler.INSTANCE);
+ } else if (mergeScheduler.equals(ConcurrentMergeScheduler.class.getName())) {
+ iwConf.setMergeScheduler(new ConcurrentMergeScheduler(config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT),
+ config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT)));
} else {
try {
iwConf.setMergeScheduler(Class.forName(mergeScheduler).asSubclass(MergeScheduler.class).newInstance());
} catch (Exception e) {
throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as merge scheduler", e);
}
-
- if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) {
- ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) iwConf.getMergeScheduler();
- int maxThreadCount = config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT);
- int maxMergeCount = config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT);
- cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
- }
}
final String defaultCodec = config.get("default.codec", null);
Index: solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
===================================================================
--- solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (revision 1590432)
+++ solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (working copy)
@@ -288,28 +288,27 @@
private MergeScheduler buildMergeScheduler(IndexSchema schema) {
String msClassName = mergeSchedulerInfo == null ? SolrIndexConfig.DEFAULT_MERGE_SCHEDULER_CLASSNAME : mergeSchedulerInfo.className;
- MergeScheduler scheduler = schema.getResourceLoader().newInstance(msClassName, MergeScheduler.class);
- if (mergeSchedulerInfo != null) {
- // LUCENE-5080: these two setters are removed, so we have to invoke setMaxMergesAndThreads
- // if someone has them configured.
- if (scheduler instanceof ConcurrentMergeScheduler) {
- NamedList args = mergeSchedulerInfo.initArgs.clone();
- Integer maxMergeCount = (Integer) args.remove("maxMergeCount");
- if (maxMergeCount == null) {
- maxMergeCount = ((ConcurrentMergeScheduler) scheduler).getMaxMergeCount();
- }
- Integer maxThreadCount = (Integer) args.remove("maxThreadCount");
- if (maxThreadCount == null) {
- maxThreadCount = ((ConcurrentMergeScheduler) scheduler).getMaxThreadCount();
- }
- ((ConcurrentMergeScheduler)scheduler).setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
- SolrPluginUtils.invokeSetters(scheduler, args);
- } else {
- SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs);
+ MergeScheduler scheduler;
+ if (msClassName.equals("org.apache.lucene.index.ConcurrentMergeScheduler") && mergeSchedulerInfo != null) {
+ NamedList args = mergeSchedulerInfo.initArgs;
+ Integer maxMergeCount = (Integer) args.remove("maxMergeCount");
+ if (maxMergeCount == null) {
+ maxMergeCount = ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT;
}
+ Integer maxThreadCount = (Integer) args.remove("maxThreadCount");
+ if (maxThreadCount == null) {
+ maxThreadCount = ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT;
+ }
+ scheduler = new ConcurrentMergeScheduler(maxMergeCount, maxThreadCount);
+ } else {
+ scheduler = schema.getResourceLoader().newInstance(msClassName, MergeScheduler.class);
}
+ if (mergeSchedulerInfo != null) {
+ SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs);
+ }
+
return scheduler;
}