Index: lucene/src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/MergePolicy.java	(revision 1104081)
+++ lucene/src/java/org/apache/lucene/index/MergePolicy.java	(working copy)
@@ -72,7 +72,7 @@
     long mergeGen;                  // used by IndexWriter
     boolean isExternal;             // used by IndexWriter
     int maxNumSegmentsOptimize;     // used by IndexWriter
-    long estimatedMergeBytes;       // used by IndexWriter
+    public long estimatedMergeBytes;       // used by IndexWriter
     List<SegmentReader> readers;        // used by IndexWriter
     List<SegmentReader> readerClones;   // used by IndexWriter
     public final List<SegmentInfo> segments;
Index: lucene/contrib/CHANGES.txt
===================================================================
--- lucene/contrib/CHANGES.txt	(revision 1104081)
+++ lucene/contrib/CHANGES.txt	(working copy)
@@ -85,6 +85,11 @@
    it cannot group by function queries nor arbitrary queries.
    (Mike McCandless)
 
+ * LUCENE-3092: Added NRTCachingDirectory in contrib/misc, which
+   caches small segments in RAM.  This is useful in the near-real-time
+   case, where the indexing rate is lowish but the reopen rate is
+   highish, to take load off the IO system.  (Mike McCandless)
+
 Optimizations
 
  * LUCENE-3040: Switch all analysis consumers (highlighter, morelikethis, memory, ...)
Index: lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java
===================================================================
--- lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java	(revision 0)
+++ lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java	(revision 0)
@@ -0,0 +1,120 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.Version;
+import org.apache.lucene.util._TestUtil;
+
+public class TestNRTCachingDirectory extends LuceneTestCase {
+
+  public void testNRTAndCommit() throws Exception {
+    Directory dir = newDirectory();
+    NRTCachingDirectory cachedDir = new NRTCachingDirectory(dir, 2.0, 25.0);
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+    conf.setMergeScheduler(cachedDir.getMergeScheduler());
+    RandomIndexWriter w = new RandomIndexWriter(random, cachedDir, conf);
+    w.w.setInfoStream(VERBOSE ? System.out : null);
+    final LineFileDocs docs = new LineFileDocs(random);
+    final int numDocs = _TestUtil.nextInt(random, 100, 400);
+
+    if (VERBOSE) {
+      System.out.println("TEST: numDocs=" + numDocs);
+    }
+
+    final List<String> ids = new ArrayList<String>();
+    IndexReader r = null;
+    for(int docCount=0;docCount
+ * <p>This class is likely only useful in a near-real-time
+ * context, where indexing rate is lowish but reopen
+ * rate is highish, resulting in many tiny files being
+ * written.  This directory keeps such segments (as well as
+ * the segments produced by merging them, as long as they
+ * are small enough), in RAM.
+ *
+ * <p>This is safe to use: when your app calls {@link IndexWriter#commit},
+ * all cached files will be flushed from the cache and sync'd.
+ *
+ * <p><b>NOTE</b>: this class is somewhat sneaky in its
+ * approach for spying on merges to determine the size of a
+ * merge: it records which threads are running which merges
+ * by watching ConcurrentMergeScheduler's doMerge method.
+ * While this works correctly, likely future versions of
+ * this class will take a more general approach.
+ *
+ * <p>Here's a simple example usage:
+ *
+ * <pre>
+ *   Directory fsDir = FSDirectory.open(new File("/path/to/index"));
+ *   NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
+ *   IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
+ *   conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
+ *   IndexWriter writer = new IndexWriter(cachedFSDir, conf);
+ * </pre>
+ *
+ * <p>This will cache all newly flushed segments, and all merges
+ * whose expected segment size is <= 5 MB, unless the net
+ * cached bytes exceeds 60 MB, at which point no further writes
+ * will be cached (until the net bytes falls below 60 MB).
+ *
+ * @lucene.experimental
+ */
+
+public class NRTCachingDirectory extends Directory {
+
+  private final RAMDirectory cache = new RAMDirectory();
+
+  private final Directory delegate;
+
+  private final long maxMergeSizeBytes;
+  private final long maxCachedBytes;
+
+  private static final boolean VERBOSE = false;
+
+  /**
+   * We will cache a newly created output if 1) it's a
+   * flush, or a merge whose estimated segment size is <=
+   * maxMergeSizeMB, and 2) the total cached bytes is <=
+   * maxCachedMB */
+  public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
+    this.delegate = delegate;
+    maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
+    maxCachedBytes = (long) (maxCachedMB*1024*1024);
+  }
+
+  @Override
+  public synchronized String[] listAll() throws IOException {
+    final Set<String> files = new HashSet<String>();
+    for(String f : cache.listAll()) {
+      files.add(f);
+    }
+    for(String f : delegate.listAll()) {
+      assert !files.contains(f);
+      files.add(f);
+    }
+    return files.toArray(new String[files.size()]);
+  }
+
+  /** Returns how many bytes are being used by the
+   *  RAMDirectory cache */
+  public long sizeInBytes() {
+    return cache.sizeInBytes();
+  }
+
+  @Override
+  public synchronized boolean fileExists(String name) throws IOException {
+    return cache.fileExists(name) || delegate.fileExists(name);
+  }
+
+  @Override
+  public synchronized long fileModified(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.fileModified(name);
+    } else {
+      return delegate.fileModified(name);
+    }
+  }
+
+  @Override
+  public synchronized void touchFile(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      cache.touchFile(name);
+    } else {
+      delegate.touchFile(name);
+    }
+  }
+
+  @Override
+  public synchronized void deleteFile(String name) throws IOException {
+    // Delete from both, in case we are currently uncaching:
+    if (VERBOSE) {
+      System.out.println("nrtdir.deleteFile name=" + name);
+    }
+    cache.deleteFile(name);
+    delegate.deleteFile(name);
+  }
+
+  @Override
+  public synchronized long fileLength(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.fileLength(name);
+    } else {
+      return delegate.fileLength(name);
+    }
+  }
+
+  public String[] listCachedFiles() {
+    return cache.listAll();
+  }
+
+  @Override
+  public IndexOutput createOutput(String name) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.createOutput name=" + name);
+    }
+    if (doCacheWrite(name)) {
+      if (VERBOSE) {
+        System.out.println("  to cache");
+      }
+      return cache.createOutput(name);
+    } else {
+      return delegate.createOutput(name);
+    }
+  }
+
+  @Override
+  public void sync(Collection<String> fileNames) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.sync files=" + fileNames);
+    }
+    for(String fileName : fileNames) {
+      unCache(fileName);
+    }
+    delegate.sync(fileNames);
+  }
+
+  @Override
+  public synchronized IndexInput openInput(String name) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.openInput name=" + name);
+    }
+    if (cache.fileExists(name)) {
+      if (VERBOSE) {
+        System.out.println("  from cache");
+      }
+      return cache.openInput(name);
+    } else {
+      return delegate.openInput(name);
+    }
+  }
+
+  @Override
+  public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.openInput(name, bufferSize);
+    } else {
+      return delegate.openInput(name, bufferSize);
+    }
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return delegate.makeLock(name);
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    delegate.clearLock(name);
+  }
+
+  /** Close this directory, which flushes any cached files
+   *  to the delegate and then closes the delegate. */
+  @Override
+  public void close() throws IOException {
+    for(String fileName : cache.listAll()) {
+      unCache(fileName);
+    }
+    cache.close();
+    delegate.close();
+  }
+
+  private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>();
+
+  public MergeScheduler getMergeScheduler() {
+    return new ConcurrentMergeScheduler() {
+      @Override
+      protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+        try {
+          merges.put(Thread.currentThread(), merge);
+          super.doMerge(merge);
+        } finally {
+          merges.remove(Thread.currentThread());
+        }
+      }
+    };
+  }
+
+  /** Subclass can override this to customize logic; return
+   *  true if this file should be written to the RAMDirectory. */
+  protected boolean doCacheWrite(String name) {
+    final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
+    //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
+    return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
+  }
+
+  private void unCache(String fileName) throws IOException {
+    final IndexOutput out;
+    synchronized(this) {
+      if (!delegate.fileExists(fileName)) {
+        assert cache.fileExists(fileName);
+        out = delegate.createOutput(fileName);
+      } else {
+        out = null;
+      }
+    }
+
+    if (out != null) {
+      IndexInput in = null;
+      try {
+        in = cache.openInput(fileName);
+        in.copyBytes(out, in.length());
+      } finally {
+        IOUtils.closeSafely(in, out);
+      }
+      synchronized(this) {
+        cache.deleteFile(fileName);
+      }
+    }
+  }
+}

Property changes on: lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java
___________________________________________________________________
Added: svn:eol-style
   + native
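
For anyone trying the patch out, here is a minimal usage sketch (not part of the patch) showing NRTCachingDirectory in a near-real-time reopen loop. The constructor, getMergeScheduler() and the sync-on-commit behavior come from the class above; the class name NRTCachingDirectoryExample, the analyzer, index path, field layout, and the IndexReader.open(writer, true) / reopen() calls are illustrative assumptions based on the 3.x-era NRT API.

import java.io.File;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Version;

public class NRTCachingDirectoryExample {
  public static void main(String[] args) throws Exception {
    // Wrap the on-disk index; cache merged segments up to 5 MB,
    // with at most 60 MB total held in RAM (values from the javadoc example).
    Directory fsDir = FSDirectory.open(new File("/path/to/index"));
    NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32,
        new StandardAnalyzer(Version.LUCENE_32));
    // Required: the wrapped ConcurrentMergeScheduler is how the directory
    // learns each merge's estimated size.
    conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
    IndexWriter writer = new IndexWriter(cachedFSDir, conf);

    // NRT loop: add a document, then reopen; the tiny segments produced by
    // each flush stay in the RAMDirectory cache instead of hitting disk.
    IndexReader reader = IndexReader.open(writer, true);
    for (int i = 0; i < 100; i++) {
      Document doc = new Document();
      doc.add(new Field("id", Integer.toString(i),
                        Field.Store.YES, Field.Index.NOT_ANALYZED));
      writer.addDocument(doc);

      IndexReader newReader = reader.reopen();
      if (newReader != reader) {
        reader.close();
        reader = newReader;
      }
      IndexSearcher searcher = new IndexSearcher(reader);
      // ... run searches against the latest view ...
      searcher.close();
    }

    // commit() syncs the index files, which flushes any cached files
    // down to the delegate FSDirectory.
    writer.commit();

    reader.close();
    writer.close();
    cachedFSDir.close();  // also closes the delegate
  }
}

Under this pattern the RAM cache only ever holds the small, recently flushed or merged segments; once commit() (or close()) syncs them, they are copied down to the wrapped FSDirectory and dropped from RAM.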