Index: lucene/src/java/org/apache/lucene/util/BytesRefHash.java
===================================================================
--- lucene/src/java/org/apache/lucene/util/BytesRefHash.java	(revision 1164400)
+++ lucene/src/java/org/apache/lucene/util/BytesRefHash.java	(working copy)
@@ -457,7 +457,7 @@
     hashSize = newSize;
     hashHalfSize = newSize / 2;
   }
-
+  
   /**
    * reinitializes the {@link BytesRefHash} after a previous {@link #clear()}
    * call. If {@link #clear()} has not been called previously this method has no
@@ -610,4 +610,208 @@
       return bytesUsed;
     }
   }
+  public BytesRefHashView getView(Comparator<BytesRef> comp, BytesRefHashView previous, boolean sort) {
+    if (previous != null && previous.size() == size()) {
+      return previous;
+    }
+    return new BytesRefHashView(this, comp, previous, sort);
+  }
+  
+  public BytesRefHashView getView(Comparator<BytesRef> comp, boolean sort) {
+    return getView(comp, null, sort);
+  }
+  
+  // nocommit javadoc
+  // nocommit - add ram tracking?
+  public static final class BytesRefHashView {
+    private final ByteBlockPool pool;
+    private final int[] bytesStart;
+    private final int hashSize;
+    private final int hashMask;
+    private final int count;
+    private final int[] ords;
+    private final int[] sortedOrds;
+    private final Comparator<BytesRef> comp;
+    
+    public BytesRefHashView(BytesRefHash hash, Comparator<BytesRef> comp, BytesRefHashView previous, boolean sort) {
+      pool = hash.pool;
+      ords =  hash.ords;
+      count = hash.count;
+      bytesStart = hash.bytesStart;
+      hashSize = ords.length;
+      hashMask = hashSize -1;
+      this.comp = comp;
+      if (sort) {
+        sortedOrds = previous != null ? merge(previous) : sort(initSortedOrds(), 0, count-1);
+      } else {
+        sortedOrds = null;
+      }
+    }
+    
+    public boolean seekExact(BytesRef bytes, BytesRef spare) {
+
+      int code = bytes.hashCode();
+      assert bytesStart != null : "Bytesstart is null - not initialized";
+      // final position
+      int hashPos = code & hashMask;
+      int e = ords[hashPos];
+      /*
+       * we might see stale values in the ords array since we concurrently
+       * write. yet, if so we can check against the count and tread it as -1 if
+       * it is less than the count
+       */
+      if (e != -1 && e < count && !equals(e, bytes, spare)) {
+        // Conflict: keep searching different locations in
+        // the hash table.
+        final int inc = ((code >> 8) + code) | 1;
+        do {
+          code += inc;
+          hashPos = code & hashMask;
+          e = ords[hashPos];
+        } while (e != -1 && e < count && !equals(e, bytes, spare));
+      }
+      return e >= 0; 
+    }
+    
+    public int size() {
+      return count;
+    }
+    
+    public int next(int previous, BytesRef spare) {
+      if (sortedOrds == null) {
+        throw new UnsupportedOperationException("BytesRefHashView was opened unsorted");
+      }
+      final int[] sorted = sortedOrds;
+      final int next = previous +1;
+      if (next < count) {
+        pool.setBytesRef(spare, bytesStart[sorted[next]]); 
+        return next;
+      }
+      // nocommit is -1 good here?
+      return -1;
+    }
+    
+    public int seekCeil(BytesRef bytes, BytesRef spare) {
+      if (sortedOrds == null) {
+        throw new UnsupportedOperationException("BytesRefHashView was opened unsorted");
+      }
+      // do a binary search on the sorted array 
+      final int[] sorted = sortedOrds;
+      int low = 0;
+      int mid = 0;
+      int high = count-1;
+      while (low <= high) {
+        mid = (low + high) >>> 1;
+        pool.setBytesRef(spare, bytesStart[sorted[mid]]);
+        final int cmp = comp.compare(spare, bytes);
+        if (cmp < 0) {
+          low = mid + 1;
+        } else if (cmp > 0) {
+          high = mid - 1;
+        } else {
+          return mid;
+        }
+      }
+      // once we are here there is no way the values is in the hash table
+      assert comp.compare(spare, bytes) != 0;
+      return -(low + 1);
+    }
+    
+    private boolean equals(int ord, BytesRef b, BytesRef spare) {
+      return pool.setBytesRef(spare, bytesStart[ord]).bytesEquals(b);
+    }
+    
+    private int[] merge(final BytesRefHashView previous) {
+      int pCount = previous.count;
+      final int[] merged = new int[count];
+      final int delta = count - pCount;
+      // ords are continuous - just fill them up and sort the new values in place
+      for (int i = pCount; i < count; i++) {
+        merged[i] = i;
+      }
+      // sort all new entries
+      sort(merged, pCount, count-1);
+      final BytesRef bytes = new BytesRef();
+      final BytesRef spare = new BytesRef();
+      int sourceOffset = 0;
+      int targetOffset = 0;
+      // merge previously sorted values with new values 
+      for (int i = 0; i < delta; i++) {
+        int j = merged[pCount + i];
+        pool.setBytesRef(bytes, bytesStart[j]);
+        final int seekCeil = previous.seekCeil(bytes, spare);
+        assert seekCeil < 0 : "new value already exists in previous hash";
+        int insertAt = (-seekCeil-1);
+        
+        if (sourceOffset == insertAt) {
+          // insert in the same destination
+          merged[targetOffset++] = j;
+        } else {
+          assert insertAt > sourceOffset;
+          final int length = insertAt - sourceOffset;
+          // TODO maybe it's worth to loop if the num of values to merge is small here?
+          System.arraycopy(previous.sortedOrds, sourceOffset, merged, targetOffset, length);
+          targetOffset += length;
+          merged[targetOffset++] = j;
+          sourceOffset = insertAt;
+        }
+      }
+      if (sourceOffset < pCount) { // move the tail over if there is one
+        System.arraycopy(previous.sortedOrds, sourceOffset, merged, targetOffset, pCount - sourceOffset);
+      }
+      return merged;
+    }
+    
+    public int[] initSortedOrds() {
+      final int[] sortedOrds = new int[count];
+      // don't compact only iterate and add missing ordinals
+      for (int i = 0; i < sortedOrds.length; i++) {
+        sortedOrds[i] = i;
+      }
+      return sortedOrds;
+    }
+    
+    
+    private int[] sort(final int[] compact, int start, int end) {
+      new SorterTemplate() {
+        @Override
+        protected void swap(int i, int j) {
+          final int o = compact[i];
+          compact[i] = compact[j];
+          compact[j] = o;
+        }
+        
+        @Override
+        protected int compare(int i, int j) {
+          final int ord1 = compact[i], ord2 = compact[j];
+          assert bytesStart.length > ord1 && bytesStart.length > ord2;
+          return comp.compare(pool.setBytesRef(scratch1, bytesStart[ord1]),
+            pool.setBytesRef(scratch2, bytesStart[ord2]));
+        }
+
+        @Override
+        protected void setPivot(int i) {
+          final int ord = compact[i];
+          assert bytesStart.length > ord;
+          pool.setBytesRef(pivot, bytesStart[ord]);
+        }
+    
+        @Override
+        protected int comparePivot(int j) {
+          final int ord = compact[j];
+          assert bytesStart.length > ord;
+          return comp.compare(pivot,
+            pool.setBytesRef(scratch2, bytesStart[ord]));
+        }
+        
+        private final BytesRef pivot = new BytesRef(),
+          scratch1 = new BytesRef(), scratch2 = new BytesRef();
+      }.quickSort(start, end);
+      return compact;
+    }
+    
+    public int[] sortedOrds() {
+      return sortedOrds;
+    }
+  }
 }
