Index: modules/grouping/src/java/org/apache/lucene/search/grouping/DistinctCountCollector.java
===================================================================
--- modules/grouping/src/java/org/apache/lucene/search/grouping/DistinctCountCollector.java	(revision )
+++ modules/grouping/src/java/org/apache/lucene/search/grouping/DistinctCountCollector.java	(revision )
@@ -0,0 +1,143 @@
+package org.apache.lucene.search.grouping;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.FieldCache;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.util.BytesRef;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @lucene.experimental
+ */
+public class DistinctCountCollector extends Collector {
+
+  private final String groupField;
+  private final String countField;
+  private final List<GroupCount> groups;
+  private final SentinelIntSet ordSet;
+  private final GroupCount groupCounts[];
+  private final BytesRef spare = new BytesRef();
+
+  private FieldCache.DocTermsIndex groupFieldTermIndex;
+  private FieldCache.DocTermsIndex countFieldTermIndex;
+//  private GroupCount segmentGroupCounts[];
+
+  public DistinctCountCollector(String groupField, String countField, Collection<SearchGroup<BytesRef>> groups) {
+    this.groupField = groupField;
+    this.countField = countField;
+    this.groups = new ArrayList<GroupCount>(groups.size());
+    for (SearchGroup<BytesRef> group : groups) {
+      this.groups.add(new GroupCount(group.groupValue));
+    }
+    ordSet = new SentinelIntSet(groups.size(), -1);
+    groupCounts = new GroupCount[ordSet.keys.length];
+  }
+
+  public void setScorer(Scorer scorer) throws IOException {
+  }
+
+  public void collect(int doc) throws IOException {
+    int slot = ordSet.find(groupFieldTermIndex.getOrd(doc));
+    if (slot < 0) {
+      return;
+    }
+
+    GroupCount gc = groupCounts[slot];
+
+    /*GroupCount gc = segmentGroupCounts[groupFieldTermIndex.getOrd(doc)];
+    if (gc == null) {
+      return;
+    }*/
+
+    int countOrd = countFieldTermIndex.getOrd(doc);
+    if (doesNotContainsOrd(countOrd, gc.ords)) {
+      if (countOrd == 0) {
+        gc.uniqueValues.add(null);
+      } else {
+        gc.uniqueValues.add(countFieldTermIndex.lookup(countOrd, new BytesRef()));
+      }
+
+      gc.ords = Arrays.copyOf(gc.ords, gc.ords.length + 1);
+      gc.ords[gc.ords.length - 1] = countOrd;
+      if (gc.ords.length > 1) {
+        Arrays.sort(gc.ords);
+      }
+    }
+  }
+
+  private boolean doesNotContainsOrd(int ord, int[] ords) {
+    if (ords.length == 0) {
+      return true;
+    } else if (ords.length == 1) {
+      return ord != ords[0];
+    }
+    return Arrays.binarySearch(ords, ord) < 0;
+  }
+
+  public List<GroupCount> getGroups() {
+    return groups;
+  }
+
+  public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
+    groupFieldTermIndex = FieldCache.DEFAULT.getTermsIndex(context.reader, groupField);
+    countFieldTermIndex = FieldCache.DEFAULT.getTermsIndex(context.reader, countField);
+
+    ordSet.clear();
+//    segmentGroupCounts = new GroupCount[groupFieldTermIndex.numOrd()];
+    for (GroupCount group : groups) {
+      int groupOrd = group.groupValue == null ? 0 : groupFieldTermIndex.binarySearchLookup(group.groupValue, spare);
+      if (groupOrd < 0) {
+        continue;
+      }
+
+      groupCounts[ordSet.put(groupOrd)] = group;
+//      segmentGroupCounts[groupOrd] = group;
+      group.ords = new int[group.uniqueValues.size()];
+      int i = 0;
+      for (BytesRef value : group.uniqueValues) {
+        int countOrd = value == null ? 0 : countFieldTermIndex.binarySearchLookup(value, spare);
+        if (countOrd >= 0) {
+          group.ords[i++] = countOrd;
+        }
+      }
+    }
+  }
+
+  public boolean acceptsDocsOutOfOrder() {
+    return true;
+  }
+
+  public class GroupCount {
+
+    public final BytesRef groupValue;
+    public final SortedSet<BytesRef> uniqueValues;
+
+    int[] ords;
+
+    public GroupCount(BytesRef groupValue) {
+      this.groupValue = groupValue;
+      uniqueValues = new TreeSet<BytesRef>();
+    }
+  }
+
+}
Index: modules/grouping/src/test/org/apache/lucene/search/grouping/DistinctCountCollectorTest.java
===================================================================
--- modules/grouping/src/test/org/apache/lucene/search/grouping/DistinctCountCollectorTest.java	(revision )
+++ modules/grouping/src/test/org/apache/lucene/search/grouping/DistinctCountCollectorTest.java	(revision )
@@ -0,0 +1,190 @@
+package org.apache.lucene.search.grouping;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DistinctCountCollectorTest extends LuceneTestCase {
+
+  public void testBasic() throws Exception {
+
+    final String groupField = "author";
+    final String countField = "publisher";
+    FieldType customType = new FieldType();
+    customType.setStored(true);
+
+    Directory dir = newDirectory();
+    RandomIndexWriter w = new RandomIndexWriter(
+                               random,
+                               dir,
+                               newIndexWriterConfig(TEST_VERSION_CURRENT,
+                                                    new MockAnalyzer(random)).setMergePolicy(newLogMergePolicy()));
+    // 0
+    Document doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author1"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "random text"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "a"));
+    doc.add(new Field("id", customType, "1"));
+    w.addDocument(doc);
+
+    // 1
+    doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author1"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "some more random text blob"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "a"));
+    doc.add(new Field("id", customType, "2"));
+    w.addDocument(doc);
+
+    // 2
+    doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author1"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "some more random textual data"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "b"));
+    doc.add(new Field("id", customType, "3"));
+    w.addDocument(doc);
+    w.commit(); // To ensure a second segment
+
+    // 3
+    doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author2"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "some random text"));
+    doc.add(new Field("id", customType, "4"));
+    w.addDocument(doc);
+
+    // 4
+    doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author3"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "some more random text"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "a"));
+    doc.add(new Field("id", customType, "5"));
+    w.addDocument(doc);
+
+    // 5
+    doc = new Document();
+    doc.add(new Field(groupField, TextField.TYPE_STORED, "author3"));
+    doc.add(new Field("content", TextField.TYPE_STORED, "random blob"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "a"));
+    doc.add(new Field("id", customType, "6"));
+    w.addDocument(doc);
+
+    // 6 -- no author field
+    doc = new Document();
+    doc.add(new Field("content", TextField.TYPE_STORED, "random word stuck in alot of other text"));
+    doc.add(new Field(countField, TextField.TYPE_STORED, "b"));
+    doc.add(new Field("id", customType, "6"));
+    w.addDocument(doc);
+
+    IndexSearcher indexSearcher = new IndexSearcher(w.getReader());
+    w.close();
+
+
+    // === Search for content:random
+    TermFirstPassGroupingCollector firstCollector = new TermFirstPassGroupingCollector(groupField, new Sort(), 10);
+    indexSearcher.search(new TermQuery(new Term("content", "random")), firstCollector);
+    DistinctCountCollector distinctCountCollector = new DistinctCountCollector(groupField, countField, firstCollector.getTopGroups(0, false));
+    indexSearcher.search(new TermQuery(new Term("content", "random")), distinctCountCollector);
+
+    List<DistinctCountCollector.GroupCount> gcs = distinctCountCollector.getGroups();
+    assertEquals(4, gcs.size());
+
+    assertEquals("author1", gcs.get(0).groupValue.utf8ToString());
+    List<BytesRef> countValues = new ArrayList<BytesRef>(gcs.get(0).uniqueValues);
+    assertEquals(2, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+    assertEquals("b", countValues.get(1).utf8ToString());
+
+    assertEquals("author3", gcs.get(1).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(1).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+
+    assertEquals("author2", gcs.get(2).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(2).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertNull(countValues.get(0));
+
+    assertNull(gcs.get(3).groupValue);
+    countValues = new ArrayList<BytesRef>(gcs.get(3).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertEquals("b", countValues.get(0).utf8ToString());
+
+    // === Search for content:some
+    firstCollector = new TermFirstPassGroupingCollector(groupField, new Sort(), 10);
+    indexSearcher.search(new TermQuery(new Term("content", "some")), firstCollector);
+    distinctCountCollector = new DistinctCountCollector(groupField, countField, firstCollector.getTopGroups(0, false));
+    indexSearcher.search(new TermQuery(new Term("content", "some")), distinctCountCollector);
+
+    gcs = distinctCountCollector.getGroups();
+    assertEquals(3, gcs.size());
+
+    assertEquals("author2", gcs.get(0).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(0).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertNull(countValues.get(0));
+
+    assertEquals("author3", gcs.get(1).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(1).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+
+    assertEquals("author1", gcs.get(2).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(2).uniqueValues);
+    assertEquals(2, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+    assertEquals("b", countValues.get(1).utf8ToString());
+
+     // === Search for content:blob
+    firstCollector = new TermFirstPassGroupingCollector(groupField, new Sort(), 10);
+    indexSearcher.search(new TermQuery(new Term("content", "blob")), firstCollector);
+    distinctCountCollector = new DistinctCountCollector(groupField, countField, firstCollector.getTopGroups(0, false));
+    indexSearcher.search(new TermQuery(new Term("content", "blob")), distinctCountCollector);
+
+    gcs = distinctCountCollector.getGroups();
+    assertEquals(2, gcs.size());
+
+    assertEquals("author3", gcs.get(0).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(0).uniqueValues);
+    assertEquals(1, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+
+    assertEquals("author1", gcs.get(1).groupValue.utf8ToString());
+    countValues = new ArrayList<BytesRef>(gcs.get(1).uniqueValues);
+    // B/c the only one document matched with blob inside the author 1 group
+    assertEquals(1, countValues.size());
+    assertEquals("a", countValues.get(0).utf8ToString());
+
+
+    indexSearcher.getIndexReader().close();
+    dir.close();
+  }
+}
\ No newline at end of file
