Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java	(revision 930900)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java	(working copy)
@@ -4909,4 +4909,47 @@
       dir.close();
     }
   }
+
+  private static class FlushCountingIndexWriter extends IndexWriter {
+    int flushCount;
+    public FlushCountingIndexWriter(Directory dir, IndexWriterConfig iwc) throws IOException {
+      super(dir, iwc);
+    }
+    public void doAfterFlush() {
+      flushCount++;
+    }
+  }
+
+  public void testIndexingThenDeleting() throws Exception {
+    final Random r = newRandom();
+
+    Directory dir = new MockRAMDirectory();
+    FlushCountingIndexWriter w = new FlushCountingIndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT)).setRAMBufferSizeMB(0.5));
+    //w.setInfoStream(System.out);
+    Document doc = new Document();
+    doc.add(new Field("field", "go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", Field.Store.NO, Field.Index.ANALYZED));
+    for(int iter=0;iter<6;iter++) {
+      int count = 0;
+
+      final boolean doIndexing = r.nextBoolean();
+      if (doIndexing) {
+        // Add docs until a flush is triggered
+        final int startFlushCount = w.flushCount;
+        while(w.flushCount == startFlushCount) {
+          w.addDocument(doc);
+          count++;
+        }
+      } else {
+        // Delete docs until a flush is triggered
+        final int startFlushCount = w.flushCount;
+        while(w.flushCount == startFlushCount) {
+          w.deleteDocuments(new Term("foo", ""+count));
+          count++;
+        }
+      }
+      assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 2500);
+    }
+    w.close();
+    dir.close();
+  }
 }
Index: src/java/org/apache/lucene/index/TermsHashPerField.java
===================================================================
--- src/java/org/apache/lucene/index/TermsHashPerField.java	(revision 930900)
+++ src/java/org/apache/lucene/index/TermsHashPerField.java	(working copy)
@@ -71,7 +71,7 @@
     fieldState = docInverterPerField.fieldState;
     this.consumer = perThread.consumer.addField(this, fieldInfo);
 
-    postingsArray = consumer.createPostingsArray(postingsHashSize/2);
+    initPostingsArray();
     bytesUsed(postingsArray.size * postingsArray.bytesPerPosting());
 
     streamCount = consumer.getStreamCount();
@@ -84,6 +84,10 @@
     nextPerField = null;
   }
 
+  private void initPostingsArray() {
+    postingsArray = consumer.createPostingsArray(2);
+  }
+
   // sugar: just forwards to DW
   private void bytesUsed(long size) {
     if (perThread.termsHash.trackAllocations) {
@@ -111,10 +115,10 @@
       postingsHashMask = newSize-1;
     }
 
+    // Fully free the postings array on each flush:
    if (postingsArray != null) {
-      final int startSize = postingsArray.size;
-      postingsArray = postingsArray.shrink(targetSize, false);
-      bytesUsed(postingsArray.bytesPerPosting() * (postingsArray.size - startSize));
+      bytesUsed(-postingsArray.bytesPerPosting() * postingsArray.size);
+      postingsArray = null;
    }
  }
 
@@ -309,6 +313,10 @@
   @Override
   boolean start(Fieldable[] fields, int count) throws IOException {
     doCall = consumer.start(fields, count);
+    if (postingsArray == null) {
+      initPostingsArray();
+    }
+
     if (nextPerField != null)
       doNextCall = nextPerField.start(fields, count);
     return doCall || doNextCall;
Index: src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 930900)
+++ src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -689,6 +689,7 @@
   }
 
   synchronized void clearFlushPending() {
+    bufferIsFull = false;
     flushPending = false;
   }
 
@@ -913,29 +914,37 @@
       throw new AlreadyClosedException("this IndexWriter is closed");
   }
 
-  synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException {
-    waitReady(null);
-    for (int i = 0; i < terms.length; i++)
-      addDeleteTerm(terms[i], numDocsInRAM);
+  boolean bufferDeleteTerms(Term[] terms) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      for (int i = 0; i < terms.length; i++)
+        addDeleteTerm(terms[i], numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteTerm(Term term) throws IOException {
-    waitReady(null);
-    addDeleteTerm(term, numDocsInRAM);
+  boolean bufferDeleteTerm(Term term) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      addDeleteTerm(term, numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException {
-    waitReady(null);
-    for (int i = 0; i < queries.length; i++)
-      addDeleteQuery(queries[i], numDocsInRAM);
+  boolean bufferDeleteQueries(Query[] queries) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      for (int i = 0; i < queries.length; i++)
+        addDeleteQuery(queries[i], numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteQuery(Query query) throws IOException {
-    waitReady(null);
-    addDeleteQuery(query, numDocsInRAM);
+  boolean bufferDeleteQuery(Query query) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      addDeleteQuery(query, numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
@@ -948,7 +957,7 @@
 
   synchronized boolean doApplyDeletes() {
     // Very similar to deletesFull(), except we don't count
-    // numBytesAlloc, because we are checking whether
+    // numBytesUsed, because we are checking whether
     // deletes (alone) are consuming too many resources now
     // and thus should be applied.  We apply deletes if RAM
     // usage is > 1/2 of our allowed RAM buffer, to prevent
@@ -961,8 +970,11 @@
             ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
   }
 
-  synchronized private boolean timeToFlushDeletes() {
-    return (bufferIsFull || deletesFull()) && setFlushPending();
+  private boolean timeToFlushDeletes() {
+    balanceRAM();
+    synchronized(this) {
+      return (bufferIsFull || deletesFull()) && setFlushPending();
+    }
   }
 
   void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
@@ -1156,18 +1168,13 @@
     deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY);
   }
 
-  synchronized boolean doBalanceRAM() {
-    return ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed+deletesInRAM.bytesUsed+deletesFlushed.bytesUsed >= ramBufferSize);
-  }
-
   /** Does the synchronized work to finish/flush the
    *  inverted document. */
   private void finishDocument(DocumentsWriterThreadState perThread,
                               DocWriter docWriter) throws IOException {
-    if (doBalanceRAM())
-      // Must call this w/o holding synchronized(this) else
-      // we'll hit deadlock:
-      balanceRAM();
+    // Must call this w/o holding synchronized(this) else
+    // we'll hit deadlock:
+    balanceRAM();
 
     synchronized(this) {
 
@@ -1381,10 +1388,20 @@
    * which balances the pools to match the current docs.
    */
   void balanceRAM() {
-    final long deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
+    final boolean doBalance;
+    final long deletesRAMUsed;
 
-    if (numBytesUsed+deletesRAMUsed > ramBufferSize) {
+    synchronized(this) {
+      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
+        return;
+      }
+
+      deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
+      doBalance = numBytesUsed+deletesRAMUsed >= ramBufferSize;
+    }
+
+    if (doBalance) {
 
       if (infoStream != null)
         message("  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
                 " vs trigger=" + toMB(ramBufferSize) +
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java	(revision 930900)
+++ src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -3796,6 +3796,7 @@
       // never hit
       return false;
     } finally {
+      docWriter.clearFlushPending();
       docWriter.resumeAllThreads();
     }
   }
@@ -4560,6 +4561,9 @@
   // Apply buffered deletes to all segments.
   private final synchronized boolean applyDeletes() throws CorruptIndexException, IOException {
     assert testPoint("startApplyDeletes");
+    if (infoStream != null) {
+      message("applyDeletes");
+    }
     flushDeletesCount++;
     SegmentInfos rollback = (SegmentInfos) segmentInfos.clone();
     boolean success = false;