Index: lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesConsumer.java	(revision 1141101)
+++ lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesConsumer.java	(working copy)
@@ -37,73 +37,104 @@
   private final Directory directory;
   private final AtomicLong bytesUsed;
   private final Comparator<BytesRef> comparator;
-
-  public DefaultDocValuesConsumer(PerDocWriteState state, Comparator<BytesRef> comparator) {
+  private final boolean useCompoundFile;
+  public DefaultDocValuesConsumer(PerDocWriteState state, Comparator<BytesRef> comparator, boolean useCompoundFile) throws IOException {
     this.segmentName = state.segmentName;
     this.codecId = state.codecId;
     this.bytesUsed = state.bytesUsed;
-    this.directory = state.directory;
+    // TODO: maybe we should enable a global CFS that all codecs can pull on demand to further reduce the number of files?
+    this.directory = useCompoundFile ? state.directory.createCompoundOutput(IndexFileNames.segmentFileName(segmentName, state.codecId, IndexFileNames.COMPOUND_FILE_EXTENSION)) : state.directory;
     this.comparator = comparator;
+    this.useCompoundFile = useCompoundFile;
   }
+
+  public DefaultDocValuesConsumer(PerDocWriteState state, Comparator<BytesRef> comparator) throws IOException {
+    this(state, comparator, true);
+    // TODO: codecs should be configurable to use compound files or not
+  }
   
   public void close() throws IOException {
+    if (useCompoundFile) {
+      this.directory.close();
+    }
   }
 
   @Override
   public DocValuesConsumer addValuesField(FieldInfo field) throws IOException {
     return Writer.create(field.getDocValues(),
         docValuesId(segmentName, codecId, field.number),
         directory, comparator, bytesUsed);
   }
   
   @SuppressWarnings("fallthrough")
   public static void files(Directory dir, SegmentInfo segmentInfo, int codecId,
-      Set<String> files) throws IOException {
+      Set<String> files, boolean useCompoundFile) throws IOException {
     FieldInfos fieldInfos = segmentInfo.getFieldInfos();
     for (FieldInfo fieldInfo : fieldInfos) {
       if (fieldInfo.getCodecId() == codecId && fieldInfo.hasDocValues()) {
         String filename = docValuesId(segmentInfo.name, codecId,
             fieldInfo.number);
-        switch (fieldInfo.getDocValues()) {
-        case BYTES_FIXED_DEREF:
-        case BYTES_VAR_DEREF:
-        case BYTES_VAR_SORTED:
-        case BYTES_FIXED_SORTED:
-        case BYTES_VAR_STRAIGHT:
-          files.add(IndexFileNames.segmentFileName(filename, "",
-              Writer.INDEX_EXTENSION));
-          assert dir.fileExists(IndexFileNames.segmentFileName(filename, "",
-              Writer.INDEX_EXTENSION));
-          // until here all types use an index
-        case BYTES_FIXED_STRAIGHT:
-        case FLOAT_32:
-        case FLOAT_64:
-        case VAR_INTS:
-        case FIXED_INTS_16:
-        case FIXED_INTS_32:
-        case FIXED_INTS_64:
-        case FIXED_INTS_8:
-          files.add(IndexFileNames.segmentFileName(filename, "",
-              Writer.DATA_EXTENSION));
-          assert dir.fileExists(IndexFileNames.segmentFileName(filename, "",
-              Writer.DATA_EXTENSION));
-          break;
-      
-        default:
-          assert false;
+        if (useCompoundFile) {
+          files.add(IndexFileNames.segmentFileName(segmentInfo.name, codecId, IndexFileNames.COMPOUND_FILE_EXTENSION));
+          files.add(IndexFileNames.segmentFileName(segmentInfo.name, codecId, IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION));
+          assert dir.fileExists(IndexFileNames.segmentFileName(segmentInfo.name, codecId, IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION)); 
+          assert dir.fileExists(IndexFileNames.segmentFileName(segmentInfo.name, codecId, IndexFileNames.COMPOUND_FILE_EXTENSION)); 
+          return;
+        } else {
+          switch (fieldInfo.getDocValues()) {
+          case BYTES_FIXED_DEREF:
+          case BYTES_VAR_DEREF:
+          case BYTES_VAR_SORTED:
+          case BYTES_FIXED_SORTED:
+          case BYTES_VAR_STRAIGHT:
+            files.add(IndexFileNames.segmentFileName(filename, "",
+                Writer.INDEX_EXTENSION));
+            assert dir.fileExists(IndexFileNames.segmentFileName(filename, "",
+                Writer.INDEX_EXTENSION));
+            // until here all types use an index
+          case BYTES_FIXED_STRAIGHT:
+          case FLOAT_32:
+          case FLOAT_64:
+          case VAR_INTS:
+          case FIXED_INTS_16:
+          case FIXED_INTS_32:
+          case FIXED_INTS_64:
+          case FIXED_INTS_8:
+            files.add(IndexFileNames.segmentFileName(filename, "",
+                Writer.DATA_EXTENSION));
+            assert dir.fileExists(IndexFileNames.segmentFileName(filename, "",
+                Writer.DATA_EXTENSION));
+            break;
+        
+          default:
+            assert false;
+          }
         }
       }
     }
   }
   
+  
   public static void files(Directory dir, SegmentInfo segmentInfo, int codecId,
+      Set<String> files) throws IOException {
+    files(dir, segmentInfo, codecId, files, true);
+  }
+  
   static String docValuesId(String segmentsName, int codecID, int fieldId) {
     return segmentsName + "_" + codecID + "-" + fieldId;
   }
+  
+  public static void getDocValuesExtensions(Set<String> extensions, boolean useCompoundFile) {
+    if (useCompoundFile) {
+      extensions.add(IndexFileNames.COMPOUND_FILE_ENTRIES_EXTENSION);
+      extensions.add(IndexFileNames.COMPOUND_FILE_EXTENSION);
+    } else {
+      extensions.add(Writer.DATA_EXTENSION);
+      extensions.add(Writer.INDEX_EXTENSION);
+    }
+  }
 
   public static void getDocValuesExtensions(Set<String> extensions) {
-    extensions.add(Writer.DATA_EXTENSION);
-    extensions.add(Writer.INDEX_EXTENSION);
+    getDocValuesExtensions(extensions, true);
   }
 }
Index: lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesProducer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesProducer.java	(revision 1141101)
+++ lucene/src/java/org/apache/lucene/index/codecs/DefaultDocValuesProducer.java	(working copy)
@@ -16,12 +16,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.TreeMap;
 
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.values.Bytes;
 import org.apache.lucene.index.values.IndexDocValues;
@@ -29,6 +32,7 @@
 import org.apache.lucene.index.values.Ints;
 import org.apache.lucene.index.values.ValueType;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
 
 /**
  * Abstract base class for FieldsProducer implementations supporting
@@ -39,6 +43,8 @@
 public class DefaultDocValuesProducer extends PerDocValues {
 
   protected final TreeMap<String, IndexDocValues> docValues;
+  private final boolean useCompoundFile;
+  private final Closeable cfs;
 
   /**
    * Creates a new {@link DefaultDocValuesProducer} instance and loads all
@@ -57,8 +63,21 @@
    */
   public DefaultDocValuesProducer(SegmentInfo si, Directory dir,
       FieldInfos fieldInfo, int codecId) throws IOException {
-    docValues = load(fieldInfo, si.name, si.docCount, dir, codecId);
+    this(si, dir, fieldInfo, codecId, true);
   }
+  
+  public DefaultDocValuesProducer(SegmentInfo si, Directory dir,
+      FieldInfos fieldInfo, int codecId, boolean useCompoundFile) throws IOException {
+    this.useCompoundFile = useCompoundFile;
+    final Directory directory;
+    if (useCompoundFile) {
+      cfs = directory = dir.openCompoundInput(IndexFileNames.segmentFileName(si.name, codecId, IndexFileNames.COMPOUND_FILE_EXTENSION), 1024);
+    } else {
+      cfs = null;
+      directory = dir;
+    }
+    docValues = load(fieldInfo, si.name, si.docCount, directory, codecId);
+  }
 
   /**
    * Returns a {@link IndexDocValues} instance for the given field name or
@@ -92,7 +111,7 @@
     } finally {
       if (!success) {
         // if we fail we must close all opened resources if there are any
-        closeDocValues(values.values());
+        closeInternal(values.values());
       }
     }
     return values;
@@ -149,22 +168,20 @@
   }
 
   public void close() throws IOException {
-    closeDocValues(docValues.values());
+    closeInternal(docValues.values());
   }
 
-  private void closeDocValues(final Collection<IndexDocValues> values)
-      throws IOException {
-    IOException ex = null;
-    for (IndexDocValues docValues : values) {
-      try {
-        docValues.close();
-      } catch (IOException e) {
-        ex = e;
-      }
-    }
-    if (ex != null) {
-      throw ex;
-    }
+  private void closeInternal(Collection<? extends Closeable> closeables) throws IOException {
+    final Collection<? extends Closeable> toClose;
+    if (useCompoundFile) {
+      final ArrayList<Closeable> list = new ArrayList<Closeable>(closeables);
+      list.add(cfs);
+      toClose = list;
+    } else {
+      // close exactly what we were handed; docValues may still be null during a failed load()
+      toClose = closeables;
+    }
+    IOUtils.closeSafely(false, toClose);
   }
 
   @Override
Index: lucene/src/java/org/apache/lucene/store/CompoundFileDirectory.java
===================================================================
--- lucene/src/java/org/apache/lucene/store/CompoundFileDirectory.java	(revision 1141101)
+++ lucene/src/java/org/apache/lucene/store/CompoundFileDirectory.java	(working copy)
@@ -25,9 +25,11 @@
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.util.IOUtils;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -60,7 +62,7 @@
    * NOTE: subclasses must call {@link #initForRead(Map)} before the directory can be used.
    */
   public CompoundFileDirectory(Directory directory, String fileName, int readBufferSize) throws IOException {
-    assert !(directory instanceof CompoundFileDirectory) : "compound file inside of compound file: " + fileName;
+
     this.directory = directory;
     this.fileName = fileName;
     this.readBufferSize = readBufferSize;
@@ -75,6 +77,7 @@
   }
   
   protected final void initForWrite() {
+    assert !(directory instanceof CompoundFileDirectory) : "compound file inside of compound file: " + fileName;
     this.entries = SENTINEL;
     this.openForWrite = true;
     this.isOpen = true;
@@ -173,12 +176,33 @@
   
   @Override
   public synchronized void close() throws IOException {
+    if (!isOpen) {
+     assert entries == null; 
+     return; // already closed
+    }
     ensureOpen();
-    entries = null;
-    isOpen = false;
-    if (writer != null) {
-      assert openForWrite;
-      writer.close();
+    boolean success = false;
+    try {
+      if (!nestedCFS.isEmpty()) {
+        assert openForWrite;
+        initWriter();
+        for (CompoundFileDirectory dir : nestedCFS) {
+          if (dir.isOpen) {
+            throw new IllegalStateException("nested CFS is still open: " + dir.fileName);
+          }
+          if (dir.writer != null) {
+            directory.copy(this, dir.writer.dataFileName, dir.writer.dataFileName);
+            directory.copy(this, dir.writer.entryTableName, dir.writer.entryTableName);
+            directory.deleteFile(dir.writer.dataFileName); 
+            directory.deleteFile(dir.writer.entryTableName);
+          }
+        }
+      }
+      isOpen = false;
+      entries = null;
+      success = true;
+    } finally {
+      IOUtils.closeSafely(!success, writer);
     }
   }
   
@@ -285,21 +309,22 @@
     throw new UnsupportedOperationException();
   }
   
-  /** Not implemented
-   * @throws UnsupportedOperationException */
+  /** Opens a nested compound file, stored inside this compound file, for reading. */
   @Override
   public final CompoundFileDirectory openCompoundInput(String name, int bufferSize) throws IOException {
     return new DefaultCompoundFileDirectory(this, name, bufferSize, false);
   }
   
+  private final List<CompoundFileDirectory> nestedCFS = new ArrayList<CompoundFileDirectory>();
+  /** Creates a nested compound file; its data and entry table are copied into this file on {@link #close()}. */
   @Override
   public CompoundFileDirectory createCompoundOutput(String name)
       throws IOException {
+    // NOTE(review): nested CFS writing is currently only needed by docvalues — confirm whether to keep this enabled
     assert openForWrite;
     CompoundFileDirectory createCompoundOutput = directory.createCompoundOutput(name);
     nestedCFS.add(createCompoundOutput);
     return createCompoundOutput;
   }
   
   private final void initWriter() {
Index: lucene/src/java/org/apache/lucene/store/CompoundFileWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/store/CompoundFileWriter.java	(revision 1141101)
+++ lucene/src/java/org/apache/lucene/store/CompoundFileWriter.java	(working copy)
@@ -17,6 +17,7 @@
  * limitations under the License.
  */
 
+import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
@@ -55,7 +56,7 @@
  * 
  * @lucene.internal
  */
-final class CompoundFileWriter {
+final class CompoundFileWriter implements Closeable {
 
   private static final class FileEntry {
     /** source file */
@@ -89,8 +90,8 @@
   private boolean closed = false;
   private volatile IndexOutput dataOut;
   private final AtomicBoolean outputTaken = new AtomicBoolean(false);
-  private final String entryTableName;
-  private final String dataFileName;
+  final String entryTableName;
+  final String dataFileName;
 
   /**
    * Create the compound stream in the specified file. The file name is the
@@ -128,7 +129,7 @@
    *           if close() had been called before or if no file has been added to
    *           this object
    */
-  void close() throws IOException {
+  public void close() throws IOException {
     if (closed) {
       throw new IllegalStateException("already closed");
     }
@@ -147,12 +148,18 @@
       assert dataOut != null;
       long finalLength = dataOut.getFilePointer();
       assert assertFileLength(finalLength, dataOut);
+    } catch (IOException e) {
+      priorException = e;
+    } finally {
+      IOUtils.closeSafely(priorException, dataOut);
+    }
+    try {
       entryTableOut = directory.createOutput(entryTableName);
       writeEntryTable(entries.values(), entryTableOut);
     } catch (IOException e) {
       priorException = e;
     } finally {
-      IOUtils.closeSafely(priorException, dataOut, entryTableOut);
+      IOUtils.closeSafely(priorException, entryTableOut);
     }
   }
 
@@ -321,6 +328,7 @@
         closed = true;
         entry.length = writtenBytes;
         if (isSeparate) {
+          delegate.close();
           // we are a separate file - push into the pending entries
           pendingEntries.add(entry);
         } else {
Index: lucene/src/test/org/apache/lucene/index/TestCompoundFile.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestCompoundFile.java	(revision 1141101)
+++ lucene/src/test/org/apache/lucene/index/TestCompoundFile.java	(working copy)
@@ -717,5 +717,36 @@
     cfr.close();
     newDir.close();
   }
+  
+  // Verifies that a compound file can be created nested inside another compound
+  // file, and that both levels can be read back through openCompoundInput.
+  public void testNestedCFP() throws IOException {
+    Directory newDir = newDirectory();
+    CompoundFileDirectory csw = newDir.createCompoundOutput("d.cfs");
+    CompoundFileDirectory nested = csw.createCompoundOutput("d.ncfs");
+    IndexOutput out = nested.createOutput("d.xyz");
+    IndexOutput out1 = nested.createOutput("d_1.xyz");
+    out.writeInt(0);
+    out1.writeInt(1);
+    out.close();
+    out1.close();
+    nested.close();
+    csw.close();
+    // only the outer .cfs and its entry-table file may remain in the real directory
+    assertEquals(2, newDir.listAll().length);
+    CompoundFileDirectory csr = newDir.openCompoundInput("d.cfs", 1024);
+    assertEquals(2, csr.listAll().length);
+    CompoundFileDirectory nestedReader = csr.openCompoundInput("d.ncfs", 1024);
+    assertEquals(2, nestedReader.listAll().length);
+    IndexInput openInput = nestedReader.openInput("d.xyz");
+    assertEquals(0, openInput.readInt());
+    openInput.close();
+    openInput = nestedReader.openInput("d_1.xyz");
+    assertEquals(1, openInput.readInt());
+    openInput.close();
+    nestedReader.close();
+    csr.close();
+    newDir.close();
+  }
 
 }