Index: lucene/src/test/org/apache/lucene/util/TestBytesRefHash.java
===================================================================
--- lucene/src/test/org/apache/lucene/util/TestBytesRefHash.java	(revision 1164400)
+++ lucene/src/test/org/apache/lucene/util/TestBytesRefHash.java	(working copy)
@@ -17,16 +17,22 @@
  * limitations under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.Map.Entry;
 
 import org.apache.lucene.util.BytesRefHash.MaxBytesLengthExceededException;
+import org.apache.lucene.util.BytesRefHash.BytesRefHashView;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,7 +43,7 @@
 
   BytesRefHash hash;
   ByteBlockPool pool;
-  
+
   /**
    */
   @Override
@@ -47,16 +53,18 @@
     pool = newPool();
     hash = newHash(pool);
   }
-  
-  private ByteBlockPool newPool(){
-    return  random.nextBoolean() && pool != null ? pool
-        : new ByteBlockPool(new RecyclingByteBlockAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, random.nextInt(25)));
+
+  private ByteBlockPool newPool() {
+    return random.nextBoolean() && pool != null ? pool : new ByteBlockPool(
+        new RecyclingByteBlockAllocator(ByteBlockPool.BYTE_BLOCK_SIZE,
+            random.nextInt(25)));
   }
-  
+
   private BytesRefHash newHash(ByteBlockPool blockPool) {
     final int initSize = 2 << 1 + random.nextInt(5);
-    return random.nextBoolean() ? new BytesRefHash(blockPool) : new BytesRefHash(
-        blockPool, initSize, new BytesRefHash.DirectBytesStartArray(initSize));
+    return random.nextBoolean() ? new BytesRefHash(blockPool)
+        : new BytesRefHash(blockPool, initSize,
+            new BytesRefHash.DirectBytesStartArray(initSize));
   }
 
   /**
@@ -67,7 +75,7 @@
     BytesRef ref = new BytesRef();
     int num = atLeast(2);
     for (int j = 0; j < num; j++) {
-      final int mod = 1+random.nextInt(39);
+      final int mod = 1 + random.nextInt(39);
       for (int i = 0; i < 797; i++) {
         String str;
         do {
@@ -80,7 +88,7 @@
           assertEquals(hash.size(), count);
         else
           assertEquals(hash.size(), count + 1);
-        if(i % mod == 0) {
+        if (i % mod == 0) {
           hash.clear();
           assertEquals(0, hash.size());
           hash.reinit();
@@ -116,7 +124,7 @@
           uniqueCount++;
           assertEquals(hash.size(), count + 1);
         } else {
-          assertTrue((-key)-1 < count);
+          assertTrue((-key) - 1 < count);
           assertEquals(hash.size(), count);
         }
       }
@@ -149,7 +157,7 @@
         ref.copy(str);
         final int key = hash.add(ref);
         if (key < 0) {
-          assertTrue(bits.get((-key)-1));
+          assertTrue(bits.get((-key) - 1));
         } else {
           assertFalse(bits.get(key));
           bits.set(key);
@@ -229,19 +237,19 @@
         int count = hash.size();
         int key = hash.add(ref);
 
-        if (key >=0) {
+        if (key >= 0) {
           assertTrue(strings.add(str));
           assertEquals(uniqueCount, key);
           assertEquals(hash.size(), count + 1);
           uniqueCount++;
         } else {
           assertFalse(strings.add(str));
-          assertTrue((-key)-1 < count);
-          assertEquals(str, hash.get((-key)-1, scratch).utf8ToString());
+          assertTrue((-key) - 1 < count);
+          assertEquals(str, hash.get((-key) - 1, scratch).utf8ToString());
           assertEquals(count, hash.size());
         }
       }
-      
+
       assertAllIn(strings, hash);
       hash.clear();
       assertEquals(0, hash.size());
@@ -268,11 +276,10 @@
       }
     }
   }
-  
+
   /**
    * Test method for
-   * {@link org.apache.lucene.util.BytesRefHash#addByPoolOffset(int)}
-   * .
+   * {@link org.apache.lucene.util.BytesRefHash#addByPoolOffset(int)} .
    */
   @Test
   public void testAddByPoolOffset() {
@@ -302,21 +309,22 @@
           uniqueCount++;
         } else {
           assertFalse(strings.add(str));
-          assertTrue((-key)-1 < count);
-          assertEquals(str, hash.get((-key)-1, scratch).utf8ToString());
+          assertTrue((-key) - 1 < count);
+          assertEquals(str, hash.get((-key) - 1, scratch).utf8ToString());
           assertEquals(count, hash.size());
-          int offsetKey = offsetHash.addByPoolOffset(hash.byteStart((-key)-1));
-          assertTrue((-offsetKey)-1 < count);
-          assertEquals(str, hash.get((-offsetKey)-1, scratch).utf8ToString());
+          int offsetKey = offsetHash
+              .addByPoolOffset(hash.byteStart((-key) - 1));
+          assertTrue((-offsetKey) - 1 < count);
+          assertEquals(str, hash.get((-offsetKey) - 1, scratch).utf8ToString());
           assertEquals(count, hash.size());
         }
       }
