diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 88983b9..5df22b0 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -61,6 +61,9 @@ New Features
   also finds completions allowing for fuzzy edits in the input string.
   (Robert Muir, Simon Willnauer, Mike McCandless)
 
+* LUCENE-4515: MemoryIndex now supports adding the same field multiple
+  times. (Simon Willnauer)
+
 API Changes
 
 * LUCENE-4399: Deprecated AppendingCodec. Lucene's term dictionaries
@@ -148,6 +151,10 @@ Optimizations
   Instead of writing a file pointer to a VIntBlock containing the doc id, just 
   write the doc id.  (Mike McCandless, Robert Muir)
 
+* LUCENE-4515: MemoryIndex now uses Byte/IntBlockPool internally to hold terms and 
+  posting lists. All index data is represented as consecutive byte/int arrays to 
+  reduce GC cost and memory overhead. (Simon Willnauer) 
+
 Build
 
 * Upgrade randomized testing to version 2.0.4: avoid hangs on shutdown
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/FixedStraightBytesImpl.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/FixedStraightBytesImpl.java
index 0ba456a..8b630e9 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/FixedStraightBytesImpl.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/FixedStraightBytesImpl.java
@@ -116,7 +116,7 @@ class FixedStraightBytesImpl {
     }
     
     protected void resetPool() {
-      pool.dropBuffersAndReset();
+      pool.reset(false, false);
     }
     
     protected void writeData(IndexOutput out) throws IOException {
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/VarStraightBytesImpl.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/VarStraightBytesImpl.java
index 7efe24c..9a23cbe 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/VarStraightBytesImpl.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene40/values/VarStraightBytesImpl.java
@@ -188,7 +188,7 @@ class VarStraightBytesImpl {
         } else {
           IOUtils.closeWhileHandlingException(datOut);
         }
-        pool.dropBuffersAndReset();
+        pool.reset(false, false);
       }
 
       success = false;
