Index: src/test/org/apache/lucene/realtime/TestRealtime.java
===================================================================
--- src/test/org/apache/lucene/realtime/TestRealtime.java	(revision 0)
+++ src/test/org/apache/lucene/realtime/TestRealtime.java	(revision 0)
@@ -0,0 +1,59 @@
+package org.apache.lucene.realtime;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
+import org.apache.lucene.realtime.RealtimeIndex.Transaction;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.LuceneTestCase;
+
+/**
+ * Tests the realtime indexing system
+ *
+ */
+public class TestRealtime extends LuceneTestCase {
+  public void test() throws Exception {
+    Analyzer analyzer = new StandardAnalyzer();
+    RAMDirectory directory = new MockRAMDirectory();
+    IndexWriter writer = new IndexWriter(directory, analyzer, MaxFieldLength.UNLIMITED);
+    RealtimeIndex realtimeIndex = new RealtimeIndex(writer, 1024); // 1k is low which causes flushToDisk more often
+    int num = 50;
+    for (int x=0; x < num; x++) {
+      Transaction transaction = realtimeIndex.createTransaction(true);
+      Document d = createDocument(x, "rt1", 5);
+      transaction.addDocument(d, analyzer);
+      transaction.commit();
+    }
+    
+    IndexReader reader = realtimeIndex.getReader();
+    assertEquals(reader.maxDoc(), num);
+    reader.close();
+    realtimeIndex.close();
+    IndexReader ramReader = realtimeIndex.getRamReader();
+    assertNull(ramReader);
+  }
+  
+  public static Document createDocument(int n, String indexName, int numFields) {
+    StringBuffer sb = new StringBuffer();
+    Document doc = new Document();
+    doc.add(new Field("id", Integer.toString(n), Store.YES, Index.NOT_ANALYZED));
+    doc.add(new Field("indexname", indexName, Store.YES, Index.NOT_ANALYZED));
+    sb.append("a");
+    sb.append(n);
+    doc.add(new Field("field1", sb.toString(), Store.YES, Index.ANALYZED));
+    sb.append(" b");
+    sb.append(n);
+    for (int i = 1; i < numFields; i++) {
+      doc.add(new Field("field" + (i + 1), sb.toString(), Store.YES,
+          Index.ANALYZED));
+    }
+    return doc;
+  }
+}

Property changes on: src/test/org/apache/lucene/realtime/TestRealtime.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:keywords
   + "Date Rev Author URL Id"
Name: svn:eol-style
   + native

