Index: src/java/org/apache/lucene/index/TermsHashConsumerPerField.java
===================================================================
--- src/java/org/apache/lucene/index/TermsHashConsumerPerField.java	(revision 930107)
+++ src/java/org/apache/lucene/index/TermsHashConsumerPerField.java	(working copy)
@@ -34,8 +34,6 @@
 
   abstract void newTerm(int termID) throws IOException;
   abstract void addTerm(int termID) throws IOException;
   abstract int getStreamCount();
-
-  abstract ParallelPostingsArray createPostingsArray(int size);
-  abstract int bytesPerPosting();
+  abstract ParallelPostingsArray createPostingsArray(int size);
 }
Index: src/java/org/apache/lucene/index/TermsHashPerField.java
===================================================================
--- src/java/org/apache/lucene/index/TermsHashPerField.java	(revision 930107)
+++ src/java/org/apache/lucene/index/TermsHashPerField.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
 import org.apache.lucene.document.Fieldable;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
 
 final class TermsHashPerField extends InvertedDocConsumerPerField {
 
@@ -57,18 +58,21 @@
   private final BytesRef utf8;
   private Comparator<BytesRef> termComp;
 
-  private final int bytesPerPosting;
-
   public TermsHashPerField(DocInverterPerField docInverterPerField, final TermsHashPerThread perThread, final TermsHashPerThread nextPerThread, final FieldInfo fieldInfo) {
     this.perThread = perThread;
     intPool = perThread.intPool;
     bytePool = perThread.bytePool;
     termBytePool = perThread.termBytePool;
     docState = perThread.docState;
+    postingsHash = new int[postingsHashSize]; Arrays.fill(postingsHash, -1);
+    bytesUsed(postingsHashSize * RamUsageEstimator.NUM_BYTES_INT);
+
     fieldState = docInverterPerField.fieldState;
     this.consumer = perThread.consumer.addField(this, fieldInfo);
+    postingsArray = consumer.createPostingsArray(postingsHashSize/2);
+    bytesUsed(postingsArray.size * postingsArray.bytesPerPosting());
     streamCount = consumer.getStreamCount();
     numPostingInt = 2*streamCount;
@@ -78,23 +82,15 @@
       nextPerField = (TermsHashPerField) nextPerThread.addField(docInverterPerField, fieldInfo);
     else
       nextPerField = null;
-
-    // +3: Posting is referenced by hash, which
-    // targets 25-50% fill factor; approximate this
-    // as 3X # pointers
-    bytesPerPosting = consumer.bytesPerPosting() + 3*DocumentsWriter.INT_NUM_BYTE;
   }
-
-  void initPostingsArray() {
-    assert postingsArray == null;
-    postingsArray = consumer.createPostingsArray(postingsHashSize);
-
+
+  // sugar: just forwards to DW
+  private void bytesUsed(long size) {
     if (perThread.termsHash.trackAllocations) {
-      perThread.termsHash.docWriter.bytesAllocated(bytesPerPosting * postingsHashSize);
+      perThread.termsHash.docWriter.bytesUsed(size);
     }
   }
-
+
   void shrinkHash(int targetSize) {
     assert postingsCompacted || numPostings == 0;
@@ -106,13 +102,20 @@
     }
 
     if (newSize != postingsHash.length) {
+      final long previousSize = postingsHash.length;
       postingsHash = new int[newSize];
+      bytesUsed((newSize-previousSize)*RamUsageEstimator.NUM_BYTES_INT);
       Arrays.fill(postingsHash, -1);
-      postingsArray = null;
       postingsHashSize = newSize;
       postingsHashHalfSize = newSize/2;
       postingsHashMask = newSize-1;
     }
+
+    if (postingsArray != null) {
+      final int startSize = postingsArray.size;
+      postingsArray = postingsArray.shrink(targetSize, false);
+      bytesUsed(postingsArray.bytesPerPosting() * (postingsArray.size - startSize));
+    }
   }
 
   public void reset() {
@@ -135,14 +138,10 @@
       nextPerField.abort();
   }
 
-  private void growParallelPostingsArray() {
-    int oldSize = postingsArray.byteStarts.length;
-    int newSize = (int) (oldSize * 1.5);
-    this.postingsArray = this.postingsArray.resize(newSize);
-
-    if (perThread.termsHash.trackAllocations) {
-      perThread.termsHash.docWriter.bytesAllocated(bytesPerPosting * (newSize - oldSize));
-    }
+  private final void growParallelPostingsArray() {
+    int oldSize = postingsArray.size;
+    this.postingsArray = this.postingsArray.grow();
+    bytesUsed(postingsArray.bytesPerPosting() * (postingsArray.size - oldSize));
   }
 
   public void initReader(ByteSliceReader reader, int termID, int stream) {
@@ -301,9 +300,6 @@
     } else {
       throw new IllegalArgumentException("Could not find a term attribute (that implements TermToBytesRefAttribute) in the TokenStream");
     }
-    if (postingsArray == null) {
-      initPostingsArray();
-    }
     consumer.start(f);
     if (nextPerField != null) {
       nextPerField.start(f);
@@ -349,12 +345,9 @@
 
       // New posting
       termID = numPostings++;
-      if (termID >= postingsArray.textStarts.length) {
+      if (termID >= postingsArray.size) {
        growParallelPostingsArray();
      }
-      if (perThread.termsHash.trackAllocations) {
-        perThread.termsHash.docWriter.bytesUsed(bytesPerPosting);
-      }
 
       assert termID >= 0;
@@ -455,12 +448,9 @@
 
       // New posting
       termID = numPostings++;
-      if (termID >= postingsArray.textStarts.length) {
+      if (termID >= postingsArray.size) {
        growParallelPostingsArray();
      }
-      if (perThread.termsHash.trackAllocations) {
-        perThread.termsHash.docWriter.bytesUsed(bytesPerPosting);
-      }
 
       assert termID != -1;
       assert postingsHash[hashPos] == -1;
@@ -492,6 +482,7 @@
     if (numPostings == postingsHashHalfSize) {
       rehashPostings(2*postingsHashSize);
+      bytesUsed(2*numPostings * RamUsageEstimator.NUM_BYTES_INT);
     }
 
     // Init stream slices
@@ -621,6 +612,7 @@
     postingsHashMask = newMask;
     postingsHash = newHash;
+    postingsHashSize = newSize;
     postingsHashHalfSize = newSize >> 1;
   }
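Note (not part of the patch): the hunks above replace the old bytesAllocated()/bytesUsed() split with a single counter that each per-field structure feeds with size deltas. A minimal sketch of that contract, where Tracker and PostingsSection are hypothetical names rather than classes in this patch:

  // One shared counter; callers report deltas (negative when shrinking),
  // so there is no separate "allocated" vs "used" bookkeeping to reconcile.
  final class Tracker {
    private long bytesUsed;

    synchronized void bytesUsed(long delta) {
      bytesUsed += delta;
    }

    synchronized long total() { return bytesUsed; }
  }

  final class PostingsSection {
    private final Tracker tracker;
    private int[] data = new int[16];

    PostingsSection(Tracker tracker) {
      this.tracker = tracker;
      tracker.bytesUsed(data.length * 4L);               // report the initial allocation
    }

    void grow() {
      int oldLength = data.length;
      data = java.util.Arrays.copyOf(data, oldLength * 2);
      tracker.bytesUsed((data.length - oldLength) * 4L); // report only the delta
    }
  }

Because only deltas flow through one call, grow and shrink paths share the same accounting, which is why the old assert numBytesUsed <= numBytesAlloc can disappear.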
Index: src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
===================================================================
--- src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java	(revision 930107)
+++ src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java	(working copy)
@@ -185,29 +185,30 @@
     int lastDocIDs[];                                   // Last docID where this term occurred
     int lastDocCodes[];                                 // Code for prior doc
     int lastPositions[];                                // Last position where this term occurred
-
+
     @Override
-    ParallelPostingsArray resize(int newSize) {
-      FreqProxPostingsArray newArray = new FreqProxPostingsArray(newSize);
-      copy(this, newArray);
-      return newArray;
+    ParallelPostingsArray newInstance(int size) {
+      return new FreqProxPostingsArray(size);
     }
-
-    void copy(FreqProxPostingsArray fromArray, FreqProxPostingsArray toArray) {
-      super.copy(fromArray, toArray);
-      System.arraycopy(fromArray.docFreqs, 0, toArray.docFreqs, 0, fromArray.docFreqs.length);
-      System.arraycopy(fromArray.lastDocIDs, 0, toArray.lastDocIDs, 0, fromArray.lastDocIDs.length);
-      System.arraycopy(fromArray.lastDocCodes, 0, toArray.lastDocCodes, 0, fromArray.lastDocCodes.length);
-      System.arraycopy(fromArray.lastPositions, 0, toArray.lastPositions, 0, fromArray.lastPositions.length);
+
+    void copyTo(ParallelPostingsArray toArray, int numToCopy) {
+      assert toArray instanceof FreqProxPostingsArray;
+      FreqProxPostingsArray to = (FreqProxPostingsArray) toArray;
+
+      super.copyTo(toArray, numToCopy);
+
+      System.arraycopy(docFreqs, 0, to.docFreqs, 0, numToCopy);
+      System.arraycopy(lastDocIDs, 0, to.lastDocIDs, 0, numToCopy);
+      System.arraycopy(lastDocCodes, 0, to.lastDocCodes, 0, numToCopy);
+      System.arraycopy(lastPositions, 0, to.lastPositions, 0, numToCopy);
     }
-
+
+    @Override
+    int bytesPerPosting() {
+      return ParallelPostingsArray.BYTES_PER_POSTING + 4 * DocumentsWriter.INT_NUM_BYTE;
+    }
   }
-
-  @Override
-  int bytesPerPosting() {
-    return ParallelPostingsArray.BYTES_PER_POSTING + 4 * DocumentsWriter.INT_NUM_BYTE;
-  }
 
   public void abort() {}
 }
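Note (not part of the patch): the resize()/copy() pair becomes newInstance()/copyTo(numToCopy) so the base class can own the growth policy while each subclass only says how to allocate and copy its extra parallel columns. A condensed illustration of that template pattern, using hypothetical Base/Freq classes rather than the patch's own:

  class Base {
    final int size;
    final int[] starts;

    Base(int size) { this.size = size; this.starts = new int[size]; }

    Base newInstance(int size) { return new Base(size); }   // factory hook

    void copyTo(Base to, int numToCopy) {                   // copy hook
      System.arraycopy(starts, 0, to.starts, 0, numToCopy);
    }

    final Base grow(int newSize) {
      Base bigger = newInstance(newSize);  // dispatches to the subclass factory
      copyTo(bigger, size);                // copies base and subclass columns
      return bigger;
    }
  }

  class Freq extends Base {
    final int[] freqs;

    Freq(int size) { super(size); this.freqs = new int[size]; }

    @Override Base newInstance(int size) { return new Freq(size); }

    @Override void copyTo(Base to, int numToCopy) {
      super.copyTo(to, numToCopy);
      System.arraycopy(freqs, 0, ((Freq) to).freqs, 0, numToCopy);
    }
  }

Passing numToCopy instead of copying whole arrays means a grow only touches the slots actually in use, and a shrink-with-copy never reads past the smaller target.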
Index: src/java/org/apache/lucene/index/ParallelPostingsArray.java
===================================================================
--- src/java/org/apache/lucene/index/ParallelPostingsArray.java	(revision 930107)
+++ src/java/org/apache/lucene/index/ParallelPostingsArray.java	(working copy)
@@ -1,5 +1,7 @@
 package org.apache.lucene.index;
 
+import org.apache.lucene.util.ArrayUtil;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -21,25 +23,49 @@
 class ParallelPostingsArray {
   final static int BYTES_PER_POSTING = 3 * DocumentsWriter.INT_NUM_BYTE;
 
+  final int size;
   final int[] textStarts;
   final int[] intStarts;
   final int[] byteStarts;
-
-  public ParallelPostingsArray(final int size) {
+
+  ParallelPostingsArray(final int size) {
+    this.size = size;
     textStarts = new int[size];
     intStarts = new int[size];
     byteStarts = new int[size];
   }
-
-  ParallelPostingsArray resize(int newSize) {
-    ParallelPostingsArray newArray = new ParallelPostingsArray(newSize);
-    copy(this, newArray);
+
+  int bytesPerPosting() {
+    return BYTES_PER_POSTING;
+  }
+
+  ParallelPostingsArray newInstance(int size) {
+    return new ParallelPostingsArray(size);
+  }
+
+  final ParallelPostingsArray grow() {
+    int newSize = ArrayUtil.oversize(size + 1, bytesPerPosting());
+    ParallelPostingsArray newArray = newInstance(newSize);
+    copyTo(newArray, size);
     return newArray;
   }
-
-  void copy(ParallelPostingsArray fromArray, ParallelPostingsArray toArray) {
-    System.arraycopy(fromArray.textStarts, 0, toArray.textStarts, 0, fromArray.textStarts.length);
-    System.arraycopy(fromArray.intStarts, 0, toArray.intStarts, 0, fromArray.intStarts.length);
-    System.arraycopy(fromArray.byteStarts, 0, toArray.byteStarts, 0, fromArray.byteStarts.length);
+
+  final ParallelPostingsArray shrink(int targetSize, boolean doCopy) {
+    int shrinkSize = ArrayUtil.getShrinkSize(size, targetSize, bytesPerPosting());
+    if (shrinkSize != size) {
+      ParallelPostingsArray newArray = newInstance(targetSize);
+      if (doCopy) {
+        copyTo(newArray, targetSize);
+      }
+      return newArray;
+    } else {
+      return this;
+    }
  }
+
+  void copyTo(ParallelPostingsArray toArray, int numToCopy) {
+    System.arraycopy(textStarts, 0, toArray.textStarts, 0, numToCopy);
+    System.arraycopy(intStarts, 0, toArray.intStarts, 0, numToCopy);
+    System.arraycopy(byteStarts, 0, toArray.byteStarts, 0, numToCopy);
+  }
 }
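Note (not part of the patch): grow() defers the sizing policy to ArrayUtil.oversize(minSize, bytesPerElement), which pads past the requested minimum so that growing one posting at a time costs amortized O(1) per slot. A toy stand-in for the idea; the padding rule below is illustrative only, ArrayUtil's real policy is more involved (it also rounds for pointer alignment):

  // Illustrative growth policy: ~12.5% headroom past the minimum, so a
  // sequence of single-element grows does O(n) total copying, not O(n^2).
  static int oversize(int minSize) {
    return minSize + (minSize >>> 3) + 3;
  }

  // Usage: appending 1000 values one at a time reallocates only O(log n) times.
  static int[] appendAll() {
    int[] arr = new int[oversize(1)];
    int used = 0;
    for (int value = 0; value < 1000; value++) {
      if (used == arr.length) {
        arr = java.util.Arrays.copyOf(arr, oversize(used + 1));
      }
      arr[used++] = value;
    }
    return java.util.Arrays.copyOf(arr, used);
  }

shrink(targetSize, doCopy) is the mirror image: ArrayUtil.getShrinkSize decides whether the saved RAM justifies a reallocation, and doCopy=false lets callers that are about to discard the contents (like shrinkHash) skip the copy entirely.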
Index: src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 930107)
+++ src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -285,7 +285,6 @@
 
   // If we've allocated 5% over our RAM budget, we then
   // free down to 95%
-  private long freeTrigger = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05);
   private long freeLevel = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95);
 
   // Flush @ this number of docs.  If ramBufferSize is
@@ -356,7 +355,6 @@
       ramBufferSize = (long) (mb*1024*1024);
       waitQueuePauseBytes = (long) (ramBufferSize*0.1);
       waitQueueResumeBytes = (long) (ramBufferSize*0.05);
-      freeTrigger = (long) (1.05 * ramBufferSize);
       freeLevel = (long) (0.95 * ramBufferSize);
     }
   }
@@ -553,7 +551,6 @@
     flushPending = false;
     for(int i=0;i<threadStates.length;i++)
       threadStates[i].doAfterFlush();
-    numBytesUsed = 0;
   }
 
@@ -1208,7 +1205,7 @@
   }
 
   synchronized boolean doBalanceRAM() {
-    return ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed+deletesInRAM.bytesUsed+deletesFlushed.bytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger);
+    return ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed+deletesInRAM.bytesUsed+deletesFlushed.bytesUsed >= ramBufferSize);
   }
 
   /** Does the synchronized work to finish/flush the
@@ -1243,7 +1240,6 @@
     return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
   }
 
-  long numBytesAlloc;
   long numBytesUsed;
 
   NumberFormat nf = NumberFormat.getInstance();
@@ -1303,19 +1299,16 @@
       final int size = freeByteBlocks.size();
       final byte[] b;
       if (0 == size) {
+        b = new byte[blockSize];
         // Always record a block allocated, even if
        // trackAllocations is false.  This is necessary
        // because this block will be shared between
        // things that don't track allocations (term
        // vectors) and things that do (freq/prox
        // postings).
-        numBytesAlloc += blockSize;
-        b = new byte[blockSize];
+        numBytesUsed += blockSize;
       } else
         b = freeByteBlocks.remove(size-1);
-      if (trackAllocations)
-        numBytesUsed += blockSize;
-      assert numBytesUsed <= numBytesAlloc;
       return b;
     }
   }
@@ -1343,30 +1336,21 @@
     final int size = freeIntBlocks.size();
     final int[] b;
     if (0 == size) {
+      b = new int[INT_BLOCK_SIZE];
       // Always record a block allocated, even if
      // trackAllocations is false.  This is necessary
      // because this block will be shared between
      // things that don't track allocations (term
      // vectors) and things that do (freq/prox
      // postings).
-      numBytesAlloc += INT_BLOCK_SIZE*INT_NUM_BYTE;
-      b = new int[INT_BLOCK_SIZE];
+      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
    } else
      b = freeIntBlocks.remove(size-1);
-    if (trackAllocations)
-      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
-    assert numBytesUsed <= numBytesAlloc;
    return b;
  }
 
-  synchronized void bytesAllocated(long numBytes) {
-    numBytesAlloc += numBytes;
-    assert numBytesUsed <= numBytesAlloc;
-  }
-
   synchronized void bytesUsed(long numBytes) {
     numBytesUsed += numBytes;
-    assert numBytesUsed <= numBytesAlloc;
   }
 
   /* Return int[]s to the pool */
@@ -1397,23 +1381,18 @@
    * which balances the pools to match the current docs.
    */
   void balanceRAM() {
-    // We flush when we've used our target usage
-    final long flushTrigger = ramBufferSize;
-
     final long deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
 
-    if (numBytesAlloc+deletesRAMUsed > freeTrigger) {
+    if (numBytesUsed+deletesRAMUsed > ramBufferSize) {
 
       if (infoStream != null)
         message("  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
-                " vs trigger=" + toMB(flushTrigger) +
-                " allocMB=" + toMB(numBytesAlloc) +
+                " vs trigger=" + toMB(ramBufferSize) +
                 " deletesMB=" + toMB(deletesRAMUsed) +
-                " vs trigger=" + toMB(freeTrigger) +
                 " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
                 " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE));
 
-      final long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
+      final long startBytesUsed = numBytesUsed + deletesRAMUsed;
 
       int iter = 0;
@@ -1423,39 +1402,38 @@
 
       boolean any = true;
 
-      while(numBytesAlloc+deletesRAMUsed > freeLevel) {
+      while(numBytesUsed+deletesRAMUsed > freeLevel) {
 
         synchronized(this) {
          if (0 == perDocAllocator.freeByteBlocks.size()
              && 0 == byteBlockAllocator.freeByteBlocks.size()
              && 0 == freeIntBlocks.size() && !any) {
            // Nothing else to free -- must flush now.
-            bufferIsFull = numBytesUsed+deletesRAMUsed > flushTrigger;
+            bufferIsFull = numBytesUsed+deletesRAMUsed > ramBufferSize;
            if (infoStream != null) {
-              if (numBytesUsed > flushTrigger)
+              if (numBytesUsed+deletesRAMUsed > ramBufferSize)
                message("    nothing to free; now set bufferIsFull");
              else
                message("    nothing to free");
            }
-            assert numBytesUsed <= numBytesAlloc;
            break;
          }
 
          if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
-            numBytesAlloc -= BYTE_BLOCK_SIZE;
+            numBytesUsed -= BYTE_BLOCK_SIZE;
          }
 
          if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
            freeIntBlocks.remove(freeIntBlocks.size()-1);
-            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
+            numBytesUsed -= INT_BLOCK_SIZE * INT_NUM_BYTE;
          }
 
          if ((2 == iter % 4) && perDocAllocator.freeByteBlocks.size() > 0) {
            // Remove upwards of 32 blocks (each block is 1K)
            for (int i = 0; i < 32; ++i) {
              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
-              numBytesAlloc -= PER_DOC_BLOCK_SIZE;
+              numBytesUsed -= PER_DOC_BLOCK_SIZE;
              if (perDocAllocator.freeByteBlocks.size() == 0) {
                break;
              }
@@ -1471,26 +1449,7 @@
       }
 
       if (infoStream != null)
-        message("  after free: freedMB=" + nf.format((startBytesAlloc-numBytesAlloc-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((numBytesUsed+deletesRAMUsed)/1024./1024.) + " allocMB=" + nf.format(numBytesAlloc/1024./1024.));
-
-    } else {
-      // If we have not crossed the 100% mark, but have
-      // crossed the 95% mark of RAM we are actually
-      // using, go ahead and flush.  This prevents
-      // over-allocating and then freeing, with every
-      // flush.
-      synchronized(this) {
-
-        if (numBytesUsed+deletesRAMUsed > flushTrigger) {
-          if (infoStream != null)
-            message("  RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
-                    " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
-                    " deletesMB=" + nf.format(deletesRAMUsed/1024./1024.) +
-                    " triggerMB=" + nf.format(flushTrigger/1024./1024.));
-
-          bufferIsFull = true;
-        }
-      }
+        message("  after free: freedMB=" + nf.format((startBytesUsed-numBytesUsed-deletesRAMUsed)/1024./1024.) +
+                " usedMB=" + nf.format((numBytesUsed+deletesRAMUsed)/1024./1024.));
     }
   }
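Note (not part of the patch): with numBytesAlloc, freeTrigger, and the separate 95%-of-used flush branch gone, everything keys off one counter: balance when used bytes cross ramBufferSize, discard pooled blocks (decrementing that same counter) down to freeLevel, and flag a flush only if nothing is left to recycle. A condensed, hypothetical sketch of the resulting control flow; the names are simplified stand-ins, not DocumentsWriter's actual fields:

  final class RamGovernor {
    long bytesUsed;                      // rises on alloc, falls when pooled blocks are dropped
    final long ramBufferSize;            // the single trigger
    final long freeLevel;                // ~95% of ramBufferSize
    final java.util.ArrayDeque<byte[]> freeBlocks = new java.util.ArrayDeque<byte[]>();
    boolean bufferIsFull;

    RamGovernor(long ramBufferSize) {
      this.ramBufferSize = ramBufferSize;
      this.freeLevel = (long) (0.95 * ramBufferSize);
    }

    void maybeBalance() {
      if (bytesUsed >= ramBufferSize) {
        while (bytesUsed > freeLevel && !freeBlocks.isEmpty()) {
          bytesUsed -= freeBlocks.pop().length;   // freeing pooled RAM lowers the one counter
        }
        if (bytesUsed > ramBufferSize) {
          bufferIsFull = true;                    // nothing left to recycle: must flush
        }
      }
    }
  }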
+ " usedMB=" + nf.format((numBytesUsed+deletesRAMUsed)/1024./1024.)); } } Index: src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java =================================================================== --- src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java (revision 930107) +++ src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java (working copy) @@ -271,23 +271,25 @@ int[] lastOffsets; // Last offset we saw int[] lastPositions; // Last position where this term occurred + ParallelPostingsArray newInstance(int size) { + return new TermVectorsPostingsArray(size); + } + @Override - ParallelPostingsArray resize(int newSize) { - TermVectorsPostingsArray newArray = new TermVectorsPostingsArray(newSize); - copy(this, newArray); - return newArray; + void copyTo(ParallelPostingsArray toArray, int numToCopy) { + assert toArray instanceof TermVectorsPostingsArray; + TermVectorsPostingsArray to = (TermVectorsPostingsArray) toArray; + + super.copyTo(toArray, numToCopy); + + System.arraycopy(freqs, 0, to.freqs, 0, size); + System.arraycopy(lastOffsets, 0, to.lastOffsets, 0, size); + System.arraycopy(lastPositions, 0, to.lastPositions, 0, size); } - - void copy(TermVectorsPostingsArray fromArray, TermVectorsPostingsArray toArray) { - super.copy(fromArray, toArray); - System.arraycopy(fromArray.freqs, 0, toArray.freqs, 0, fromArray.freqs.length); - System.arraycopy(fromArray.lastOffsets, 0, toArray.lastOffsets, 0, fromArray.lastOffsets.length); - System.arraycopy(fromArray.lastPositions, 0, toArray.lastPositions, 0, fromArray.lastPositions.length); + + @Override + int bytesPerPosting() { + return super.bytesPerPosting() + 3 * DocumentsWriter.INT_NUM_BYTE; } } - - @Override - int bytesPerPosting() { - return ParallelPostingsArray.BYTES_PER_POSTING + 3 * DocumentsWriter.INT_NUM_BYTE; - } }