Index: src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/MergePolicy.java (revision 1103154)
+++ src/java/org/apache/lucene/index/MergePolicy.java (working copy)
@@ -72,7 +72,7 @@
long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter
int maxNumSegmentsOptimize; // used by IndexWriter
- long estimatedMergeBytes; // used by IndexWriter
+ public long estimatedMergeBytes; // used by IndexWriter
List readers; // used by IndexWriter
List readerClones; // used by IndexWriter
public final List segments;
Index: contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java
===================================================================
--- contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java (revision 0)
+++ contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java (revision 0)
@@ -0,0 +1,176 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestNRTCachingDirectory extends LuceneTestCase {
+
+ private static class SuppressingMergePolicy extends MergePolicy {
+ private final MergePolicy other;
+ private volatile boolean suppress;
+
+ public SuppressingMergePolicy(MergePolicy other) {
+ this.other = other;
+ }
+
+ @Override
+ public void setIndexWriter(IndexWriter writer) {
+ this.writer.set(writer);
+ other.setIndexWriter(writer);
+ }
+
+ public void setSuppress(boolean suppress) {
+ this.suppress = suppress;
+ }
+
+ @Override
+ public MergeSpecification findMerges(SegmentInfos segmentInfos)
+ throws CorruptIndexException, IOException {
+ if (!suppress) {
+ return other.findMerges(segmentInfos);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public MergeSpecification findMergesForOptimize(
+ SegmentInfos segmentInfos, int maxSegmentCount, Set segmentsToOptimize)
+ throws CorruptIndexException, IOException {
+ if (!suppress) {
+ return other.findMergesForOptimize(segmentInfos, maxSegmentCount, segmentsToOptimize);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public MergeSpecification findMergesToExpungeDeletes(
+ SegmentInfos segmentInfos) throws CorruptIndexException, IOException {
+ if (!suppress) {
+ return other.findMergesToExpungeDeletes(segmentInfos);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ other.close();
+ }
+
+ @Override
+ public boolean useCompoundFile(SegmentInfos segments, SegmentInfo newSegment) throws IOException {
+ return other.useCompoundFile(segments, newSegment);
+ }
+ }
+
+ public void testNRTAndCommit() throws Exception {
+ Directory dir = newDirectory();
+ NRTCachingDirectory cachedDir = new NRTCachingDirectory(dir, 2.0, 25.0);
+ IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+ conf.setMergeScheduler(cachedDir.getMergeScheduler());
+ final SuppressingMergePolicy smp = new SuppressingMergePolicy(conf.getMergePolicy());
+ conf.setMergePolicy(smp);
+ RandomIndexWriter w = new RandomIndexWriter(random, cachedDir, conf);
+ w.w.setInfoStream(VERBOSE ? System.out : null);
+ final LineFileDocs docs = new LineFileDocs(random);
+ final int numDocs = _TestUtil.nextInt(random, 100, 400);
+
+ if (VERBOSE) {
+ System.out.println("TEST: numDocs=" + numDocs);
+ }
+
+ final List ids = new ArrayList();
+ IndexReader r = null;
+ for(int docCount=0;docCountThis class is really only useful in a near-real-time
+ * context, where indexing rate is lowish but reopen
+ * rate is highish, resulting in many tiny files being
+ * written. This directory keeps such segments (as well as
+ * the segments produced by merging them, as long as they
+ * are small enough), in RAM.
+ *
+ * This is safe to use, i.e., when your app calls commit,
+ * this class will write all files to disk and commit them
+ * as well.
+ *
+ * NOTE: this class is somewhat sneaky in its
+ * approach for spying on merges to determine the size of a
+ * merge: it records which threads are running which merges
+ * by watching ConcurrentMergeScheduler's doMerge method.
+ * While this works correctly, likely future versions of
+ * this class will take a more general approach.
+ *
+ * @lucene.experimental
+ */
+
+public class NRTCachingDirectory extends Directory {
+
+ private final RAMDirectory cache = new RAMDirectory();
+
+ private final Directory delegate;
+
+ private final long maxMergeSizeBytes;
+ private final long maxCachedBytes;
+
+ private static final boolean VERBOSE = true;
+
+ /**
+ * We will cache a newly created output if 1) it's a
+ * flush or a merge and the estimated size of the merged segment is <=
+ * maxMergeSizeMB, and 2) the total cached bytes is <=
+ * maxCachedMB */
+ public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
+ this.delegate = delegate;
+ maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
+ maxCachedBytes = (long) (maxCachedMB*1024*1024);
+ }
+
+ @Override
+ public synchronized String[] listAll() throws IOException {
+ final Set files = new HashSet();
+ for(String f : cache.listAll()) {
+ files.add(f);
+ }
+ for(String f : delegate.listAll()) {
+ assert !files.contains(f);
+ files.add(f);
+ }
+ return files.toArray(new String[files.size()]);
+ }
+
+ /** Returns how many bytes are being used by the
+ * RAMDirectory cache */
+ public long sizeInBytes() {
+ return cache.sizeInBytes();
+ }
+
+ @Override
+ public synchronized boolean fileExists(String name) throws IOException {
+ return cache.fileExists(name) || delegate.fileExists(name);
+ }
+
+ @Override
+ public synchronized long fileModified(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.fileModified(name);
+ } else {
+ return delegate.fileModified(name);
+ }
+ }
+
+ @Override
+ public synchronized void touchFile(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ cache.touchFile(name);
+ } else {
+ delegate.touchFile(name);
+ }
+ }
+
+ @Override
+ public synchronized void deleteFile(String name) throws IOException {
+ // Delete from both, in case we are currently uncaching:
+ if (VERBOSE) {
+ System.out.println("nrtdir.deleteFile name=" + name);
+ }
+ cache.deleteFile(name);
+ delegate.deleteFile(name);
+ }
+
+ @Override
+ public synchronized long fileLength(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.fileLength(name);
+ } else {
+ return delegate.fileLength(name);
+ }
+ }
+
+ public String[] listCachedFiles() {
+ return cache.listAll();
+ }
+
+ @Override
+ public IndexOutput createOutput(String name) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.createOutput name=" + name);
+ }
+ if (doCacheWrite(name)) {
+ if (VERBOSE) {
+ System.out.println(" to cache");
+ }
+ return cache.createOutput(name);
+ } else {
+ return delegate.createOutput(name);
+ }
+ }
+
+ @Override
+ public void sync(Collection fileNames) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.sync files=" + fileNames);
+ }
+ for(String fileName : fileNames) {
+ unCache(fileName);
+ }
+ delegate.sync(fileNames);
+ }
+
+ @Override
+ public synchronized IndexInput openInput(String name) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.openInput name=" + name);
+ }
+ if (cache.fileExists(name)) {
+ if (VERBOSE) {
+ System.out.println(" from cache");
+ }
+ return cache.openInput(name);
+ } else {
+ return delegate.openInput(name);
+ }
+ }
+
+ @Override
+ public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.openInput(name, bufferSize);
+ } else {
+ return delegate.openInput(name, bufferSize);
+ }
+ }
+
+ @Override
+ public Lock makeLock(String name) {
+ return delegate.makeLock(name);
+ }
+
+ @Override
+ public void clearLock(String name) throws IOException {
+ delegate.clearLock(name);
+ }
+
+ /** Close this directory, which flushes any cached files
+ * to the delegate and then closes the delegate. */
+ @Override
+ public void close() throws IOException {
+ for(String fileName : cache.listAll()) {
+ unCache(fileName);
+ }
+ cache.close();
+ delegate.close();
+ }
+
+ private final ConcurrentHashMap merges = new ConcurrentHashMap();
+
+ public MergeScheduler getMergeScheduler() {
+ return new ConcurrentMergeScheduler() {
+ @Override
+ protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+ try {
+ merges.put(Thread.currentThread(), merge);
+ super.doMerge(merge);
+ } finally {
+ merges.remove(Thread.currentThread());
+ }
+ }
+ };
+ }
+
+ /** Subclass can override this to customize logic; return
+ * true if this file should be written to the RAMDirectory. */
+ protected boolean doCacheWrite(String name) {
+ final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
+ //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
+ return (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
+ }
+
+ private void unCache(String fileName) throws IOException {
+ final IndexOutput out;
+ synchronized(this) {
+ if (!delegate.fileExists(fileName)) {
+ assert cache.fileExists(fileName);
+ out = delegate.createOutput(fileName);
+ } else {
+ out = null;
+ }
+ }
+
+ if (out != null) {
+ IndexInput in = null;
+ try {
+ in = cache.openInput(fileName);
+ in.copyBytes(out, in.length());
+ } finally {
+ IOUtils.closeSafely(in, out);
+ }
+ synchronized(this) {
+ cache.deleteFile(fileName);
+ }
+ }
+ }
+}
+
Property changes on: contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java
___________________________________________________________________
Added: svn:eol-style
+ native