Index: src/java/org/apache/lucene/realtime/RealtimeIndex.java
===================================================================
--- src/java/org/apache/lucene/realtime/RealtimeIndex.java	(revision 0)
+++ src/java/org/apache/lucene/realtime/RealtimeIndex.java	(revision 0)
@@ -0,0 +1,288 @@
+package org.apache.lucene.realtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * Implements a realtime index.  One background thread periodically flushes
+ * the in ram index to disk.
+ * 
+ * For normal batch indexing, you can use the IndexWriter that you
+ * passed into the constructor.
+ */
+public class RealtimeIndex {
+  public long maxRamSize = 1024*1024; // 1 megabyte
+  private IndexWriter diskWriter;
+  private RAMDirectory ramDirectory;
+  private IndexWriter ramWriter;
+  FlushToDiskThread flushToDiskThread = new FlushToDiskThread();
+  private boolean isClosed = false;
+  Analyzer defaultAnalyzer;
+  Directory diskDirectory;
+  private List exceptions = new ArrayList();
+  
+  /**
+   * @param diskWriter The writer used to maintain the disk index
+   * @param maxRamSize The maximum amount of ram the ram index should use
+   */
+  public RealtimeIndex(IndexWriter diskWriter, long maxRamSize) {
+    this.diskWriter = diskWriter;
+    this.maxRamSize = maxRamSize;
+    this.defaultAnalyzer = diskWriter.getAnalyzer();
+  }
+  
+  IndexReader getRamReader() throws IOException {
+    if (ramWriter == null) return null;
+    return ramWriter.getReader();
+  }
+  
+  /**
+   * Flushes the ramdirectory to disk and closes the underlying diskWriter.
+   * Closes the disk indexwriter
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    flushToDisk();
+    isClosed = true;
+    synchronized (this) {
+      diskWriter.close();
+    }
+  }
+  
+  /**
+   * Creates a transaction to which documents and deletes may be added to.
+   * @param useRamDir If true, a ram directory is used to add the documents to, if false the documents are cached and committed
+   * @return The new Transaction
+   * @throws Exception
+   */
+  public Transaction createTransaction(boolean useRamDir) throws Exception {
+    return new Transaction(this, useRamDir);
+  }
+  
+  /**
+   * Accumulated exceptions
+   * @return
+   */
+  public List getExceptions() {
+    return exceptions;
+  }
+  
+  /**
+   * Single thread used to flush the RAM index to disk.  
+   */
+  private class FlushToDiskThread extends Thread {
+    RAMDirectory toDiskDir;
+
+    public FlushToDiskThread() {
+      start();
+    }
+    
+    public void run() {
+      while (!isClosed) {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException ex) {
+          }
+          if (isClosed) return;
+          try {
+            if (toDiskDir != null) {
+              diskWriter.addIndexesNoOptimize(new Directory[] { toDiskDir });
+              toDiskDir = null;
+            }
+          } catch (IOException ioe) {
+            synchronized (exceptions) {
+              exceptions.add(ioe);
+            }
+          }
+        }
+      }
+    }
+    
+    /**
+     * Calls notifyAll to wake up the waiting run method
+     * @param toDiskDir
+     * @throws IOException
+     */
+    public synchronized void flush(RAMDirectory toDiskDir) throws IOException {
+      this.toDiskDir = toDiskDir;
+      notifyAll(); // tell the run method to flush the toDiskDir
+    }
+  }
+
+  private synchronized void maybeFlushToDisk() throws IOException {
+    if (ramDirectory != null && ramDirectory.sizeInBytes() > maxRamSize) {
+      flushToDisk();
+    }
+  }
+  
+  /**
+   * Flush the ram directory to disk
+   * @throws IOException
+   */
+  private synchronized void flushToDisk() throws IOException {
+    if (ramDirectory != null) {
+      RAMDirectory toDiskDir = ramDirectory;
+      ramDirectory = null;
+      ramWriter = null;
+      flushToDiskThread.flush(toDiskDir);
+    }
+  }
+
+  private synchronized IndexReader commit(Transaction transaction)
+      throws IOException {
+    maybeFlushToDisk();
+    if (ramDirectory == null) {
+      ramDirectory = new RAMDirectory();
+      ramWriter = new IndexWriter(ramDirectory, defaultAnalyzer, true,
+          MaxFieldLength.UNLIMITED);
+    }
+    if (diskWriter == null) {
+      diskWriter = new IndexWriter(diskDirectory, defaultAnalyzer, true,
+          MaxFieldLength.UNLIMITED);
+    }
+    // if the transaction is a ramdir, then add it to the ramWriter
+    if (transaction.useRamDir) {
+      ramWriter
+        .addIndexesNoOptimize(new Directory[] { transaction.ramDirectory });
+      ramWriter.commit(); // need to commit otherwise the call to getreader does not show the documents
+    } else {
+      // otherwise we add the docs directly to the ram writer
+      if (transaction.documents != null) {
+        for (int x=0; x < transaction.documents.size(); x++) {
+          DocumentAnalyzer da = (DocumentAnalyzer)transaction.documents.get(x);
+          ramWriter.addDocument(da.document, da.analyzer);
+        }
+      }
+    }
+    ramWriter.deleteDocuments(transaction.getDeleteTerms());
+    ramWriter.deleteDocuments(transaction.getDeleteQueries());
+    diskWriter.deleteDocuments(transaction.getDeleteTerms());
+    diskWriter.deleteDocuments(transaction.getDeleteQueries());
+    IndexReader ramReader = ramWriter.getReader();
+    IndexReader diskReader = diskWriter.getReader();
+    return new MultiReader(new IndexReader[] { ramReader, diskReader });
+  }
+  
+  /**
+   * Holder class for a document and analyzer
+   */
+  private static class DocumentAnalyzer {
+    public final Document document;
+    public final Analyzer analyzer;
+
+    public DocumentAnalyzer(Document document, Analyzer analyzer) {
+      this.document = document;
+      this.analyzer = analyzer;
+    }
+  }
+  
+  /**
+   * Transaction applied to the underlying indexes atomically.
+   */
+  public static class Transaction {
+    private List deleteTerms;
+    private List deleteQueries;
+    List documents;
+    RAMDirectory ramDirectory;
+    private IndexWriter writer;
+    RealtimeIndex ri;
+    boolean useRamDir;
+
+    private Transaction(RealtimeIndex ri, boolean useRamDir) {
+      this.ri = ri;
+      this.useRamDir = useRamDir;
+    }
+    
+    /**
+     * Adds 
+     * @param document
+     * @param analyzer
+     * @throws IOException
+     */
+    public void addDocument(Document document, Analyzer analyzer)
+        throws IOException {
+      if (useRamDir) {
+        if (ramDirectory == null) {
+          ramDirectory = new RAMDirectory();
+          writer = new IndexWriter(ramDirectory, ri.defaultAnalyzer, true,
+              MaxFieldLength.UNLIMITED);
+        }
+        writer.addDocument(document, analyzer);
+      } else {
+        if (documents == null) {
+          documents = new ArrayList();
+        }
+        documents.add(new DocumentAnalyzer(document, analyzer));
+      }
+    }
+    
+    /**
+     * Commit the transaction to the ram index
+     * @return
+     * @throws IOException
+     */
+    public synchronized IndexReader commit() throws IOException {
+      if (useRamDir) {
+        writer.commit();
+        int writerNumDocs = writer.numDocs();
+        writer.close();
+      }
+      return ri.commit(this);
+    }
+    
+    /**
+     * Gets the delete terms
+     * @return
+     */
+    public Term[] getDeleteTerms() {
+      if (deleteTerms == null)
+        return new Term[0];
+      return (Term[]) deleteTerms.toArray(new Term[0]);
+    }
+    
+    /**
+     * Gets the delete by queries
+     * @return
+     */
+    public Query[] getDeleteQueries() {
+      if (deleteQueries == null)
+        return new Query[0];
+      return (Query[]) deleteQueries.toArray(new Query[0]);
+    }
+
+    public void addDeleteTerm(Term term) {
+      if (deleteTerms == null)
+        deleteTerms = new ArrayList();
+      deleteTerms.add(term);
+    }
+
+    public void addDeleteQuery(Query query) {
+      if (deleteQueries == null)
+        deleteQueries = new ArrayList();
+      deleteQueries.add(query);
+    }
+  }
+  
+  /**
+   * The reader needs to be closed otherwise references will be left open.
+   */
+  public synchronized IndexReader getReader() throws Exception {
+    IndexReader diskReader = diskWriter.getReader();
+    IndexReader ramReader = ramWriter.getReader();
+    return new MultiReader(new IndexReader[] { diskReader, ramReader });
+  }
+}

Property changes on: src/java/org/apache/lucene/realtime/RealtimeIndex.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:keywords
   + "Date Rev Author URL Id"
Name: svn:eol-style
   + native