-      
+
       assertAllIn(strings, hash);
       for (String string : strings) {
         ref.copy(string);
         int key = hash.add(ref);
-        BytesRef bytesRef = offsetHash.get((-key)-1, scratch);
+        BytesRef bytesRef = offsetHash.get((-key) - 1, scratch);
         assertEquals(ref, bytesRef);
       }
 
@@ -328,20 +336,291 @@
       offsetHash.reinit();
     }
   }
-  
+
   private void assertAllIn(Set<String> strings, BytesRefHash hash) {
     BytesRef ref = new BytesRef();
     BytesRef scratch = new BytesRef();
     int count = hash.size();
     for (String string : strings) {
       ref.copy(string);
-      int key  =  hash.add(ref); // add again to check duplicates
-      assertEquals(string, hash.get((-key)-1, scratch).utf8ToString());
+      int key = hash.add(ref); // add again to check duplicates
+      assertEquals(string, hash.get((-key) - 1, scratch).utf8ToString());
       assertEquals(count, hash.size());
       assertTrue("key: " + key + " count: " + count + " string: " + string,
           key < count);
     }
   }
 
+  public void testViewSeekExact() {
+    BytesRef ref = new BytesRef();
+    BytesRef scratch = new BytesRef();
+    int num = atLeast(2);
+    for (int j = 0; j < num; j++) {
+      Set<String> strings = new HashSet<String>();
+      int uniqueCount = 0;
+      for (int i = 0; i < 797; i++) {
+        String str;
+        do {
+          str = _TestUtil.randomRealisticUnicodeString(random, 1000);
+        } while (str.length() == 0);
+        ref.copy(str);
+        int count = hash.size();
+        int key = hash.add(ref);
 
+        if (key >= 0) {
+          assertTrue(strings.add(str));
+          assertEquals(uniqueCount, key);
+          assertEquals(hash.size(), count + 1);
+          uniqueCount++;
+        } else {
+          assertFalse(strings.add(str));
+          assertTrue((-key) - 1 < count);
+          assertEquals(str, hash.get((-key) - 1, scratch).utf8ToString());
+          assertEquals(count, hash.size());
+        }
+      }
+
+      Comparator<BytesRef> comp = BytesRef.getUTF8SortedAsUnicodeComparator();
+      BytesRefHashView readOnly = hash.getView(comp, false);
+      int count = 0;
+      for (String string : strings) {
+        ref.copy(string);
+        assertTrue(readOnly.seekExact(ref, scratch));
+        assertEquals(scratch, ref);
+        assertEquals(string, ref.utf8ToString());
+        count++;
+      }
+      hash.clear();
+      assertEquals(0, hash.size());
+      hash.reinit();
+    }
+  }
+
+  public void testViewSeekCeil() {
+    BytesRef ref = new BytesRef();
+    BytesRef scratch = new BytesRef();
+    int num = atLeast(2);
+    for (int j = 0; j < num; j++) {
+      Set<BytesRef> strings = new HashSet<BytesRef>();
+      int uniqueCount = 0;
+      for (int i = 0; i < 797; i++) {
+        String str;
+        do {
+          str = _TestUtil.randomRealisticUnicodeString(random, 1000);
+        } while (str.length() == 0);
+        ref.copy(str);
+        int count = hash.size();
+        int key = hash.add(ref);
+
+        if (key >= 0) {
+          assertTrue(strings.add(new BytesRef(str)));
+          assertEquals(uniqueCount, key);
+          assertEquals(hash.size(), count + 1);
+          uniqueCount++;
+        } else {
+          assertFalse(strings.add(new BytesRef(str)));
+          assertTrue((-key) - 1 < count);
+          assertEquals(str, hash.get((-key) - 1, scratch).utf8ToString());
+          assertEquals(count, hash.size());
+        }
+      }
+      Comparator<BytesRef> comp = BytesRef.getUTF8SortedAsUnicodeComparator();
+      BytesRef[] sorted = strings.toArray(new BytesRef[0]);
+      Arrays.sort(sorted, comp);
+
+      BytesRefHashView readOnly = hash.getView(comp, true);
+
+      assertEquals(readOnly.size(), hash.size());
+
+      for (int i = 0; i < sorted.length; i++) {
+        int next = readOnly.next(i - 1, scratch);
+        assertTrue(next >= 0);
+        assertEquals(sorted[i].utf8ToString(), scratch.utf8ToString());
+
+      }
+      for (BytesRef term : strings) {
+        int seekCeil = readOnly.seekCeil(term, scratch);
+        assertTrue(readOnly.seekExact(term, scratch));
+        assertEquals(scratch, term);
+        assertTrue(seekCeil >= 0);
+        assertEquals(scratch, term);
+
+      }
+      int iter = atLeast(100);
+
+      for (int i = 0; i < iter; i++) {
+        ref.copy(_TestUtil.randomRealisticUnicodeString(random, 1000));
+        int seekCeil = readOnly.seekCeil(ref, scratch);
+        if (!readOnly.seekExact(ref, scratch)) {
+          assertTrue(seekCeil < 0);
+        } else {
+          assertTrue(seekCeil >= 0);
+        }
+        int binarySearch = Arrays.binarySearch(sorted, ref, comp);
+        assertEquals(seekCeil, binarySearch);
+      }
+      hash.clear();
+      assertEquals(0, hash.size());
+      hash.reinit();
+    }
+  }
+
+  public void testSingleWriterMultipleReaders() throws Throwable {
+
+    int numThreads = atLeast(2);
+    int numIter = atLeast(40000);
+    WriterThread thread = new WriterThread();
+    ReaderThread[] readers = new ReaderThread[numThreads];
+    for (int i = 0; i < readers.length; i++) {
+      readers[i] = new ReaderThread(thread, numIter, random);
+      readers[i].start();
+    }
+    thread.start();
+
+    thread.join();
+    for (int i = 0; i < readers.length; i++) {
+      readers[i].join();
+    }
+    if (thread.error != null) {
+      throw thread.error;
+    }
+    for (int i = 0; i < readers.length; i++) {
+      if (readers[i].error != null) {
+        throw readers[i].error;
+      }
+    }
+  }
+
+  public void testIncrementallySortedView() {
+    BytesRef ref = new BytesRef();
+    BytesRef scratch = new BytesRef();
+    int num = atLeast(2);
+    Comparator<BytesRef> comp = BytesRef.getUTF8SortedAsUnicodeComparator();
+
+    for (int j = 0; j < num; j++) {
+      TreeSet<BytesRef> strings = new TreeSet<BytesRef>(comp);
+      BytesRefHashView readOnly = null;
+      for (int i = 0; i < 797; i++) {
+        String str;
+        do {
+          str = _TestUtil.randomRealisticUnicodeString(random, 1000);
+        } while (str.length() == 0);
+        ref.copy(str);
+        hash.add(ref);
+        strings.add(new BytesRef(str));
+
+        if (random.nextInt(20) == 0) {
+          readOnly = hash.getView(comp, readOnly, true);
+          Iterator<BytesRef> iterator = strings.iterator();
+          for (int k = 0; k < strings.size(); k++) {
+            int next = readOnly.next(k - 1, scratch);
+            assertTrue("" + k + " i: " + i + " next: " + next, next >= 0);
+            assertTrue(iterator.hasNext());
+            String utf8ToString = iterator.next().utf8ToString();
+            if (!utf8ToString.equals(scratch.utf8ToString())) {
+              System.out.println();
+            }
+            assertEquals("" + k + " i: " + i, utf8ToString,
+                scratch.utf8ToString());
+          }
+          assertFalse(iterator.hasNext());
+        }
+      }
+
+      hash.clear();
+      assertEquals(0, hash.size());
+      hash.reinit();
+
+    }
+  }
+
+  public static class WriterThread extends Thread {
+    BytesRefHash hash = new BytesRefHash();
+    volatile Throwable error;
+    volatile Holder holder = new Holder();
+
+    @Override
+    public void run() {
+      try {
+        Comparator<BytesRef> comp = BytesRef.getUTF8SortedAsUnicodeComparator();
+        BytesRef ref = new BytesRef();
+        ArrayList<String> strings = new ArrayList<String>();
+        for (int i = 0; i < atLeast(20000); i++) {
+          String str;
+          do {
+            str = _TestUtil.randomRealisticUnicodeString(random, 1000);
+          } while (str.length() == 0);
+          ref.copy(str);
+          int add = hash.add(ref);
+          if (add >= 0) {
+            strings.add(str);
+          }
+
+          if (random.nextInt(10) == 0) {
+            Holder newHolder = new Holder();
+            newHolder.view = hash.getView(comp, holder.view, true);
+            newHolder.keys = strings.toArray(new String[0]);
+            assertEquals(newHolder.keys.length, newHolder.view.size());
+            holder = newHolder;
+          }
+        }
+      } catch (Throwable e) {
+        error = e;
+      }
+    }
+  }
+
+  public static class ReaderThread extends Thread {
+    final WriterThread thread;
+    final int numIter;
+    final Random random;
+    BytesRef key = new BytesRef();
+    BytesRef spare = new BytesRef();
+    Throwable error;
+
+    public ReaderThread(WriterThread thread, int numIter, Random random) {
+      this.thread = thread;
+      this.numIter = numIter;
+      this.random = random;
+    }
+
+    @Override
+    public void run() {
+      Comparator<BytesRef> comp = BytesRef.getUTF8SortedAsUnicodeComparator();
+      try {
+        while (thread.holder.keys == null) {
+          Thread.yield();
+          if (thread.error != null) {
+            return;
+          }
+        }
+        for (int i = 0; i < numIter; i++) {
+          Holder holder = thread.holder;
+          int nextInt = random.nextInt(holder.keys.length);
+          key.copy(holder.keys[nextInt]);
+          int seekCeil = holder.view.seekCeil(key, spare);
+          assertTrue(seekCeil >= 0);
+          holder = thread.holder;
+          assertTrue(holder.view.seekExact(key, spare));
+          // seek again to make sure we don't get a new holder since we got the ord
+          seekCeil = holder.view.seekCeil(key, spare);
+          int randomNext = random.nextInt(100);
+          for (int j = 0; j < randomNext; j++) {
+            int next = holder.view.next(seekCeil+j, spare);
+            if (next != -1) {
+              assertTrue(comp.compare(key, spare) < 0);
+            }
+          }
+        }
+      } catch (Throwable e) {
+        this.error = e;
+      }
+    }
+
+  }
+
+  public static class Holder {
+    public BytesRefHashView view;
+    public String[] keys;
+  }
 }