diff --git a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
index dd20ca5..d64b378 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
@@ -54,7 +54,7 @@ final class ByteSliceReader extends DataInput {
     buffer = pool.buffers[bufferUpto];
     upto = startIndex & ByteBlockPool.BYTE_BLOCK_MASK;
 
-    final int firstSize = ByteBlockPool.levelSizeArray[0];
+    final int firstSize = ByteBlockPool.LEVEL_SIZE_ARRAY[0];
 
     if (startIndex+firstSize >= endIndex) {
       // There is only this one slice to read
@@ -100,8 +100,8 @@ final class ByteSliceReader extends DataInput {
     // Skip to our next slice
     final int nextIndex = ((buffer[limit]&0xff)<<24) + ((buffer[1+limit]&0xff)<<16) + ((buffer[2+limit]&0xff)<<8) + (buffer[3+limit]&0xff);
 
-    level = ByteBlockPool.nextLevelArray[level];
-    final int newSize = ByteBlockPool.levelSizeArray[level];
+    level = ByteBlockPool.NEXT_LEVEL_ARRAY[level];
+    final int newSize = ByteBlockPool.LEVEL_SIZE_ARRAY[level];
 
     bufferUpto = nextIndex / ByteBlockPool.BYTE_BLOCK_SIZE;
     bufferOffset = bufferUpto * ByteBlockPool.BYTE_BLOCK_SIZE;
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java b/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java
index 302c63d..eab9e32 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java
@@ -59,7 +59,6 @@ final class DocFieldProcessor extends DocConsumer {
   int hashMask = 1;
   int totalFieldCount;
 
-  float docBoost;
   int fieldGen;
   final DocumentsWriterPerThread.DocState docState;
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 705a42c..f2e8b6f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -186,6 +186,7 @@ class DocumentsWriterPerThread {
   DeleteSlice deleteSlice;
   private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
   final Allocator byteBlockAllocator;
+  final IntBlockPool.Allocator intBlockAllocator;
 
   
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
@@ -201,9 +202,12 @@ class DocumentsWriterPerThread {
     this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
     bytesUsed = Counter.newCounter();
     byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
-    consumer = indexingChain.getChain(this);
     pendingDeletes = new BufferedDeletes();
+    intBlockAllocator = new IntBlockAllocator(bytesUsed);
     initialize();
+    // This must be the last call in the ctor: the indexing chain
+    // requires a fully constructed 'this' reference when it is built.
+    consumer = indexingChain.getChain(this);
   }
   
   public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
@@ -619,23 +623,28 @@ class DocumentsWriterPerThread {
    * getTerms/getTermsIndex requires <= 32768 */
   final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
 
-  /* Initial chunks size of the shared int[] blocks used to
-     store postings data */
-  final static int INT_BLOCK_SHIFT = 13;
-  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
-  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
-  /* Allocate another int[] from the shared pool */
-  int[] getIntBlock() {
-    int[] b = new int[INT_BLOCK_SIZE];
-    bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
-    return b;
-  }
-  
-  void recycleIntBlocks(int[][] blocks, int offset, int length) {
-    bytesUsed.addAndGet(-(length *(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT)));
-  }
 
+  private static class IntBlockAllocator extends IntBlockPool.Allocator {
+    private final Counter bytesUsed;
+    
+    public IntBlockAllocator(Counter bytesUsed) {
+      super(IntBlockPool.INT_BLOCK_SIZE);
+      this.bytesUsed = bytesUsed;
+    }
+    
+    /* Allocate another int[] from the shared pool */
+    public int[] getIntBlock() {
+      int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
+      bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE
+          * RamUsageEstimator.NUM_BYTES_INT);
+      return b;
+    }
+    
+    public void recycleIntBlocks(int[][] blocks, int offset, int length) {
+      bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT)));
+    }
+    
+  }
   PerDocWriteState newPerDocWriteState(String segmentSuffix) {
     assert segmentInfo != null;
     return new PerDocWriteState(infoStream, directory, segmentInfo, bytesUsed, segmentSuffix, IOContext.DEFAULT);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IntBlockPool.java b/lucene/core/src/java/org/apache/lucene/index/IntBlockPool.java
index 531b287..25b3182 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IntBlockPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IntBlockPool.java
@@ -1,7 +1,5 @@
 package org.apache.lucene.index;
 
-import java.util.Arrays;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,47 +17,365 @@ import java.util.Arrays;
  * limitations under the License.
  */
 
-final class IntBlockPool {
+import java.util.Arrays;
 
-  public int[][] buffers = new int[10][];
 
-  int bufferUpto = -1;                        // Which buffer we are upto
-  public int intUpto = DocumentsWriterPerThread.INT_BLOCK_SIZE;             // Where we are in head buffer
+/**
+ * @lucene.internal
+ */
+public final class IntBlockPool {
+  // TODO: move to o.a.l.util
+  public final static int INT_BLOCK_SHIFT = 13;
+  public final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
+  public final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+  
+  /** Abstract class for allocating and freeing int
+   *  blocks. */
+  public abstract static class Allocator {
+    protected final int blockSize;
 
-  public int[] buffer;                              // Current head buffer
-  public int intOffset = -DocumentsWriterPerThread.INT_BLOCK_SIZE;          // Current head offset
+    public Allocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
 
-  final private DocumentsWriterPerThread docWriter;
+    public abstract void recycleIntBlocks(int[][] blocks, int start, int end);
 
-  public IntBlockPool(DocumentsWriterPerThread docWriter) {
-    this.docWriter = docWriter;
+    public int[] getIntBlock() {
+      return new int[blockSize];
+    }
   }
+  
+  /** A simple {@link Allocator} that never recycles. */
+  public static final class DirectAllocator extends Allocator {
+
+    /**
+     * Creates a new {@link DirectAllocator} with a default block size
+     */
+    public DirectAllocator() {
+      super(INT_BLOCK_SIZE);
+    }
+
+    @Override
+    public void recycleIntBlocks(int[][] blocks, int start, int end) {
+    }
+  }
+  
+  /** Array of buffers currently used in the pool. Buffers are allocated if needed; don't modify this outside of this class. */
+  public int[][] buffers = new int[10][];
 
+  /** index into the buffers array pointing to the current buffer used as the head */
+  private int bufferUpto = -1;   
+  /** Pointer to the current position in head buffer */
+  public int intUpto = INT_BLOCK_SIZE;
+  /** Current head buffer */
+  public int[] buffer;
+  /** Current head offset */
+  public int intOffset = -INT_BLOCK_SIZE;
+
+  private final Allocator allocator;
+
+  /**
+   * Creates a new {@link IntBlockPool} with a default {@link Allocator}.
+   * @see IntBlockPool#nextBuffer()
+   */
+  public IntBlockPool() {
+    this(new DirectAllocator());
+  }
+  
+  /**
+   * Creates a new {@link IntBlockPool} with the given {@link Allocator}.
+   * @see IntBlockPool#nextBuffer()
+   */
+  public IntBlockPool(Allocator allocator) {
+    this.allocator = allocator;
+  }
+  
+  /**
+   * Resets the pool to its initial state reusing the first buffer. Calling
+   * {@link IntBlockPool#nextBuffer()} is not needed after reset.
+   */
   public void reset() {
+    this.reset(true, true);
+  }
+  
+  /**
+   * Expert: Resets the pool to its initial state reusing the first buffer. 
+   * @param zeroFillBuffers if <code>true</code> the buffers are filled with <tt>0</tt>. 
+   *        This should be set to <code>true</code> if this pool is used with 
+   *        {@link SliceWriter}.
+   * @param reuseFirst if <code>true</code> the first buffer will be reused and calling
+   *        {@link IntBlockPool#nextBuffer()} is not needed after reset iff the 
+   *        block pool was used before ie. {@link IntBlockPool#nextBuffer()} was called before.
+   */
+  public void reset(boolean zeroFillBuffers, boolean reuseFirst) {
     if (bufferUpto != -1) {
-      // Reuse first buffer
-      if (bufferUpto > 0) {
-        docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
-        Arrays.fill(buffers, 1, bufferUpto, null);
+      // We allocated at least one buffer
+
+      if (zeroFillBuffers) {
+        for(int i=0;i<bufferUpto;i++) {
+          // Fully zero fill buffers that we fully used
+          Arrays.fill(buffers[i], 0);
+        }
+        // Partial zero fill the final buffer
+        Arrays.fill(buffers[bufferUpto], 0, intUpto, 0);
       }
-      bufferUpto = 0;
-      intUpto = 0;
-      intOffset = 0;
-      buffer = buffers[0];
+     
+      if (bufferUpto > 0 || !reuseFirst) {
+        final int offset = reuseFirst ? 1 : 0;  
+       // Recycle all buffers except, when reuseFirst is set, the first one
+       allocator.recycleIntBlocks(buffers, offset, 1+bufferUpto);
+       Arrays.fill(buffers, offset, bufferUpto+1, null);
+     }
+     if (reuseFirst) {
+       // Re-use the first buffer
+       bufferUpto = 0;
+       intUpto = 0;
+       intOffset = 0;
+       buffer = buffers[0];
+     } else {
+       bufferUpto = -1;
+       intUpto = INT_BLOCK_SIZE;
+       intOffset = -INT_BLOCK_SIZE;
+       buffer = null;
+     }
     }
   }
-
+  
+  /**
+   * Advances the pool to its next buffer. This method should be called once
+   * after the constructor to initialize the pool. In contrast to the
+   * constructor a {@link IntBlockPool#reset()} call will advance the pool to
+   * its first buffer immediately.
+   */
   public void nextBuffer() {
     if (1+bufferUpto == buffers.length) {
       int[][] newBuffers = new int[(int) (buffers.length*1.5)][];
       System.arraycopy(buffers, 0, newBuffers, 0, buffers.length);
       buffers = newBuffers;
     }
-    buffer = buffers[1+bufferUpto] = docWriter.getIntBlock();
+    buffer = buffers[1+bufferUpto] = allocator.getIntBlock();
     bufferUpto++;
 
     intUpto = 0;
-    intOffset += DocumentsWriterPerThread.INT_BLOCK_SIZE;
+    intOffset += INT_BLOCK_SIZE;
+  }
+  
+  /**
+   * Creates a new int slice with the given starting size and returns the slice's offset in the pool.
+   * @see SliceReader
+   */
+  private int newSlice(final int size) {
+    if (intUpto > INT_BLOCK_SIZE-size) {
+      nextBuffer();
+      assert assertSliceBuffer(buffer);
+    }
+      
+    final int upto = intUpto;
+    intUpto += size;
+    buffer[intUpto-1] = 1;
+    return upto;
+  }
+  
+  private static final boolean assertSliceBuffer(int[] buffer) {
+    int count = 0;
+    for (int i = 0; i < buffer.length; i++) {
+      count += buffer[i]; // for slices the buffer must only have 0 values
+    }
+    return count == 0;
+  }
+  
+  
+  // no need to make this public unless we support different sizes
+  // TODO make the levels and the sizes configurable
+  /**
+   * An array holding the offset into the {@link IntBlockPool#LEVEL_SIZE_ARRAY}
+   * to quickly navigate to the next slice level.
+   */
+  private final static int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
+  
+  /**
+   * An array holding the level sizes for int slices.
+   */
+  private final static int[] LEVEL_SIZE_ARRAY = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
+  
+  /**
+   * The first level size for new slices
+   */
+  private final static int FIRST_LEVEL_SIZE = LEVEL_SIZE_ARRAY[0];
+
+  /**
+   * Allocates a new slice from the given offset
+   */
+  private int allocSlice(final int[] slice, final int sliceOffset) {
+    final int level = slice[sliceOffset];
+    final int newLevel = NEXT_LEVEL_ARRAY[level-1];
+    final int newSize = LEVEL_SIZE_ARRAY[newLevel];
+    // Maybe allocate another block
+    if (intUpto > INT_BLOCK_SIZE-newSize) {
+      nextBuffer();
+      assert assertSliceBuffer(buffer);
+    }
+
+    final int newUpto = intUpto;
+    final int offset = newUpto + intOffset;
+    intUpto += newSize;
+    // Write forwarding address at end of last slice:
+    slice[sliceOffset] = offset;
+        
+    // Write new level:
+    buffer[intUpto-1] = newLevel;
+
+    return newUpto;
+  }
+  
+  /**
+   * A {@link SliceWriter} that allows to write multiple integer slices into a given {@link IntBlockPool}.
+   * 
+   *  @see SliceReader
+   *  @lucene.internal
+   */
+  public static class SliceWriter {
+    
+    private int offset;
+    private final IntBlockPool pool;
+    
+    
+    public SliceWriter(IntBlockPool pool) {
+      this.pool = pool;
+    }
+    /**
+     * Resets the writer to continue writing at the given absolute offset in the pool.
+     */
+    public void reset(int sliceOffset) {
+      this.offset = sliceOffset;
+    }
+    
+    /**
+     * Writes the given value into the slice and resizes the slice if needed
+     */
+    public void writeInt(int value) {
+      int[] ints = pool.buffers[offset >> INT_BLOCK_SHIFT];
+      assert ints != null;
+      int relativeOffset = offset & INT_BLOCK_MASK;
+      if (ints[relativeOffset] != 0) {
+        // End of slice; allocate a new one
+        relativeOffset = pool.allocSlice(ints, relativeOffset);
+        ints = pool.buffer;
+        offset = relativeOffset + pool.intOffset;
+      }
+      ints[relativeOffset] = value;
+      offset++; 
+    }
+    
+    /**
+     * starts a new slice and returns the start offset. The returned value
+     * should be used as the start offset to initialize a {@link SliceReader}.
+     */
+    public int startNewSlice() {
+      return offset = pool.newSlice(FIRST_LEVEL_SIZE) + pool.intOffset;
+      
+    }
+    
+    /**
+     * Returns the offset of the currently written slice. The returned value
+     * should be used as the end offset to initialize a {@link SliceReader} once
+     * this slice is fully written or to reset this writer if another slice
+     * needs to be written.
+     */
+    public int getCurrentOffset() {
+      return offset;
+    }
+  }
+  
+  /**
+   * A {@link SliceReader} that can read int slices written by a {@link SliceWriter}
+   * @lucene.internal
+   */
+  public static final class SliceReader {
+    
+    private final IntBlockPool pool;
+    private int upto;
+    private int bufferUpto;
+    private int bufferOffset;
+    private int[] buffer;
+    private int limit;
+    private int level;
+    private int end;
+    
+    /**
+     * Creates a new {@link SliceReader} on the given pool
+     */
+    public SliceReader(IntBlockPool pool) {
+      this.pool = pool;
+    }
+
+    /**
+     * Resets the reader to a slice given the slice's absolute start and end offsets in the pool
+     */
+    public void reset(int startOffset, int endOffset) {
+      bufferUpto = startOffset / INT_BLOCK_SIZE;
+      bufferOffset = bufferUpto * INT_BLOCK_SIZE;
+      this.end = endOffset;
+      upto = startOffset;
+      level = 1;
+      
+      buffer = pool.buffers[bufferUpto];
+      upto = startOffset & INT_BLOCK_MASK;
+
+      final int firstSize = IntBlockPool.LEVEL_SIZE_ARRAY[0];
+      if (startOffset+firstSize >= endOffset) {
+        // There is only this one slice to read
+        limit = endOffset & INT_BLOCK_MASK;
+      } else {
+        limit = upto+firstSize-1;
+      }
+
+    }
+    
+    /**
+     * Returns <code>true</code> iff the current slice is fully read. If this
+     * method returns <code>true</code> {@link SliceReader#readInt()} should not
+     * be called again on this slice.
+     */
+    public boolean endOfSlice() {
+      assert upto + bufferOffset <= end;
+      return upto + bufferOffset == end;
+    }
+    
+    /**
+     * Reads the next int from the current slice and returns it.
+     * @see SliceReader#endOfSlice()
+     */
+    public int readInt() {
+      assert !endOfSlice();
+      assert upto <= limit;
+      if (upto == limit)
+        nextSlice();
+      return buffer[upto++];
+    }
+    
+    private void nextSlice() {
+      // Skip to our next slice
+      final int nextIndex = buffer[limit];
+      level = NEXT_LEVEL_ARRAY[level-1];
+      final int newSize = LEVEL_SIZE_ARRAY[level];
+
+      bufferUpto = nextIndex / INT_BLOCK_SIZE;
+      bufferOffset = bufferUpto * INT_BLOCK_SIZE;
+
+      buffer = pool.buffers[bufferUpto];
+      upto = nextIndex & INT_BLOCK_MASK;
+
+      if (nextIndex + newSize >= end) {
+        // We are advancing to the final slice
+        assert end - nextIndex > 0;
+        limit = end - bufferOffset;
+      } else {
+        // This is not the final slice (subtract 1 for the
+        // forwarding address at the end of this new slice)
+        limit = upto+newSize-1;
+      }
+    }
   }
 }
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/TermsHash.java b/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
index 090540d..c4e1925 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TermsHash.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.Counter;
 
 /** This class implements {@link InvertedDocConsumer}, which
  *  is passed each token produced by the analyzer on each
@@ -36,11 +37,11 @@ final class TermsHash extends InvertedDocConsumer {
 
   final TermsHashConsumer consumer;
   final TermsHash nextTermsHash;
-  final DocumentsWriterPerThread docWriter;
 
   final IntBlockPool intPool;
   final ByteBlockPool bytePool;
   ByteBlockPool termBytePool;
+  final Counter bytesUsed;
 
   final boolean primary;
   final DocumentsWriterPerThread.DocState docState;
@@ -56,11 +57,11 @@ final class TermsHash extends InvertedDocConsumer {
 
   public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, boolean trackAllocations, final TermsHash nextTermsHash) {
     this.docState = docWriter.docState;
-    this.docWriter = docWriter;
     this.consumer = consumer;
     this.trackAllocations = trackAllocations; 
     this.nextTermsHash = nextTermsHash;
-    intPool = new IntBlockPool(docWriter);
+    this.bytesUsed = trackAllocations ? docWriter.bytesUsed : Counter.newCounter();
+    intPool = new IntBlockPool(docWriter.intBlockAllocator);
     bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);
 
     if (nextTermsHash != null) {
@@ -87,12 +88,9 @@ final class TermsHash extends InvertedDocConsumer {
 
   // Clear all state
   void reset() {
-    intPool.reset();
-    bytePool.reset();
-
-    if (primary) {
-      bytePool.reset();
-    }
+    // we don't reuse so we drop everything and don't fill with 0
+    intPool.reset(false, false);
+    bytePool.reset(false, false);
   }
 
   @Override
diff --git a/lucene/core/src/java/org/apache/lucene/index/TermsHashPerField.java b/lucene/core/src/java/org/apache/lucene/index/TermsHashPerField.java
index a3b56bf..7a3c6a6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TermsHashPerField.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TermsHashPerField.java
@@ -62,8 +62,7 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     termBytePool = termsHash.termBytePool;
     docState = termsHash.docState;
     this.termsHash = termsHash;
-    bytesUsed = termsHash.trackAllocations ? termsHash.docWriter.bytesUsed
-        : Counter.newCounter();
+    bytesUsed = termsHash.bytesUsed;
     fieldState = docInverterPerField.fieldState;
     this.consumer = termsHash.consumer.addField(this, fieldInfo);
     PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
@@ -99,8 +98,8 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
   public void initReader(ByteSliceReader reader, int termID, int stream) {
     assert stream < streamCount;
     int intStart = postingsArray.intStarts[termID];
-    final int[] ints = intPool.buffers[intStart >> DocumentsWriterPerThread.INT_BLOCK_SHIFT];
-    final int upto = intStart & DocumentsWriterPerThread.INT_BLOCK_MASK;
+    final int[] ints = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
+    final int upto = intStart & IntBlockPool.INT_BLOCK_MASK;
     reader.init(bytePool,
                 postingsArray.byteStarts[termID]+stream*ByteBlockPool.FIRST_LEVEL_SIZE,
                 ints[upto+stream]);
@@ -143,7 +142,7 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
       // First time we are seeing this token since we last
       // flushed the hash.
       // Init stream slices
-      if (numPostingInt + intPool.intUpto > DocumentsWriterPerThread.INT_BLOCK_SIZE)
+      if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE)
         intPool.nextBuffer();
 
       if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
@@ -167,8 +166,8 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     } else {
       termID = (-termID)-1;
       int intStart = postingsArray.intStarts[termID];
-      intUptos = intPool.buffers[intStart >> DocumentsWriterPerThread.INT_BLOCK_SHIFT];
-      intUptoStart = intStart & DocumentsWriterPerThread.INT_BLOCK_MASK;
+      intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
+      intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
       consumer.addTerm(termID);
     }
   }
@@ -205,7 +204,7 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     if (termID >= 0) {// New posting
       bytesHash.byteStart(termID);
       // Init stream slices
-      if (numPostingInt + intPool.intUpto > DocumentsWriterPerThread.INT_BLOCK_SIZE) {
+      if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
         intPool.nextBuffer();
       }
 
@@ -230,8 +229,8 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     } else {
       termID = (-termID)-1;
       final int intStart = postingsArray.intStarts[termID];
-      intUptos = intPool.buffers[intStart >> DocumentsWriterPerThread.INT_BLOCK_SHIFT];
-      intUptoStart = intStart & DocumentsWriterPerThread.INT_BLOCK_MASK;
+      intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
+      intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
       consumer.addTerm(termID);
     }
 
diff --git a/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java b/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
index 9a863ff..146885d 100644
--- a/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
+++ b/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
@@ -20,6 +20,9 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.lucene.index.IntBlockPool;
+import org.apache.lucene.index.IntBlockPool.SliceReader;
+import org.apache.lucene.index.IntBlockPool.SliceWriter;
 import org.apache.lucene.store.DataOutput;
 
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -113,10 +116,14 @@ public final class ByteBlockPool {
     }
   };
 
-
+  /**
+   * Array of buffers currently used in the pool. Buffers are allocated if
+   * needed; don't modify this outside of this class.
+   */
   public byte[][] buffers = new byte[10][];
-
-  int bufferUpto = -1;                        // Which buffer we are upto
+  
+  /** index into the buffers array pointing to the current buffer used as the head */
+  private int bufferUpto = -1;                        // Which buffer we are upto
   /** Where we are in head buffer */
   public int byteUpto = BYTE_BLOCK_SIZE;
 
@@ -131,43 +138,64 @@ public final class ByteBlockPool {
     this.allocator = allocator;
   }
   
-  public void dropBuffersAndReset() {
-    if (bufferUpto != -1) {
-      // Recycle all but the first buffer
-      allocator.recycleByteBlocks(buffers, 0, 1+bufferUpto);
-
-      // Re-use the first buffer
-      bufferUpto = -1;
-      byteUpto = BYTE_BLOCK_SIZE;
-      byteOffset = -BYTE_BLOCK_SIZE;
-      buffers = new byte[10][];
-      buffer = null;
-    }
-  }
-
+  /**
+   * Resets the pool to its initial state reusing the first buffer and fills all
+   * buffers with <tt>0</tt> bytes before they are reused or passed to
+   * {@link Allocator#recycleByteBlocks(byte[][], int, int)}. Calling
+   * {@link ByteBlockPool#nextBuffer()} is not needed after reset.
+   */
   public void reset() {
+    reset(true, true);
+  }
+  
+  /**
+   * Expert: Resets the pool to its initial state reusing the first buffer. Calling
+   * {@link ByteBlockPool#nextBuffer()} is not needed after reset. 
+   * @param zeroFillBuffers if <code>true</code> the buffers are filled with <tt>0</tt>. 
+   *        This should be set to <code>true</code> if this pool is used with slices.
+   * @param reuseFirst if <code>true</code> the first buffer will be reused and calling
+   *        {@link ByteBlockPool#nextBuffer()} is not needed after reset iff the 
+   *        block pool was used before ie. {@link ByteBlockPool#nextBuffer()} was called before.
+   */
+  public void reset(boolean zeroFillBuffers, boolean reuseFirst) {
     if (bufferUpto != -1) {
       // We allocated at least one buffer
 
-      for(int i=0;i<bufferUpto;i++)
-        // Fully zero fill buffers that we fully used
-        Arrays.fill(buffers[i], (byte) 0);
-
-      // Partial zero fill the final buffer
-      Arrays.fill(buffers[bufferUpto], 0, byteUpto, (byte) 0);
-          
-      if (bufferUpto > 0)
-        // Recycle all but the first buffer
-        allocator.recycleByteBlocks(buffers, 1, 1+bufferUpto);
-
-      // Re-use the first buffer
-      bufferUpto = 0;
-      byteUpto = 0;
-      byteOffset = 0;
-      buffer = buffers[0];
+      if (zeroFillBuffers) {
+        for(int i=0;i<bufferUpto;i++) {
+          // Fully zero fill buffers that we fully used
+          Arrays.fill(buffers[i], (byte) 0);
+        }
+        // Partial zero fill the final buffer
+        Arrays.fill(buffers[bufferUpto], 0, byteUpto, (byte) 0);
+      }
+     
+     if (bufferUpto > 0 || !reuseFirst) {
+       final int offset = reuseFirst ? 1 : 0;  
+       // Recycle all buffers except, when reuseFirst is set, the first one
+       allocator.recycleByteBlocks(buffers, offset, 1+bufferUpto);
+       Arrays.fill(buffers, offset, 1+bufferUpto, null);
+     }
+     if (reuseFirst) {
+       // Re-use the first buffer
+       bufferUpto = 0;
+       byteUpto = 0;
+       byteOffset = 0;
+       buffer = buffers[0];
+     } else {
+       bufferUpto = -1;
+       byteUpto = BYTE_BLOCK_SIZE;
+       byteOffset = -BYTE_BLOCK_SIZE;
+       buffer = null;
+     }
     }
   }
-  
+  /**
+   * Advances the pool to its next buffer. This method should be called once
+   * after the constructor to initialize the pool. In contrast to the
+   * constructor a {@link ByteBlockPool#reset()} call will advance the pool to
+   * its first buffer immediately.
+   */
   public void nextBuffer() {
     if (1+bufferUpto == buffers.length) {
       byte[][] newBuffers = new byte[ArrayUtil.oversize(buffers.length+1,
@@ -181,7 +209,11 @@ public final class ByteBlockPool {
     byteUpto = 0;
     byteOffset += BYTE_BLOCK_SIZE;
   }
-
+  
+  /**
+   * Allocates a new slice with the given size. 
+   * @see ByteBlockPool#FIRST_LEVEL_SIZE
+   */
   public int newSlice(final int size) {
     if (byteUpto > BYTE_BLOCK_SIZE-size)
       nextBuffer();
@@ -197,15 +229,32 @@ public final class ByteBlockPool {
   // array is the length of each slice, ie first slice is 5
   // bytes, next slice is 14 bytes, etc.
   
-  public final static int[] nextLevelArray = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
-  public final static int[] levelSizeArray = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};
-  public final static int FIRST_LEVEL_SIZE = levelSizeArray[0];
+  /**
+   * An array holding the offset into the {@link ByteBlockPool#LEVEL_SIZE_ARRAY}
+   * to quickly navigate to the next slice level.
+   */
+  public final static int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
+  
+  /**
+   * An array holding the level sizes for byte slices.
+   */
+  public final static int[] LEVEL_SIZE_ARRAY = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};
+  
+  /**
+   * The first level size for new slices
+   * @see ByteBlockPool#newSlice(int)
+   */
+  public final static int FIRST_LEVEL_SIZE = LEVEL_SIZE_ARRAY[0];
 
+  /**
+   * Allocates a larger slice continuing the given slice, writes the
+   * forwarding address into it, and returns the new slice's start offset.
+   */
   public int allocSlice(final byte[] slice, final int upto) {
 
     final int level = slice[upto] & 15;
-    final int newLevel = nextLevelArray[level];
-    final int newSize = levelSizeArray[newLevel];
+    final int newLevel = NEXT_LEVEL_ARRAY[level];
+    final int newSize = LEVEL_SIZE_ARRAY[newLevel];
 
     // Maybe allocate another block
     if (byteUpto > BYTE_BLOCK_SIZE-newSize)
@@ -288,13 +337,14 @@ public final class ByteBlockPool {
   }
   
   /**
-   *
+   * Copies bytes from the pool starting at the given offset with the given
+   * length into the given {@link BytesRef} at offset <tt>0</tt> and returns it.
+   * <p>Note: this method allows copying across block boundaries.</p>
    */
-  public final BytesRef copyFrom(final BytesRef bytes) {
-    final int length = bytes.length;
-    final int offset = bytes.offset;
+  public final BytesRef copyFrom(final BytesRef bytes, final int offset, final int length) {
     bytes.offset = 0;
     bytes.grow(length);
+    bytes.length = length;
     int bufferIndex = offset >> BYTE_BLOCK_SHIFT;
     byte[] buffer = buffers[bufferIndex];
     int pos = offset & BYTE_BLOCK_MASK;
diff --git a/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java b/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
index 6805fda..e306484 100644
--- a/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
+++ b/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
@@ -228,7 +228,7 @@ public final class BytesRefHash {
     lastCount = count;
     count = 0;
     if (resetPool) {
-      pool.dropBuffersAndReset();
+      pool.reset(false, false); // we don't need to 0-fill the buffers
     }
     bytesStart = bytesStartArray.clear();
     if (lastCount != -1 && shrink(lastCount)) {
diff --git a/lucene/core/src/java/org/apache/lucene/util/RecyclingByteBlockAllocator.java b/lucene/core/src/java/org/apache/lucene/util/RecyclingByteBlockAllocator.java
index 6fd2b79..24016dc 100644
--- a/lucene/core/src/java/org/apache/lucene/util/RecyclingByteBlockAllocator.java
+++ b/lucene/core/src/java/org/apache/lucene/util/RecyclingByteBlockAllocator.java
@@ -1,7 +1,5 @@
 package org.apache.lucene.util;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.lucene.util.ByteBlockPool.Allocator;
 
 /*
@@ -22,17 +20,19 @@ import org.apache.lucene.util.ByteBlockPool.Allocator;
  */
 
 /**
- * A threadsafe {@link ByteBlockPool.Allocator} implementation that recycles unused byte
+ * A {@link ByteBlockPool.Allocator} implementation that recycles unused byte
  * blocks in a buffer and reuses them in subsequent calls to
  * {@link #getByteBlock()}.
- * 
+ * <p>
+ * Note: This class is not thread-safe.
+ * </p>
  * @lucene.internal
  */
 public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
   private byte[][] freeByteBlocks;
   private final int maxBufferedBlocks;
   private int freeBlocks = 0;
-  private final AtomicLong bytesUsed;
+  private final Counter bytesUsed;
   public static final int DEFAULT_BUFFERED_BLOCKS = 64;
 
   /**
@@ -43,10 +43,10 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
    * @param maxBufferedBlocks
    *          maximum number of buffered byte block
    * @param bytesUsed
-   *          {@link AtomicLong} reference counting internally allocated bytes
+   *          {@link Counter} reference counting internally allocated bytes
    */
   public RecyclingByteBlockAllocator(int blockSize, int maxBufferedBlocks,
-      AtomicLong bytesUsed) {
+      Counter bytesUsed) {
     super(blockSize);
     freeByteBlocks = new byte[Math.min(10, maxBufferedBlocks)][];
     this.maxBufferedBlocks = maxBufferedBlocks;
@@ -62,7 +62,7 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
    *          maximum number of buffered byte block
    */
   public RecyclingByteBlockAllocator(int blockSize, int maxBufferedBlocks) {
-    this(blockSize, maxBufferedBlocks, new AtomicLong());
+    this(blockSize, maxBufferedBlocks, Counter.newCounter(false));
   }
 
   /**
@@ -72,11 +72,11 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
    * 
    */
   public RecyclingByteBlockAllocator() {
-    this(ByteBlockPool.BYTE_BLOCK_SIZE, 64, new AtomicLong());
+    this(ByteBlockPool.BYTE_BLOCK_SIZE, 64, Counter.newCounter(false));
   }
 
   @Override
-  public synchronized byte[] getByteBlock() {
+  public byte[] getByteBlock() {
     if (freeBlocks == 0) {
       bytesUsed.addAndGet(blockSize);
       return new byte[blockSize];
@@ -87,7 +87,7 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
   }
 
   @Override
-  public synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) {
+  public void recycleByteBlocks(byte[][] blocks, int start, int end) {
     final int numBlocks = Math.min(maxBufferedBlocks - freeBlocks, end - start);
     final int size = freeBlocks + numBlocks;
     if (size >= freeByteBlocks.length) {
@@ -111,14 +111,14 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
   /**
    * @return the number of currently buffered blocks
    */
-  public synchronized int numBufferedBlocks() {
+  public int numBufferedBlocks() {
     return freeBlocks;
   }
 
   /**
    * @return the number of bytes currently allocated by this {@link Allocator}
    */
-  public synchronized long bytesUsed() {
+  public long bytesUsed() {
     return bytesUsed.get();
   }
 
@@ -136,8 +136,8 @@ public final class RecyclingByteBlockAllocator extends ByteBlockPool.Allocator {
    *          the number of byte blocks to remove
    * @return the number of actually removed buffers
    */
-  public synchronized int freeBlocks(int num) {
-    assert num >= 0;
+  public int freeBlocks(int num) {
+    assert num >= 0 : "free blocks must be >= 0 but was: "+ num;
     final int stop;
     final int count;
     if (num > freeBlocks) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIntBlockPool.java b/lucene/core/src/test/org/apache/lucene/index/TestIntBlockPool.java
new file mode 100644
index 0000000..5a42335
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIntBlockPool.java
@@ -0,0 +1,156 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * tests basic {@link IntBlockPool} functionality
+ */
+public class TestIntBlockPool extends LuceneTestCase {
+  
+  public void testSingleWriterReader() {
+    Counter bytesUsed = Counter.newCounter();
+    IntBlockPool pool = new IntBlockPool(new ByteTrackingAllocator(bytesUsed));
+    
+    for (int j = 0; j < 2; j++) {
+      IntBlockPool.SliceWriter writer = new IntBlockPool.SliceWriter(pool);
+      int start = writer.startNewSlice();
+      int num = atLeast(100);
+      for (int i = 0; i < num; i++) {
+        writer.writeInt(i);
+      }
+      
+      int upto = writer.getCurrentOffset();
+      IntBlockPool.SliceReader reader = new IntBlockPool.SliceReader(pool);
+      reader.reset(start, upto);
+      for (int i = 0; i < num; i++) {
+        assertEquals(i, reader.readInt());
+      }
+      assertTrue(reader.endOfSlice());
+      if (random().nextBoolean()) {
+        pool.reset(true, false);
+        assertEquals(0, bytesUsed.get());
+      } else {
+        pool.reset(true, true);
+        assertEquals(IntBlockPool.INT_BLOCK_SIZE
+            * RamUsageEstimator.NUM_BYTES_INT, bytesUsed.get());
+      }
+    }
+  }
+  
+  public void testMultipleWriterReader() {
+    Counter bytesUsed = Counter.newCounter();
+    IntBlockPool pool = new IntBlockPool(new ByteTrackingAllocator(bytesUsed));
+    for (int j = 0; j < 2; j++) {
+      List<StartEndAndValues> holders = new ArrayList<TestIntBlockPool.StartEndAndValues>();
+      int num = atLeast(4);
+      for (int i = 0; i < num; i++) {
+        holders.add(new StartEndAndValues(random().nextInt(1000)));
+      }
+      IntBlockPool.SliceWriter writer = new IntBlockPool.SliceWriter(pool);
+      IntBlockPool.SliceReader reader = new IntBlockPool.SliceReader(pool);
+      
+      int numValuesToWrite = atLeast(10000);
+      for (int i = 0; i < numValuesToWrite; i++) {
+        StartEndAndValues values = holders
+            .get(random().nextInt(holders.size()));
+        if (values.valueCount == 0) {
+          values.start = writer.startNewSlice();
+        } else {
+          writer.reset(values.end);
+        }
+        writer.writeInt(values.nextValue());
+        values.end = writer.getCurrentOffset();
+        if (random().nextInt(5) == 0) {
+          // pick one and read the ints
+          assertReader(reader, holders.get(random().nextInt(holders.size())));
+        }
+      }
+      
+      while (!holders.isEmpty()) {
+        StartEndAndValues values = holders.remove(random().nextInt(
+            holders.size()));
+        assertReader(reader, values);
+      }
+      if (random().nextBoolean()) {
+        pool.reset(true, false);
+        assertEquals(0, bytesUsed.get());
+      } else {
+        pool.reset(true, true);
+        assertEquals(IntBlockPool.INT_BLOCK_SIZE
+            * RamUsageEstimator.NUM_BYTES_INT, bytesUsed.get());
+      }
+    }
+  }
+  
+  private static class ByteTrackingAllocator extends IntBlockPool.Allocator {
+    private final Counter bytesUsed;
+    
+    public ByteTrackingAllocator(Counter bytesUsed) {
+      this(IntBlockPool.INT_BLOCK_SIZE, bytesUsed);
+    }
+    
+    public ByteTrackingAllocator(int blockSize, Counter bytesUsed) {
+      super(blockSize);
+      this.bytesUsed = bytesUsed;
+    }
+    
+    public int[] getIntBlock() {
+      bytesUsed.addAndGet(blockSize * RamUsageEstimator.NUM_BYTES_INT);
+      return new int[blockSize];
+    }
+    
+    @Override
+    public void recycleIntBlocks(int[][] blocks, int start, int end) {
+      bytesUsed
+          .addAndGet(-((end - start) * blockSize * RamUsageEstimator.NUM_BYTES_INT));
+    }
+    
+  }
+  
+  private void assertReader(IntBlockPool.SliceReader reader,
+      StartEndAndValues values) {
+    reader.reset(values.start, values.end);
+    for (int i = 0; i < values.valueCount; i++) {
+      assertEquals(values.valueOffset + i, reader.readInt());
+    }
+    assertTrue(reader.endOfSlice());
+  }
+  
+  private static class StartEndAndValues {
+    int valueOffset;
+    int valueCount;
+    int start;
+    int end;
+    
+    public StartEndAndValues(int valueOffset) {
+      this.valueOffset = valueOffset;
+    }
+    
+    public int nextValue() {
+      return valueOffset + valueCount++;
+    }
+    
+  }
+  
+}
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
index 2888b29..dd8486b 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
@@ -28,41 +28,53 @@ import org.apache.lucene.store.RAMDirectory;
 public class TestByteBlockPool extends LuceneTestCase {
 
   public void testCopyRefAndWrite() throws IOException {
-    List<String> list = new ArrayList<String>();
-    int maxLength = atLeast(500);
-    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
+    Counter bytesUsed = Counter.newCounter();
+    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(bytesUsed));
     pool.nextBuffer();
-    final int numValues = atLeast(100);
-    BytesRef ref = new BytesRef();
-    for (int i = 0; i < numValues; i++) {
-      final String value = _TestUtil.randomRealisticUnicodeString(random(),
-          maxLength);
-      list.add(value);
-      ref.copyChars(value);
-      pool.copy(ref);
+    boolean reuseFirst = random().nextBoolean();
+    for (int j = 0; j < 2; j++) {
+        
+      List<String> list = new ArrayList<String>();
+      int maxLength = atLeast(500);
+      final int numValues = atLeast(100);
+      BytesRef ref = new BytesRef();
+      for (int i = 0; i < numValues; i++) {
+        final String value = _TestUtil.randomRealisticUnicodeString(random(),
+            maxLength);
+        list.add(value);
+        ref.copyChars(value);
+        pool.copy(ref);
+      }
+      RAMDirectory dir = new RAMDirectory();
+      IndexOutput stream = dir.createOutput("foo.txt", newIOContext(random()));
+      pool.writePool(stream);
+      stream.flush();
+      stream.close();
+      IndexInput input = dir.openInput("foo.txt", newIOContext(random()));
+      assertEquals(pool.byteOffset + pool.byteUpto, stream.length());
+      BytesRef expected = new BytesRef();
+      BytesRef actual = new BytesRef();
+      for (String string : list) {
+        expected.copyChars(string);
+        actual.grow(expected.length);
+        actual.length = expected.length;
+        input.readBytes(actual.bytes, 0, actual.length);
+        assertEquals(expected, actual);
+      }
+      try {
+        input.readByte();
+        fail("must be EOF");
+      } catch (EOFException e) {
+        // expected - read past EOF
+      }
+      pool.reset(random().nextBoolean(), reuseFirst);
+      if (reuseFirst) {
+        assertEquals(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed.get());
+      } else {
+        assertEquals(0, bytesUsed.get());
+        pool.nextBuffer(); // prepare for next iter
+      }
+      dir.close();
     }
-    RAMDirectory dir = new RAMDirectory();
-    IndexOutput stream = dir.createOutput("foo.txt", newIOContext(random()));
-    pool.writePool(stream);
-    stream.flush();
-    stream.close();
-    IndexInput input = dir.openInput("foo.txt", newIOContext(random()));
-    assertEquals(pool.byteOffset + pool.byteUpto, stream.length());
-    BytesRef expected = new BytesRef();
-    BytesRef actual = new BytesRef();
-    for (String string : list) {
-      expected.copyChars(string);
-      actual.grow(expected.length);
-      actual.length = expected.length;
-      input.readBytes(actual.bytes, 0, actual.length);
-      assertEquals(expected, actual);
-    }
-    try {
-      input.readByte();
-      fail("must be EOF");
-    } catch (EOFException e) {
-      // expected - read past EOF
-    }
-    dir.close();
   }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestRecyclingByteBlockAllocator.java b/lucene/core/src/test/org/apache/lucene/util/TestRecyclingByteBlockAllocator.java
index 504cba6..8d81f65 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestRecyclingByteBlockAllocator.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestRecyclingByteBlockAllocator.java
@@ -39,7 +39,7 @@ public class TestRecyclingByteBlockAllocator extends LuceneTestCase {
 
   private RecyclingByteBlockAllocator newAllocator() {
     return new RecyclingByteBlockAllocator(1 << (2 + random().nextInt(15)),
-        random().nextInt(97), new AtomicLong());
+        random().nextInt(97), Counter.newCounter());
   }
 
   @Test
diff --git a/lucene/highlighter/src/java/org/apache/lucene/search/highlight/WeightedSpanTermExtractor.java b/lucene/highlighter/src/java/org/apache/lucene/search/highlight/WeightedSpanTermExtractor.java
index 4412738..6282fa7 100644
--- a/lucene/highlighter/src/java/org/apache/lucene/search/highlight/WeightedSpanTermExtractor.java
+++ b/lucene/highlighter/src/java/org/apache/lucene/search/highlight/WeightedSpanTermExtractor.java
@@ -44,6 +44,8 @@ import org.apache.lucene.search.spans.SpanQuery;
 import org.apache.lucene.search.spans.SpanTermQuery;
 import org.apache.lucene.search.spans.Spans;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.RecyclingByteBlockAllocator;
 
 /**
  * Class used to extract {@link WeightedSpanTerm}s from a {@link Query} based on whether 
@@ -53,7 +55,7 @@ public class WeightedSpanTermExtractor {
 
   private String fieldName;
   private TokenStream tokenStream;
-  private Map<String,AtomicReaderContext> readers = new HashMap<String,AtomicReaderContext>(10); 
+  private Map<String,AtomicReaderContext> readers = new HashMap<String,AtomicReaderContext>(10);
   private String defaultField;
   private boolean expandMultiTermQuery;
   private boolean cachedTokenStream;
diff --git a/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java b/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
index de10a04..7f18c8d 100644
--- a/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
+++ b/lucene/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
@@ -36,6 +36,9 @@ import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IntBlockPool;
+import org.apache.lucene.index.IntBlockPool.SliceReader;
+import org.apache.lucene.index.IntBlockPool.SliceWriter;
 import org.apache.lucene.index.Norm;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.DocsAndPositionsEnum;
@@ -58,7 +61,10 @@ import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.RAMDirectory; // for javadocs
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.BytesRefHash.DirectBytesStartArray;
 import org.apache.lucene.util.Constants; // for javadocs
 import org.apache.lucene.util.RamUsageEstimator;
 
@@ -191,14 +197,15 @@ public class MemoryIndex {
   /** fields sorted ascending by fieldName; lazily computed on demand */
   private transient Map.Entry<String,Info>[] sortedFields; 
   
-  /** pos: positions[3*i], startOffset: positions[3*i +1], endOffset: positions[3*i +2] */
-  private final int stride;
+  private final boolean storeOffsets;
   
-  /** Could be made configurable; */
-  private static final float docBoost = 1.0f;
-
   private static final boolean DEBUG = false;
 
+  private final ByteBlockPool byteBlockPool;
+  private final IntBlockPool intBlockPool;
+//  private final IntBlockPool.SliceReader postingsReader;
+  private final IntBlockPool.SliceWriter postingsWriter;
+  
   private HashMap<String,FieldInfo> fieldInfos = new HashMap<String,FieldInfo>();
   
   /**
@@ -233,8 +240,23 @@ public class MemoryIndex {
    *            whether or not to store the start and end character offset of
    *            each token term in the text
    */
-  protected MemoryIndex(boolean storeOffsets) {
-    this.stride = storeOffsets ? 3 : 1;
+  public MemoryIndex(boolean storeOffsets) {
+    this(storeOffsets, new ByteBlockPool.DirectAllocator(), new IntBlockPool.DirectAllocator());
+    
+  }
+  
+  /**
+   * Expert: This constructor accepts a byte and int block allocator that is used internally to allocate 
+   * int &amp; byte blocks for term and posting storage.  
+   * @param storeOffsets <code>true</code> if offsets should be stored
+   * @param byteBlockAllocator a {@link ByteBlockPool.Allocator} that allocates blocks for internal term storage
+   * @param intBlockAllocator a {@link IntBlockPool.Allocator} that allocates blocks for internal posting list storage
+   */
+  MemoryIndex(boolean storeOffsets, ByteBlockPool.Allocator byteBlockAllocator, IntBlockPool.Allocator intBlockAllocator) {
+    this.storeOffsets = storeOffsets;
+    byteBlockPool = new ByteBlockPool(byteBlockAllocator);
+    intBlockPool = new IntBlockPool(intBlockAllocator);
+    postingsWriter = new SliceWriter(intBlockPool);
   }
   
   /**
@@ -265,7 +287,7 @@ public class MemoryIndex {
       throw new RuntimeException(ex);
     }
 
-    addField(fieldName, stream);
+    addField(fieldName, stream, 1.0f, analyzer.getPositionIncrementGap(fieldName));
   }
   
   /**
@@ -319,7 +341,7 @@ public class MemoryIndex {
   public void addField(String fieldName, TokenStream stream) {
     addField(fieldName, stream, 1.0f);
   }
-
+  
   /**
    * Iterates over the given token stream and adds the resulting terms to the index;
    * Equivalent to adding a tokenized, indexed, termVectorStored, unstored,
@@ -333,9 +355,14 @@ public class MemoryIndex {
    *            the token stream to retrieve tokens from.
    * @param boost
    *            the boost factor for hits for this field
+   *  
    * @see org.apache.lucene.document.Field#setBoost(float)
    */
+  
   public void addField(String fieldName, TokenStream stream, float boost) {
+    addField(fieldName, stream, boost, 0);
+  }
+  public void addField(String fieldName, TokenStream stream, float boost, int positionIncrementGap) {
     try {
       if (fieldName == null)
         throw new IllegalArgumentException("fieldName must not be null");
@@ -343,24 +370,36 @@ public class MemoryIndex {
           throw new IllegalArgumentException("token stream must not be null");
       if (boost <= 0.0f)
           throw new IllegalArgumentException("boost factor must be greater than 0.0");
-      if (fields.get(fieldName) != null)
-        throw new IllegalArgumentException("field must not be added more than once");
-      
-      HashMap<BytesRef,ArrayIntList> terms = new HashMap<BytesRef,ArrayIntList>();
       int numTokens = 0;
       int numOverlapTokens = 0;
       int pos = -1;
+      final BytesRefHash terms;
+      final SliceByteStartArray sliceArray;
+      Info info = null;
+      long sumTotalTermFreq = 0;
+      if ((info = fields.get(fieldName)) != null) {
+        numTokens = info.numTokens;
+        numOverlapTokens = info.numOverlapTokens;
+        pos = info.lastPosition + positionIncrementGap;
+        terms = info.terms;
+        boost *= info.boost;
+        sliceArray = info.sliceArray;
+        sumTotalTermFreq = info.sumTotalTermFreq;
+      } else {
+        sliceArray = new SliceByteStartArray(BytesRefHash.DEFAULT_CAPACITY);
+        terms = new BytesRefHash(byteBlockPool, BytesRefHash.DEFAULT_CAPACITY, sliceArray);
+      }
 
       if (!fieldInfos.containsKey(fieldName)) {
         fieldInfos.put(fieldName, 
             new FieldInfo(fieldName, true, fieldInfos.size(), false, false, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS, null, null, null));
       }
-      
       TermToBytesRefAttribute termAtt = stream.getAttribute(TermToBytesRefAttribute.class);
       PositionIncrementAttribute posIncrAttribute = stream.addAttribute(PositionIncrementAttribute.class);
       OffsetAttribute offsetAtt = stream.addAttribute(OffsetAttribute.class);
       BytesRef ref = termAtt.getBytesRef();
       stream.reset();
+      
       while (stream.incrementToken()) {
         termAtt.fillBytesRef();
         if (ref.length == 0) continue; // nothing to do
@@ -370,27 +409,33 @@ public class MemoryIndex {
         if (posIncr == 0)
           numOverlapTokens++;
         pos += posIncr;
-        
-        ArrayIntList positions = terms.get(ref);
-        if (positions == null) { // term not seen before
-          positions = new ArrayIntList(stride);
-          terms.put(BytesRef.deepCopyOf(ref), positions);
+        int ord = terms.add(ref);
+        if (ord < 0) {
+          ord = (-ord) - 1;
+          postingsWriter.reset(sliceArray.end[ord]);
+        } else {
+          sliceArray.start[ord] = postingsWriter.startNewSlice();
         }
-        if (stride == 1) {
-          positions.add(pos);
+        sliceArray.freq[ord]++;
+        sumTotalTermFreq++;
+        if (!storeOffsets) {
+          postingsWriter.writeInt(pos);
         } else {
-          positions.add(pos, offsetAtt.startOffset(), offsetAtt.endOffset());
+          postingsWriter.writeInt(pos);
+          postingsWriter.writeInt(offsetAtt.startOffset());
+          postingsWriter.writeInt(offsetAtt.endOffset());
         }
+        sliceArray.end[ord] = postingsWriter.getCurrentOffset();
       }
       stream.end();
 
       // ensure infos.numTokens > 0 invariant; needed for correct operation of terms()
       if (numTokens > 0) {
-        boost = boost * docBoost; // see DocumentWriter.addDocument(...)
-        fields.put(fieldName, new Info(terms, numTokens, numOverlapTokens, boost));
+        fields.put(fieldName, new Info(terms, sliceArray, numTokens, numOverlapTokens, boost, pos, sumTotalTermFreq));
         sortedFields = null;    // invalidate sorted view, if any
       }
-    } catch (IOException e) { // can never happen
+    } catch (Exception e) { // can never happen
+      e.printStackTrace();
       throw new RuntimeException(e);
     } finally {
       try {
@@ -484,10 +529,6 @@ public class MemoryIndex {
     return RamUsageEstimator.sizeOf(this);
   }
 
-  private int numPositions(ArrayIntList positions) {
-    return positions.size() / stride;
-  }
-  
   /** sorts into ascending order (on demand), reusing memory along the way */
   private void sortFields() {
     if (sortedFields == null) sortedFields = sort(fields);
@@ -519,31 +560,50 @@ public class MemoryIndex {
     sortFields();   
     int sumPositions = 0;
     int sumTerms = 0;
-    
+    final BytesRef spare = new BytesRef();
     for (int i=0; i < sortedFields.length; i++) {
       Map.Entry<String,Info> entry = sortedFields[i];
       String fieldName = entry.getKey();
       Info info = entry.getValue();
       info.sortTerms();
       result.append(fieldName + ":\n");
-      
+      SliceByteStartArray sliceArray = info.sliceArray;
       int numPositions = 0;
-      for (int j=0; j < info.sortedTerms.length; j++) {
-        Map.Entry<BytesRef,ArrayIntList> e = info.sortedTerms[j];
-        BytesRef term = e.getKey();
-        ArrayIntList positions = e.getValue();
-        result.append("\t'" + term + "':" + numPositions(positions) + ":");
-        result.append(positions.toString(stride)); // ignore offsets
+      SliceReader postingsReader = new SliceReader(intBlockPool);
+      for (int j=0; j < info.terms.size(); j++) {
+        int ord = info.sortedTerms[j];
+        info.terms.get(ord, spare);
+        int freq = sliceArray.freq[ord];
+        result.append("\t'" + spare + "':" + freq + ":");
+        postingsReader.reset(sliceArray.start[ord], sliceArray.end[ord]);
+        result.append(" [");
+        final int iters = storeOffsets ? 3 : 1; 
+        while(!postingsReader.endOfSlice()) {
+          result.append("(");
+          
+          for (int k = 0; k < iters; k++) {
+            result.append(postingsReader.readInt());
+            if (k < iters-1) {
+              result.append(", ");
+            }
+          }
+          result.append(")");
+          if (!postingsReader.endOfSlice()) {
+            result.append(",");
+          }
+          
+        }
+        result.append("]");
         result.append("\n");
-        numPositions += numPositions(positions);
+        numPositions += freq;
       }
       
-      result.append("\tterms=" + info.sortedTerms.length);
+      result.append("\tterms=" + info.terms.size());
       result.append(", positions=" + numPositions);
       result.append(", memory=" + RamUsageEstimator.humanReadableUnits(RamUsageEstimator.sizeOf(info)));
       result.append("\n");
       sumPositions += numPositions;
-      sumTerms += info.sortedTerms.length;
+      sumTerms += info.terms.size();
     }
     
     result.append("\nfields=" + sortedFields.length);
@@ -563,10 +623,12 @@ public class MemoryIndex {
      * Term strings and their positions for this field: Map <String
      * termText, ArrayIntList positions>
      */
-    private final HashMap<BytesRef,ArrayIntList> terms; 
+    private final BytesRefHash terms; 
+    
+    private final SliceByteStartArray sliceArray;
     
     /** Terms sorted ascending by term text; computed on demand */
-    private transient Map.Entry<BytesRef,ArrayIntList>[] sortedTerms;
+    private transient int[] sortedTerms;
     
     /** Number of added tokens for this field */
     private final int numTokens;
@@ -579,16 +641,17 @@ public class MemoryIndex {
 
     private final long sumTotalTermFreq;
 
-    public Info(HashMap<BytesRef,ArrayIntList> terms, int numTokens, int numOverlapTokens, float boost) {
+    /** the last position encountered in this field for multi field support*/
+    private int lastPosition;
+
+    public Info(BytesRefHash terms, SliceByteStartArray sliceArray, int numTokens, int numOverlapTokens, float boost, int lastPosition, long sumTotalTermFreq) {
       this.terms = terms;
+      this.sliceArray = sliceArray; 
       this.numTokens = numTokens;
       this.numOverlapTokens = numOverlapTokens;
       this.boost = boost;
-      long sum = 0;
-      for(Map.Entry<BytesRef,ArrayIntList> ent : terms.entrySet()) {
-        sum += ent.getValue().size();
-      }
-      sumTotalTermFreq = sum;
+      this.sumTotalTermFreq = sumTotalTermFreq;
+      this.lastPosition = lastPosition;
     }
 
     public long getSumTotalTermFreq() {
@@ -604,83 +667,15 @@ public class MemoryIndex {
      * apart from more sophisticated Tries / prefix trees).
      */
     public void sortTerms() {
-      if (sortedTerms == null) sortedTerms = sort(terms);
+      if (sortedTerms == null) 
+        sortedTerms = terms.sort(BytesRef.getUTF8SortedAsUnicodeComparator());
     }
         
     public float getBoost() {
       return boost;
     }
-    
   }
   
-  
-  ///////////////////////////////////////////////////////////////////////////////
-  // Nested classes:
-  ///////////////////////////////////////////////////////////////////////////////
-  /**
-   * Efficient resizable auto-expanding list holding <code>int</code> elements;
-   * implemented with arrays.
-   */
-  private static final class ArrayIntList {
-
-    private int[] elements;
-    private int size = 0;
-      
-    public ArrayIntList(int initialCapacity) {
-      elements = new int[initialCapacity];
-    }
-
-    public void add(int elem) {
-      if (size == elements.length) ensureCapacity(size + 1);
-      elements[size++] = elem;
-    }
-
-    public void add(int pos, int start, int end) {
-      if (size + 3 > elements.length) ensureCapacity(size + 3);
-      elements[size] = pos;
-      elements[size+1] = start;
-      elements[size+2] = end;
-      size += 3;
-    }
-
-    public int get(int index) {
-      if (index >= size) throwIndex(index);
-      return elements[index];
-    }
-    
-    public int size() {
-      return size;
-    }
-    
-    private void ensureCapacity(int minCapacity) {
-      int newCapacity = Math.max(minCapacity, (elements.length * 3) / 2 + 1);
-      int[] newElements = new int[newCapacity];
-      System.arraycopy(elements, 0, newElements, 0, size);
-      elements = newElements;
-    }
-
-    private void throwIndex(int index) {
-      throw new IndexOutOfBoundsException("index: " + index
-            + ", size: " + size);
-    }
-    
-    /** returns the first few positions (without offsets); debug only */
-    public String toString(int stride) {
-      int s = size() / stride;
-      int len = Math.min(10, s); // avoid printing huge lists
-      StringBuilder buf = new StringBuilder(4*len);
-      buf.append("[");
-      for (int i = 0; i < len; i++) {
-        buf.append(get(i*stride));
-        if (i < len-1) buf.append(", ");
-      }
-      if (len != s) buf.append(", ..."); // and some more...
-      buf.append("]");
-      return buf.toString();
-    }   
-  }
-  
-  
   ///////////////////////////////////////////////////////////////////////////////
   // Nested classes:
   ///////////////////////////////////////////////////////////////////////////////
@@ -764,7 +759,7 @@ public class MemoryIndex {
 
             @Override
             public long size() {
-              return info.sortedTerms.length;
+              return info.terms.size();
             }
 
             @Override
@@ -775,17 +770,17 @@ public class MemoryIndex {
             @Override
             public long getSumDocFreq() {
               // each term has df=1
-              return info.sortedTerms.length;
+              return info.terms.size();
             }
 
             @Override
             public int getDocCount() {
-              return info.sortedTerms.length > 0 ? 1 : 0;
+              return info.terms.size() > 0 ? 1 : 0;
             }
 
             @Override
             public boolean hasOffsets() {
-              return stride == 3;
+              return storeOffsets;
             }
 
             @Override
@@ -822,48 +817,62 @@ public class MemoryIndex {
         this.info = info;
         info.sortTerms();
       }
+      
+      private final int binarySearch(BytesRef b, BytesRef bytesRef, int low,
+          int high, BytesRefHash hash, int[] ords, Comparator<BytesRef> comparator) {
+        int mid = 0;
+        while (low <= high) {
+          mid = (low + high) >>> 1;
+          hash.get(ords[mid], bytesRef);
+          final int cmp = comparator.compare(bytesRef, b);
+          if (cmp < 0) {
+            low = mid + 1;
+          } else if (cmp > 0) {
+            high = mid - 1;
+          } else {
+            return mid;
+          }
+        }
+        assert comparator.compare(bytesRef, b) != 0;
+        return -(low + 1);
+      }
+    
 
       @Override
       public boolean seekExact(BytesRef text, boolean useCache) {
-        termUpto = Arrays.binarySearch(info.sortedTerms, text, termComparator);
-        if (termUpto >= 0) {
-          br.copyBytes(info.sortedTerms[termUpto].getKey());
-          return true;
-        } else {
-          return false;
-        }
+        termUpto = binarySearch(text, br, 0, info.terms.size()-1, info.terms, info.sortedTerms, BytesRef.getUTF8SortedAsUnicodeComparator());
+        return termUpto >= 0;
       }
 
       @Override
       public SeekStatus seekCeil(BytesRef text, boolean useCache) {
-        termUpto = Arrays.binarySearch(info.sortedTerms, text, termComparator);
+        termUpto = binarySearch(text, br, 0, info.terms.size()-1, info.terms, info.sortedTerms, BytesRef.getUTF8SortedAsUnicodeComparator());
         if (termUpto < 0) { // not found; choose successor
-          termUpto = -termUpto -1;
-          if (termUpto >= info.sortedTerms.length) {
+          termUpto = -termUpto-1;
+          if (termUpto >= info.terms.size()) {
             return SeekStatus.END;
           } else {
-            br.copyBytes(info.sortedTerms[termUpto].getKey());
+            info.terms.get(info.sortedTerms[termUpto], br);
             return SeekStatus.NOT_FOUND;
           }
         } else {
-          br.copyBytes(info.sortedTerms[termUpto].getKey());
           return SeekStatus.FOUND;
         }
       }
 
       @Override
       public void seekExact(long ord) {
-        assert ord < info.sortedTerms.length;
+        assert ord < info.terms.size();
         termUpto = (int) ord;
       }
       
       @Override
       public BytesRef next() {
         termUpto++;
-        if (termUpto >= info.sortedTerms.length) {
+        if (termUpto >= info.terms.size()) {
           return null;
         } else {
-          br.copyBytes(info.sortedTerms[termUpto].getKey());
+          info.terms.get(info.sortedTerms[termUpto], br);
           return br;
         }
       }
@@ -885,7 +894,7 @@ public class MemoryIndex {
 
       @Override
       public long totalTermFreq() {
-        return info.sortedTerms[termUpto].getValue().size();
+        return info.sliceArray.freq[info.sortedTerms[termUpto]];
       }
 
       @Override
@@ -893,7 +902,7 @@ public class MemoryIndex {
         if (reuse == null || !(reuse instanceof MemoryDocsEnum)) {
           reuse = new MemoryDocsEnum();
         }
-        return ((MemoryDocsEnum) reuse).reset(liveDocs, info.sortedTerms[termUpto].getValue());
+        return ((MemoryDocsEnum) reuse).reset(liveDocs, info.sliceArray.freq[info.sortedTerms[termUpto]]);
       }
 
       @Override
@@ -901,7 +910,8 @@ public class MemoryIndex {
         if (reuse == null || !(reuse instanceof MemoryDocsAndPositionsEnum)) {
           reuse = new MemoryDocsAndPositionsEnum();
         }
-        return ((MemoryDocsAndPositionsEnum) reuse).reset(liveDocs, info.sortedTerms[termUpto].getValue());
+        final int ord = info.sortedTerms[termUpto];
+        return ((MemoryDocsAndPositionsEnum) reuse).reset(liveDocs, info.sliceArray.start[ord], info.sliceArray.end[ord], info.sliceArray.freq[ord]);
       }
 
       @Override
@@ -924,16 +934,16 @@ public class MemoryIndex {
     }
     
     private class MemoryDocsEnum extends DocsEnum {
-      private ArrayIntList positions;
       private boolean hasNext;
       private Bits liveDocs;
       private int doc = -1;
+      private int freq;
 
-      public DocsEnum reset(Bits liveDocs, ArrayIntList positions) {
+      public DocsEnum reset(Bits liveDocs, int freq) {
         this.liveDocs = liveDocs;
-        this.positions = positions;
         hasNext = true;
         doc = -1;
+        this.freq = freq;
         return this;
       }
 
@@ -959,26 +969,35 @@ public class MemoryIndex {
 
       @Override
       public int freq() throws IOException {
-        return positions.size();
+        return freq;
       }
     }
     
     private class MemoryDocsAndPositionsEnum extends DocsAndPositionsEnum {
-      private ArrayIntList positions;
-      private int posUpto;
+      private int posUpto; // for assert
       private boolean hasNext;
       private Bits liveDocs;
       private int doc = -1;
+      private SliceReader sliceReader;
+      private int freq;
+      private int startOffset;
+      private int endOffset;
+      
+      public MemoryDocsAndPositionsEnum() {
+        this.sliceReader = new SliceReader(intBlockPool);
+      }
 
-      public DocsAndPositionsEnum reset(Bits liveDocs, ArrayIntList positions) {
+      public DocsAndPositionsEnum reset(Bits liveDocs, int start, int end, int freq) {
         this.liveDocs = liveDocs;
-        this.positions = positions;
-        posUpto = 0;
+        this.sliceReader.reset(start, end);
+        posUpto = 0; // for assert
         hasNext = true;
         doc = -1;
+        this.freq = freq;
         return this;
       }
 
+
       @Override
       public int docID() {
         return doc;
@@ -1001,22 +1020,31 @@ public class MemoryIndex {
 
       @Override
       public int freq() throws IOException {
-        return positions.size() / stride;
+        return freq;
       }
 
       @Override
       public int nextPosition() {
-        return positions.get(posUpto++ * stride);
+        assert posUpto++ < freq;
+        assert !sliceReader.endOfSlice() : " stores offsets : " + startOffset;
+        if (storeOffsets) {
+          int pos = sliceReader.readInt();
+          startOffset = sliceReader.readInt();
+          endOffset = sliceReader.readInt();
+          return pos;
+        } else {
+          return sliceReader.readInt();
+        }
       }
 
       @Override
       public int startOffset() {
-        return stride == 1 ? -1 : positions.get((posUpto - 1) * stride + 1);
+        return startOffset;
       }
 
       @Override
       public int endOffset() {
-        return stride == 1 ? -1 : positions.get((posUpto - 1) * stride + 2);
+        return endOffset;
       }
 
       @Override
@@ -1105,4 +1133,58 @@ public class MemoryIndex {
       return norms;
     }
   }
+  
+  /**
+   * Resets the {@link MemoryIndex} to its initial state and recycles all internal buffers.
+   */
+  void reset() {
+    this.fieldInfos.clear();
+    this.fields.clear();
+    this.sortedFields = null;
+    byteBlockPool.reset(false, true); // no need to 0-fill the buffers
+    intBlockPool.reset(true, true); // here we must 0-fill since we use slices
+  }
+  
+  private static final class SliceByteStartArray extends DirectBytesStartArray {
+    int[] start; // the start offset in the IntBlockPool per term
+    int[] end; // the end pointer in the IntBlockPool for the postings slice per term
+    int[] freq; // the term frequency
+    
+    public SliceByteStartArray(int initSize) {
+      super(initSize);
+    }
+    
+    @Override
+    public int[] init() {
+      final int[] ord = super.init();
+      start = new int[ArrayUtil.oversize(ord.length, RamUsageEstimator.NUM_BYTES_INT)];
+      end = new int[ArrayUtil.oversize(ord.length, RamUsageEstimator.NUM_BYTES_INT)];
+      freq = new int[ArrayUtil.oversize(ord.length, RamUsageEstimator.NUM_BYTES_INT)];
+      assert start.length >= ord.length;
+      assert end.length >= ord.length;
+      assert freq.length >= ord.length;
+      return ord;
+    }
+
+    @Override
+    public int[] grow() {
+      final int[] ord = super.grow();
+      if (start.length < ord.length) {
+        start = ArrayUtil.grow(start, ord.length);
+        end = ArrayUtil.grow(end, ord.length);
+        freq = ArrayUtil.grow(freq, ord.length);
+      }      
+      assert start.length >= ord.length;
+      assert end.length >= ord.length;
+      assert freq.length >= ord.length;
+      return ord;
+    }
+
+    @Override
+    public int[] clear() {
+     start = end = null;
+     return super.clear();
+    }
+    
+  }
 }
diff --git a/lucene/memory/src/test/org/apache/lucene/index/memory/MemoryIndexTest.java b/lucene/memory/src/test/org/apache/lucene/index/memory/MemoryIndexTest.java
index 77dcedf..9f18d17 100644
--- a/lucene/memory/src/test/org/apache/lucene/index/memory/MemoryIndexTest.java
+++ b/lucene/memory/src/test/org/apache/lucene/index/memory/MemoryIndexTest.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -37,14 +40,21 @@ import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.DocsAndPositionsEnum;
 import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.Fields;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.IntBlockPool;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.MultiTerms;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.PhraseQuery;
 import org.apache.lucene.search.RegexpQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.spans.SpanMultiTermQueryWrapper;
@@ -52,8 +62,13 @@ import org.apache.lucene.search.spans.SpanOrQuery;
 import org.apache.lucene.search.spans.SpanQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.ByteBlockPool.Allocator;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.RecyclingByteBlockAllocator;
 import org.apache.lucene.util._TestUtil;
 
 /**
@@ -94,15 +109,18 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
    * runs random tests, up to ITERATIONS times.
    */
   public void testRandomQueries() throws Exception {
-    for (int i = 0; i < ITERATIONS; i++)
-      assertAgainstRAMDirectory();
+    MemoryIndex index =  new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
+    for (int i = 0; i < ITERATIONS; i++) {
+      assertAgainstRAMDirectory(index);
+    }
   }
-
+  
   /**
    * Build a randomish document for both RAMDirectory and MemoryIndex,
    * and run all the queries against it.
    */
-  public void assertAgainstRAMDirectory() throws Exception {
+  public void assertAgainstRAMDirectory(MemoryIndex memory) throws Exception {
+    memory.reset();
     StringBuilder fooField = new StringBuilder();
     StringBuilder termField = new StringBuilder();
  
@@ -132,7 +150,6 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     writer.addDocument(doc);
     writer.close();
     
-    MemoryIndex memory = new MemoryIndex();
     memory.addField("foo", fooField.toString(), analyzer);
     memory.addField("term", termField.toString(), analyzer);
     
@@ -144,10 +161,66 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     } else {
       assertTrue(memory.getMemorySize() > 0L);
     }
-
+    AtomicReader reader = (AtomicReader) memory.createSearcher().getIndexReader();
+    IndexReader competitor = DirectoryReader.open(ramdir);
+    duellReaders(competitor, reader);
+    IOUtils.close(reader, competitor);
     assertAllQueries(memory, ramdir, analyzer);  
     ramdir.close();    
   }
+
+  private void duellReaders(IndexReader competitor, AtomicReader memIndexReader)
+      throws IOException {
+    Fields memFields = memIndexReader.fields();
+    for (String field : MultiFields.getFields(competitor)) {
+      Terms memTerms = memFields.terms(field);
+      Terms iwTerms = MultiFields.getTerms(memIndexReader, field);
+      if (iwTerms == null) {
+        assertNull(memTerms);
+      } else {
+        assertNotNull(memTerms);
+        assertEquals(iwTerms.getDocCount(), memTerms.getDocCount());
+        assertEquals(iwTerms.getSumDocFreq(), memTerms.getSumDocFreq());
+        assertEquals(iwTerms.getSumTotalTermFreq(), memTerms.getSumTotalTermFreq());
+        TermsEnum iwTermsIter = iwTerms.iterator(null);
+        TermsEnum memTermsIter = memTerms.iterator(null);
+        if (iwTerms.hasPositions()) {
+          final boolean offsets = iwTerms.hasOffsets() && memTerms.hasOffsets();
+         
+          while(iwTermsIter.next() != null) {
+            assertNotNull(memTermsIter.next());
+            assertEquals(iwTermsIter.term(), memTermsIter.term());
+            DocsAndPositionsEnum iwDocsAndPos = iwTermsIter.docsAndPositions(null, null);
+            DocsAndPositionsEnum memDocsAndPos = memTermsIter.docsAndPositions(null, null);
+            while(iwDocsAndPos.nextDoc() != DocsAndPositionsEnum.NO_MORE_DOCS) {
+              assertEquals(iwDocsAndPos.docID(), memDocsAndPos.nextDoc());
+              assertEquals(iwDocsAndPos.freq(), memDocsAndPos.freq());
+              for (int i = 0; i < iwDocsAndPos.freq(); i++) {
+                assertEquals("term: " + iwTermsIter.term().utf8ToString(), iwDocsAndPos.nextPosition(), memDocsAndPos.nextPosition());
+                if (offsets) {
+                  assertEquals(iwDocsAndPos.startOffset(), memDocsAndPos.startOffset());
+                  assertEquals(iwDocsAndPos.endOffset(), memDocsAndPos.endOffset());
+                }
+              }
+              
+            }
+            
+          }
+        } else {
+          while(iwTermsIter.next() != null) {
+            assertEquals(iwTermsIter.term(), memTermsIter.term());
+            DocsEnum iwDocsAndPos = iwTermsIter.docs(null, null);
+            DocsEnum memDocsAndPos = memTermsIter.docs(null, null);
+            while(iwDocsAndPos.nextDoc() != DocsAndPositionsEnum.NO_MORE_DOCS) {
+              assertEquals(iwDocsAndPos.docID(), memDocsAndPos.nextDoc());
+              assertEquals(iwDocsAndPos.freq(), memDocsAndPos.freq());
+            }
+          }
+        }
+      }
+      
+    }
+  }
   
   /**
    * Run all queries against both the RAMDirectory and MemoryIndex, ensuring they are the same.
@@ -160,7 +233,7 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     for (String query : queries) {
       TopDocs ramDocs = ram.search(qp.parse(query), 1);
       TopDocs memDocs = mem.search(qp.parse(query), 1);
-      assertEquals(ramDocs.totalHits, memDocs.totalHits);
+      assertEquals(query, ramDocs.totalHits, memDocs.totalHits);
     }
     reader.close();
   }
@@ -202,7 +275,7 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
   
   public void testDocsEnumStart() throws Exception {
     Analyzer analyzer = new MockAnalyzer(random());
-    MemoryIndex memory = new MemoryIndex();
+    MemoryIndex memory = new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
     memory.addField("foo", "bar", analyzer);
     AtomicReader reader = (AtomicReader) memory.createSearcher().getIndexReader();
     DocsEnum disi = _TestUtil.docs(random(), reader, "foo", new BytesRef("bar"), null, null, 0);
@@ -220,27 +293,40 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     reader.close();
   }
   
+  private Allocator randomByteBlockAllocator() {
+    if (random().nextBoolean()) {
+      return new RecyclingByteBlockAllocator();
+    } else {
+      return new ByteBlockPool.DirectAllocator();
+    }
+  }
+  
   public void testDocsAndPositionsEnumStart() throws Exception {
     Analyzer analyzer = new MockAnalyzer(random());
-    MemoryIndex memory = new MemoryIndex(true);
-    memory.addField("foo", "bar", analyzer);
-    AtomicReader reader = (AtomicReader) memory.createSearcher().getIndexReader();
-    DocsAndPositionsEnum disi = reader.termPositionsEnum(new Term("foo", "bar"));
-    int docid = disi.docID();
-    assertTrue(docid == -1 || docid == DocIdSetIterator.NO_MORE_DOCS);
-    assertTrue(disi.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
-    assertEquals(0, disi.nextPosition());
-    assertEquals(0, disi.startOffset());
-    assertEquals(3, disi.endOffset());
-    
-    // now reuse and check again
-    TermsEnum te = reader.terms("foo").iterator(null);
-    assertTrue(te.seekExact(new BytesRef("bar"), true));
-    disi = te.docsAndPositions(null, disi);
-    docid = disi.docID();
-    assertTrue(docid == -1 || docid == DocIdSetIterator.NO_MORE_DOCS);
-    assertTrue(disi.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
-    reader.close();
+    int numIters = atLeast(3);
+    MemoryIndex memory = new MemoryIndex(true, randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
+    for (int i = 0; i < numIters; i++) { // check reuse
+      memory.addField("foo", "bar", analyzer);
+      AtomicReader reader = (AtomicReader) memory.createSearcher().getIndexReader();
+      assertEquals(1, reader.terms("foo").getSumTotalTermFreq());
+      DocsAndPositionsEnum disi = reader.termPositionsEnum(new Term("foo", "bar"));
+      int docid = disi.docID();
+      assertTrue(docid == -1 || docid == DocIdSetIterator.NO_MORE_DOCS);
+      assertTrue(disi.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
+      assertEquals(0, disi.nextPosition());
+      assertEquals(0, disi.startOffset());
+      assertEquals(3, disi.endOffset());
+      
+      // now reuse and check again
+      TermsEnum te = reader.terms("foo").iterator(null);
+      assertTrue(te.seekExact(new BytesRef("bar"), true));
+      disi = te.docsAndPositions(null, disi);
+      docid = disi.docID();
+      assertTrue(docid == -1 || docid == DocIdSetIterator.NO_MORE_DOCS);
+      assertTrue(disi.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
+      reader.close();
+      memory.reset();
+    }
   }
 
   // LUCENE-3831
@@ -248,7 +334,7 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     RegexpQuery regex = new RegexpQuery(new Term("field", "worl."));
     SpanQuery wrappedquery = new SpanMultiTermQueryWrapper<RegexpQuery>(regex);
         
-    MemoryIndex mindex = new MemoryIndex();
+    MemoryIndex mindex = new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
     mindex.addField("field", new MockAnalyzer(random()).tokenStream("field", new StringReader("hello there")));
 
     // This throws an NPE
@@ -260,10 +346,65 @@ public class MemoryIndexTest extends BaseTokenStreamTestCase {
     RegexpQuery regex = new RegexpQuery(new Term("field", "worl."));
     SpanQuery wrappedquery = new SpanOrQuery(new SpanMultiTermQueryWrapper<RegexpQuery>(regex));
 
-    MemoryIndex mindex = new MemoryIndex();
+    MemoryIndex mindex = new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
     mindex.addField("field", new MockAnalyzer(random()).tokenStream("field", new StringReader("hello there")));
 
     // This passes though
     assertEquals(0, mindex.search(wrappedquery), 0.00001f);
   }
+  
+  public void testSameFieldAddedMultipleTimes() throws IOException {
+    MemoryIndex mindex = new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
+    MockAnalyzer mockAnalyzer = new MockAnalyzer(random());
+    mindex.addField("field", "the quick brown fox", mockAnalyzer);
+    mindex.addField("field", "jumps over the", mockAnalyzer);
+    AtomicReader reader = (AtomicReader) mindex.createSearcher().getIndexReader();
+    assertEquals(7, reader.terms("field").getSumTotalTermFreq());
+    PhraseQuery query = new PhraseQuery();
+    query.add(new Term("field", "fox"));
+    query.add(new Term("field", "jumps"));
+    assertTrue(mindex.search(query) > 0.1);
+    mindex.reset();
+    mockAnalyzer.setPositionIncrementGap(1 + random().nextInt(10));
+    mindex.addField("field", "the quick brown fox", mockAnalyzer);
+    mindex.addField("field", "jumps over the", mockAnalyzer);
+    assertEquals(0, mindex.search(query), 0.00001f);
+    query.setSlop(10);
+    assertTrue("posGap" + mockAnalyzer.getPositionIncrementGap("field") , mindex.search(query) > 0.0001);
+  }
+  
+  
+  public void testDuellMemIndex() throws IOException {
+    LineFileDocs lineFileDocs = new LineFileDocs(random());
+    int numDocs = atLeast(10);
+    MemoryIndex memory = new MemoryIndex(random().nextBoolean(), randomByteBlockAllocator(), new IntBlockPool.DirectAllocator());
+    for (int i = 0; i < numDocs; i++) {
+      Directory dir = newDirectory();
+      MockAnalyzer mockAnalyzer = new MockAnalyzer(random());
+      IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(random(), TEST_VERSION_CURRENT, mockAnalyzer));
+      Document nextDoc = lineFileDocs.nextDoc();
+      Document doc = new Document();
+      for (Field field : nextDoc.getFields()) {
+        if (field.fieldType().indexed()) {
+          doc.add(field);
+          if (random().nextInt(3) == 0) {
+            doc.add(field);  // randomly add the same field twice
+          }
+        }
+      }
+      
+      writer.addDocument(doc);
+      writer.close();
+      for (IndexableField field : doc.indexableFields()) {
+          memory.addField(field.name(), ((Field)field).stringValue(), mockAnalyzer);  
+      }
+      IndexReader competitor = DirectoryReader.open(dir);
+      AtomicReader memIndexReader= (AtomicReader) memory.createSearcher().getIndexReader();
+      duellReaders(competitor, memIndexReader);
+      IOUtils.close(competitor, memIndexReader);
+      memory.reset();
+      dir.close();
+    }
+    lineFileDocs.close();
+  }
 }
diff --git a/lucene/suggest/src/java/org/apache/lucene/search/suggest/BytesRefList.java b/lucene/suggest/src/java/org/apache/lucene/search/suggest/BytesRefList.java
index ff8c400..c0d7d1a 100644
--- a/lucene/suggest/src/java/org/apache/lucene/search/suggest/BytesRefList.java
+++ b/lucene/suggest/src/java/org/apache/lucene/search/suggest/BytesRefList.java
@@ -64,7 +64,7 @@ public final class BytesRefList {
     lastElement = 0;
     currentOffset = 0;
     Arrays.fill(offsets, 0);
-    pool.reset();
+    pool.reset(false, true); // no need to 0-fill the buffers; we control the allocator
   }
   
   /**
@@ -101,10 +101,10 @@ public final class BytesRefList {
    */
   public BytesRef get(BytesRef spare, int ord) {
     if (lastElement > ord) {
-      spare.offset = offsets[ord];
-      spare.length = ord == lastElement - 1 ? currentOffset - spare.offset
-          : offsets[ord + 1] - spare.offset;
-      pool.copyFrom(spare);
+      int offset = offsets[ord];
+      int length = ord == lastElement - 1 ? currentOffset - offset
+          : offsets[ord + 1] - offset;
+      pool.copyFrom(spare, offset, length);
       return spare;
     }
     throw new IndexOutOfBoundsException("index " + ord
