diff --git a/lucene/src/java/org/apache/lucene/index/FieldInfo.java b/lucene/src/java/org/apache/lucene/index/FieldInfo.java
index d02b85f..7507f69 100644
--- a/lucene/src/java/org/apache/lucene/index/FieldInfo.java
+++ b/lucene/src/java/org/apache/lucene/index/FieldInfo.java
@@ -86,7 +86,7 @@ public final class FieldInfo {
   public int getCodecId() {
     return codecId;
   }
-
+  
   @Override
   public Object clone() {
     FieldInfo clone = new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
@@ -132,6 +132,12 @@ public final class FieldInfo {
     }
   }
   
+  public void resetDocValues(ValueType v) {
+    if (docValues != null) {
+      docValues = v;
+    }
+  }
+  
   public boolean hasDocValues() {
     return docValues != null;
   }
diff --git a/lucene/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
index 502ef1f..a5d4daa 100644
--- a/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
+++ b/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
@@ -33,7 +33,6 @@ import org.apache.lucene.index.codecs.FieldsReader;
 import org.apache.lucene.index.codecs.FieldsWriter;
 import org.apache.lucene.index.codecs.MergeState;
 import org.apache.lucene.index.codecs.PerDocConsumer;
-import org.apache.lucene.index.codecs.PerDocValues;
 import org.apache.lucene.store.CompoundFileDirectory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -141,6 +140,8 @@ final class SegmentMerger {
     if (fieldInfos.hasVectors()) {
       mergeVectors();
     }
+    // write FIS once merge is done. IDV might change types or drops fields
+    fieldInfos.write(directory, segment + "." + IndexFileNames.FIELD_INFOS_EXTENSION);
     return mergedDocs;
   }
 
@@ -254,7 +255,6 @@ final class SegmentMerger {
       }
     }
     final SegmentCodecs codecInfo = fieldInfos.buildSegmentCodecs(false);
-    fieldInfos.write(directory, segment + "." + IndexFileNames.FIELD_INFOS_EXTENSION);
 
     int docCount = 0;
 
@@ -584,28 +584,11 @@ final class SegmentMerger {
   }
 
   private void mergePerDoc() throws IOException {
-    final List<PerDocValues> perDocProducers = new ArrayList<PerDocValues>();    
-    final List<ReaderUtil.Slice> perDocSlices = new ArrayList<ReaderUtil.Slice>();
-    int docBase = 0;
-    for (MergeState.IndexReaderAndLiveDocs r : readers) {
-      final int maxDoc = r.reader.maxDoc();
-      final PerDocValues producer = r.reader.perDocValues();
-      if (producer != null) {
-        perDocSlices.add(new ReaderUtil.Slice(docBase, maxDoc, perDocProducers
-            .size()));
-        perDocProducers.add(producer);
-      }
-      docBase += maxDoc;
-    }
-    if (!perDocSlices.isEmpty()) {
       final PerDocConsumer docsConsumer = codec
           .docsConsumer(new PerDocWriteState(segmentWriteState));
       boolean success = false;
       try {
-        final MultiPerDocValues multiPerDocValues = new MultiPerDocValues(
-            perDocProducers.toArray(PerDocValues.EMPTY_ARRAY),
-            perDocSlices.toArray(ReaderUtil.Slice.EMPTY_ARRAY));
-        docsConsumer.merge(mergeState, multiPerDocValues);
+        docsConsumer.merge(mergeState);
         success = true;
       } finally {
         if (success) {
@@ -614,11 +597,8 @@ final class SegmentMerger {
           IOUtils.closeWhileHandlingException(docsConsumer);
         }
       }
-    }
-    /* don't close the perDocProducers here since they are private segment producers
-     * and will be closed once the SegmentReader goes out of scope */ 
   }
-
+  
   private MergeState mergeState;
 
   public boolean getAnyNonBulkMerges() {
diff --git a/lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java b/lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java
index 0045ee3..e24a65b 100644
--- a/lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java
+++ b/lucene/src/java/org/apache/lucene/index/codecs/DocValuesConsumer.java
@@ -92,48 +92,38 @@ public abstract class DocValuesConsumer {
    * 
    * @param mergeState
    *          the state to merge
-   * @param values
-   *          the docValues to merge in
+   * @param docValues docValues array containing one instance per reader (
+   *          {@link MergeState#readers}) or <code>null</code> if the reader has
+   *          no {@link IndexDocValues} instance.
    * @throws IOException
    *           if an {@link IOException} occurs
    */
-  public void merge(org.apache.lucene.index.codecs.MergeState mergeState,
-      IndexDocValues values) throws IOException {
+  public void merge(MergeState mergeState, IndexDocValues[] docValues) throws IOException {
     assert mergeState != null;
-    // TODO we need some kind of compatibility notation for values such
-    // that two slightly different segments can be merged eg. fixed vs.
-    // variable byte len or float32 vs. float64
-    boolean merged = false;
-    /*
-     * We ignore the given DocValues here and merge from the subReaders directly
-     * to support bulk copies on the DocValues Writer level. if this gets merged
-     * with MultiDocValues the writer can not optimize for bulk-copyable data
-     */
+    boolean hasMerged = false;
     for(int readerIDX=0;readerIDX<mergeState.readers.size();readerIDX++) {
       final org.apache.lucene.index.codecs.MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(readerIDX);
-      final IndexDocValues r = reader.reader.docValues(mergeState.fieldInfo.name);
-      if (r != null) {
-        merged = true;
-        merge(new Writer.MergeState(r, mergeState.docBase[readerIDX], reader.reader.maxDoc(),
+      if (docValues[readerIDX] != null) {
+        hasMerged = true;
+        merge(new Writer.SingleSubMergeState(docValues[readerIDX], mergeState.docBase[readerIDX], reader.reader.maxDoc(),
                                     reader.liveDocs));
       }
     }
-    if (merged) {
+    // only finish if no exception is thrown!
+    if (hasMerged) {
       finish(mergeState.mergedDocCount);
     }
   }
 
   /**
-   * Merges the given {@link MergeState} into this {@link DocValuesConsumer}.
-   * {@link MergeState#docBase} must always be increasing. Merging segments out
-   * of order is not supported.
+   * Merges the given {@link SingleSubMergeState} into this {@link DocValuesConsumer}.
    * 
    * @param mergeState
-   *          the {@link MergeState} to merge
+   *          the {@link SingleSubMergeState} to merge
    * @throws IOException
    *           if an {@link IOException} occurs
    */
-  protected abstract void merge(MergeState mergeState) throws IOException;
+  protected abstract void merge(SingleSubMergeState mergeState) throws IOException;
 
   /**
    * Specialized auxiliary MergeState is necessary since we don't want to
@@ -141,7 +131,7 @@ public abstract class DocValuesConsumer {
    * created for each merged low level {@link IndexReader} we are merging to
    * support low level bulk copies.
    */
-  public static class MergeState {
+  public static class SingleSubMergeState {
     /**
      * the source reader for this MergeState - merged values should be read from
      * this instance
@@ -154,7 +144,7 @@ public abstract class DocValuesConsumer {
     /** the not deleted bits for this MergeState */
     public final Bits liveDocs;
 
-    public MergeState(IndexDocValues reader, int docBase, int docCount, Bits liveDocs) {
+    public SingleSubMergeState(IndexDocValues reader, int docBase, int docCount, Bits liveDocs) {
       assert reader != null;
       this.reader = reader;
       this.docBase = docBase;
diff --git a/lucene/src/java/org/apache/lucene/index/codecs/PerDocConsumer.java b/lucene/src/java/org/apache/lucene/index/codecs/PerDocConsumer.java
index f765654..869a975 100644
--- a/lucene/src/java/org/apache/lucene/index/codecs/PerDocConsumer.java
+++ b/lucene/src/java/org/apache/lucene/index/codecs/PerDocConsumer.java
@@ -19,7 +19,10 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.values.IndexDocValues;
+import org.apache.lucene.index.values.TypePromoter;
+import org.apache.lucene.index.values.ValueType;
 
 /**
  * Abstract API that consumes per document values. Concrete implementations of
@@ -40,28 +43,75 @@ public abstract class PerDocConsumer implements Closeable{
    * Consumes and merges the given {@link PerDocValues} producer
    * into this consumers format.   
    */
-  public void merge(MergeState mergeState, PerDocValues producer)
+  public void merge(MergeState mergeState)
       throws IOException {
-    Iterable<String> fields = producer.fields();
-    for (String field : fields) {
-      mergeState.fieldInfo = mergeState.fieldInfos.fieldInfo(field);
-      assert mergeState.fieldInfo != null : "FieldInfo for field is null: "
-          + field;
-      if (mergeState.fieldInfo.hasDocValues()) {
-        final IndexDocValues docValues = producer.docValues(field);
-        if (docValues == null) {
-          /*
-           * It is actually possible that a fieldInfo has a values type but no
-           * values are actually available. this can happen if there are already
-           * segments without values around.
-           */
+    final FieldInfos fieldInfos = mergeState.fieldInfos;
+    final IndexDocValues[] docValues = new IndexDocValues[mergeState.readers.size()];
+    final PerDocValues[] perDocValues = new PerDocValues[mergeState.readers.size()];
+    // pull all PerDocValues 
+    for (int i = 0; i < perDocValues.length; i++) {
+      perDocValues[i] =  mergeState.readers.get(i).reader.perDocValues();
+    }
+    for (FieldInfo fieldInfo : fieldInfos) {
+      mergeState.fieldInfo = fieldInfo;
+      TypePromoter currentPromoter = TypePromoter.getIdentityPromoter();
+      if (fieldInfo.hasDocValues()) {
+        for (int i = 0; i < perDocValues.length; i++) {
+          if (perDocValues[i] != null) { // get all IDV to merge
+            docValues[i] = perDocValues[i].docValues(fieldInfo.name);
+            if (docValues[i] != null) {
+              currentPromoter = promoteValueType(fieldInfo, docValues[i], currentPromoter);
+              if (currentPromoter == null) {
+                break;
+              }     
+            }
+          }
+        }
+        
+        if (currentPromoter == null) {
+          fieldInfo.resetDocValues(null);
           continue;
         }
+        assert currentPromoter != TypePromoter.getIdentityPromoter();
+        if (fieldInfo.getDocValues() != currentPromoter.type()) {
+          // reset the type if we got promoted
+          fieldInfo.resetDocValues(currentPromoter.type());
+        }
+        
         final DocValuesConsumer docValuesConsumer = addValuesField(mergeState.fieldInfo);
         assert docValuesConsumer != null;
         docValuesConsumer.merge(mergeState, docValues);
       }
     }
+    /* NOTE: don't close the perDocProducers here since they are private segment producers
+     * and will be closed once the SegmentReader goes out of scope */ 
+  }
 
+  protected TypePromoter promoteValueType(final FieldInfo fieldInfo, final IndexDocValues docValues,
+      TypePromoter currentPromoter) {
+    assert currentPromoter != null;
+    final TypePromoter incomingPromoter = TypePromoter.create(docValues.type(),  docValues.getValueSize());
+    assert incomingPromoter != null;
+    final TypePromoter newPromoter = currentPromoter.promote(incomingPromoter);
+    return newPromoter == null ? handleIncompatibleValueType(fieldInfo, incomingPromoter, currentPromoter) : newPromoter;    
+  }
+
+  /**
+   * Resolves a conflicts of incompatible {@link TypePromoter}s. The default
+   * implementation promotes incompatible types to
+   * {@link ValueType#BYTES_VAR_STRAIGHT} and preserves all values. If this
+   * method returns <code>null</code> all docvalues for the given
+   * {@link FieldInfo} are dropped and all values are lost.
+   * 
+   * @param incomingPromoter
+   *          the incompatible incoming promoter
+   * @param currentPromoter
+   *          the current promoter
+   * @return a promoted {@link TypePromoter} or <code>null</code> iff this index
+   *         docvalues should be dropped for this field.
+   */
+  protected TypePromoter handleIncompatibleValueType(FieldInfo fieldInfo, TypePromoter incomingPromoter, TypePromoter currentPromoter) {
+    return TypePromoter.create(ValueType.BYTES_VAR_STRAIGHT, TypePromoter.VAR_TYPE_VALUE_SIZE);
   }
+  
 }
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
index c2ae3d6..b868983 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
@@ -86,6 +86,12 @@ class FixedDerefBytesImpl {
         throws IOException {
       return new DirectFixedDerefSource(cloneData(), cloneIndex(), size, type());
     }
+
+    @Override
+    public int getValueSize() {
+      return size;
+    }
+    
   }
   
   static final class FixedDerefSource extends BytesSourceBase {
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
index 54500e4..45b2aed 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
@@ -104,6 +104,11 @@ class FixedSortedBytesImpl {
       return new DirectFixedSortedSource(cloneData(), cloneIndex(), size,
           valueCount, comparator, type);
     }
+    
+    @Override
+    public int getValueSize() {
+      return size;
+    }
   }
 
   static final class FixedSortedSource extends BytesSortedSourceBase {
diff --git a/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
index 5a17a87..b19fab4 100644
--- a/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
@@ -59,6 +59,7 @@ class FixedStraightBytesImpl {
         int version, Counter bytesUsed, IOContext context) throws IOException {
       super(dir, id, codecName, version, bytesUsed, context);
       pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
+      pool.nextBuffer();
     }
     
     @Override
@@ -70,7 +71,6 @@ class FixedStraightBytesImpl {
           throw new IllegalArgumentException("bytes arrays > " + Short.MAX_VALUE + " are not supported");
         }
         size = bytes.length;
-        pool.nextBuffer();
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
@@ -120,7 +120,7 @@ class FixedStraightBytesImpl {
   }
 
   static class Writer extends FixedBytesWriterBase {
-    private boolean merge;
+    private boolean hasMerged;
     private IndexOutput datOut;
     
     public Writer(Directory dir, String id, Counter bytesUsed, IOContext context) throws IOException {
@@ -133,12 +133,15 @@ class FixedStraightBytesImpl {
 
 
     @Override
-    protected void merge(MergeState state) throws IOException {
-      merge = true;
+    protected void merge(SingleSubMergeState state) throws IOException {
       datOut = getOrCreateDataOut();
       boolean success = false;
       try {
-        if (state.liveDocs == null && state.reader instanceof FixedStraightReader ) {
+        if (!hasMerged && size != -1) {
+          datOut.writeInt(size);
+        }
+
+        if (state.liveDocs == null && tryBulkMerge(state.reader)) {
           FixedStraightReader reader = (FixedStraightReader) state.reader;
           final int maxDocs = reader.maxDoc;
           if (maxDocs == 0) {
@@ -172,24 +175,33 @@ class FixedStraightBytesImpl {
         if (!success) {
           IOUtils.closeWhileHandlingException(datOut);
         }
+        hasMerged = true;
       }
     }
     
+    protected boolean tryBulkMerge(IndexDocValues docValues) {
+      return docValues instanceof FixedStraightReader;
+    }
+    
     @Override
     protected void mergeDoc(int docID, int sourceDoc) throws IOException {
       assert lastDocID < docID;
-      currentMergeSource.getBytes(sourceDoc, bytesRef);
+      setMergeBytes(sourceDoc);
       if (size == -1) {
         size = bytesRef.length;
         datOut.writeInt(size);
       }
-      assert size == bytesRef.length;
+      assert size == bytesRef.length : "size: " + size + " ref: " + bytesRef.length;
       if (lastDocID+1 < docID) {
         fill(datOut, docID);
       }
       datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
       lastDocID = docID;
     }
+    
+    protected void setMergeBytes(int sourceDoc) {
+      currentMergeSource.getBytes(sourceDoc, bytesRef);
+    }
 
 
 
@@ -203,7 +215,7 @@ class FixedStraightBytesImpl {
     public void finish(int docCount) throws IOException {
       boolean success = false;
       try {
-        if (!merge) {
+        if (!hasMerged) {
           // indexing path - no disk IO until here
           assert datOut == null;
           datOut = getOrCreateDataOut();
@@ -267,6 +279,11 @@ class FixedStraightBytesImpl {
     public Source getDirectSource() throws IOException {
       return new DirectFixedStraightSource(cloneData(), size, type());
     }
+    
+    @Override
+    public int getValueSize() {
+      return size;
+    }
   }
   
   // specialized version for single bytes
diff --git a/lucene/src/java/org/apache/lucene/index/values/Floats.java b/lucene/src/java/org/apache/lucene/index/values/Floats.java
index 5b8a773..fee1364 100644
--- a/lucene/src/java/org/apache/lucene/index/values/Floats.java
+++ b/lucene/src/java/org/apache/lucene/index/values/Floats.java
@@ -85,6 +85,18 @@ public class Floats {
     public void add(int docID, PerDocFieldValues docValues) throws IOException {
       add(docID, docValues.getFloat());
     }
+    
+    @Override
+    protected boolean tryBulkMerge(IndexDocValues docValues) {
+      // only bulk merge if value type is the same otherwise size differs
+      return super.tryBulkMerge(docValues) && docValues.type() == template.type();
+    }
+    
+    @Override
+    protected void setMergeBytes(int sourceDoc) {
+      final double value = currentMergeSource.getFloat(sourceDoc);
+      template.toBytes(value, bytesRef);
+    }
   }
   
   final static class FloatsReader extends FixedStraightBytesImpl.FixedStraightReader {
diff --git a/lucene/src/java/org/apache/lucene/index/values/IndexDocValues.java b/lucene/src/java/org/apache/lucene/index/values/IndexDocValues.java
index 6261d9d..41bca30 100644
--- a/lucene/src/java/org/apache/lucene/index/values/IndexDocValues.java
+++ b/lucene/src/java/org/apache/lucene/index/values/IndexDocValues.java
@@ -107,6 +107,17 @@ public abstract class IndexDocValues implements Closeable {
   }
 
   /**
+   * Returns the size per value in bytes or <code>-1</code> iff size per value
+   * is variable.
+   * 
+   * @return the size per value in bytes or <code>-1</code> iff size per value
+   * is variable.
+   */
+  public int getValueSize() {
+    return -1;
+  }
+
+  /**
    * Sets the {@link SourceCache} used by this {@link IndexDocValues} instance. This
    * method should be called before {@link #load()} is called. All {@link Source} instances in the currently used cache will be closed
    * before the new cache is installed.
diff --git a/lucene/src/java/org/apache/lucene/index/values/Ints.java b/lucene/src/java/org/apache/lucene/index/values/Ints.java
index 72ab70a..d8fc6fb 100644
--- a/lucene/src/java/org/apache/lucene/index/values/Ints.java
+++ b/lucene/src/java/org/apache/lucene/index/values/Ints.java
@@ -93,9 +93,9 @@ public final class Ints {
     protected IntsWriter(Directory dir, String id, String codecName,
         int version, Counter bytesUsed, IOContext context, ValueType valueType) throws IOException {
       super(dir, id, codecName, version, bytesUsed, context);
-      final int expectedSize = typeToSize(valueType);
-      this.bytesRef = new BytesRef(expectedSize);
-      bytesRef.length = expectedSize;
+      size = typeToSize(valueType);
+      this.bytesRef = new BytesRef(size);
+      bytesRef.length = size;
       template = IndexDocValuesArray.TEMPLATES.get(valueType);
     }
     
@@ -109,6 +109,18 @@ public final class Ints {
     public void add(int docID, PerDocFieldValues docValues) throws IOException {
       add(docID, docValues.getInt());
     }
+    
+    @Override
+    protected void setMergeBytes(int sourceDoc) {
+      final long value = currentMergeSource.getInt(sourceDoc);
+      template.toBytes(value, bytesRef);
+    }
+    
+    @Override
+    protected boolean tryBulkMerge(IndexDocValues docValues) {
+      // only bulk merge if value type is the same otherwise size differs
+      return super.tryBulkMerge(docValues) && docValues.type() == template.type();
+    }
   }
   
   final static class IntsReader extends FixedStraightBytesImpl.FixedStraightReader {
diff --git a/lucene/src/java/org/apache/lucene/index/values/MultiIndexDocValues.java b/lucene/src/java/org/apache/lucene/index/values/MultiIndexDocValues.java
index de01927..64d20af 100644
--- a/lucene/src/java/org/apache/lucene/index/values/MultiIndexDocValues.java
+++ b/lucene/src/java/org/apache/lucene/index/values/MultiIndexDocValues.java
@@ -46,6 +46,8 @@ public class MultiIndexDocValues extends IndexDocValues {
 
   private DocValuesIndex[] docValuesIdx;
   private int[] starts;
+  private ValueType type;
+  private int valueSize;
 
   public MultiIndexDocValues() {
     starts = new int[0];
@@ -62,10 +64,23 @@ public class MultiIndexDocValues extends IndexDocValues {
   }
 
   public IndexDocValues reset(DocValuesIndex[] docValuesIdx) {
-    int[] start = new int[docValuesIdx.length];
+    final int[] start = new int[docValuesIdx.length];
+    TypePromoter promoter = TypePromoter.getIdentityPromoter();
     for (int i = 0; i < docValuesIdx.length; i++) {
       start[i] = docValuesIdx[i].start;
+      if (!(docValuesIdx[i].docValues instanceof DummyDocValues)) {
+        // only promote if not a dummy
+        final TypePromoter incomingPromoter = TypePromoter.create(
+            docValuesIdx[i].docValues.type(),
+            docValuesIdx[i].docValues.getValueSize());
+        promoter = promoter.promote(incomingPromoter);
+        if (promoter == null) {
+          throw new IllegalStateException("Can not promote " + incomingPromoter);
+        }
+      }
     }
+    this.type = promoter.type();
+    this.valueSize = promoter.getValueSize();
     this.starts = start;
     this.docValuesIdx = docValuesIdx;
     return this;
@@ -180,7 +195,12 @@ public class MultiIndexDocValues extends IndexDocValues {
 
   @Override
   public ValueType type() {
-    return this.docValuesIdx[0].docValues.type();
+    return type;
+  }
+
+  @Override
+  public int getValueSize() {
+    return valueSize;
   }
 
   @Override
diff --git a/lucene/src/java/org/apache/lucene/index/values/TypePromoter.java b/lucene/src/java/org/apache/lucene/index/values/TypePromoter.java
new file mode 100644
index 0000000..ac8544e
--- /dev/null
+++ b/lucene/src/java/org/apache/lucene/index/values/TypePromoter.java
@@ -0,0 +1,204 @@
+package org.apache.lucene.index.values;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Type promoter that promotes {@link IndexDocValues} during merge based on
+ * their {@link ValueType} and {@link #getValueSize()}
+ * 
+ * @lucene.internal
+ */
+public class TypePromoter {
+
+  private final static Map<Integer, ValueType> FLAGS_MAP = new HashMap<Integer, ValueType>();
+  private static final TypePromoter IDENTITY_PROMOTER = new IdentityTypePromoter();
+  public static final int VAR_TYPE_VALUE_SIZE = -1;
+
+  private static final int IS_INT = 1 << 0;
+  private static final int IS_BYTE = 1 << 1;
+  private static final int IS_FLOAT = 1 << 2;
+  /* VAR & FIXED == VAR */
+  private static final int IS_VAR = 1 << 3;
+  private static final int IS_FIXED = 1 << 3 | 1 << 4;
+  /* if we have FIXED & FIXED with different size we promote to VAR */
+  private static final int PROMOTE_TO_VAR_SIZE_MASK = ~(1 << 3);
+  /* STRAIGHT & DEREF == STRAIGHT (dense values win) */
+  private static final int IS_STRAIGHT = 1 << 5;
+  private static final int IS_DEREF = 1 << 5 | 1 << 6;
+  private static final int IS_SORTED = 1 << 7;
+  /* more bits wins (int16 & int32 == int32) */
+  private static final int IS_8_BIT = 1 << 8 | 1 << 9 | 1 << 10 | 1 << 11;
+  private static final int IS_16_BIT = 1 << 9 | 1 << 10 | 1 << 11;
+  private static final int IS_32_BIT = 1 << 10 | 1 << 11;
+  private static final int IS_64_BIT = 1 << 11;
+
+  private final ValueType type;
+  private final int flags;
+  private final int valueSize;
+
+  /**
+   * Returns a positive value size if this {@link TypePromoter} represents a
+   * fixed variant, otherwise <code>-1</code>
+   * 
+   * @return a positive value size if this {@link TypePromoter} represents a
+   *         fixed variant, otherwise <code>-1</code>
+   */
+  public int getValueSize() {
+    return valueSize;
+  }
+
+  static {
+    for (ValueType type : ValueType.values()) {
+      TypePromoter create = create(type, VAR_TYPE_VALUE_SIZE);
+      FLAGS_MAP.put(create.flags, type);
+    }
+  }
+
+  /**
+   * Creates a new {@link TypePromoter}
+   * 
+   * @param type
+   *          the {@link ValueType} this promoter represents
+   * @param flags
+   *          the promoters flags
+   * @param valueSize
+   *          the value size if {@link #IS_FIXED} or <code>-1</code> otherwise.
+   */
+  protected TypePromoter(ValueType type, int flags, int valueSize) {
+    this.type = type;
+    this.flags = flags;
+    this.valueSize = valueSize;
+  }
+
+  /**
+   * Creates a new promoted {@link TypePromoter} based on this and the given
+   * {@link TypePromoter} or <code>null</code> iff the {@link TypePromoter} 
+   * aren't compatible.
+   * 
+   * @param promoter
+   *          the incoming promoter
+   * @return a new promoted {@link TypePromoter} based on this and the given
+   *         {@link TypePromoter} or <code>null</code> iff the
+   *         {@link TypePromoter} aren't compatible.
+   */
+  public TypePromoter promote(TypePromoter promoter) {
+
+    int promotedFlags = promoter.flags & this.flags;
+    TypePromoter promoted = create(FLAGS_MAP.get(promotedFlags), valueSize);
+    if (promoted == null) {
+      return promoted;
+    }
+    if ((promoted.flags & IS_BYTE) != 0 && (promoted.flags & IS_FIXED) == IS_FIXED) {
+      if (this.valueSize == promoter.valueSize) {
+        return promoted;
+      }
+      return create(FLAGS_MAP.get(promoted.flags & PROMOTE_TO_VAR_SIZE_MASK),
+          VAR_TYPE_VALUE_SIZE);
+    }
+    return promoted;
+
+  }
+
+  /**
+   * Returns the {@link ValueType} of this {@link TypePromoter}
+   * 
+   * @return the {@link ValueType} of this {@link TypePromoter}
+   */
+  public ValueType type() {
+    return type;
+  }
+
+  @Override
+  public String toString() {
+    return "TypePromoter [type=" + type + ", sizeInBytes=" + valueSize + "]";
+  }
+
+  /**
+   * Creates a new {@link TypePromoter} for the given type and size per value.
+   * 
+   * @param type
+   *          the {@link ValueType} to create the promoter for
+   * @param valueSize
+   *          the size per value in bytes or <code>-1</code> iff the types have
+   *          variable length.
+   * @return a new {@link TypePromoter}
+   */
+  public static TypePromoter create(ValueType type, int valueSize) {
+    if (type == null) {
+      return null;
+    }
+    switch (type) {
+    case BYTES_FIXED_DEREF:
+      return new TypePromoter(type, IS_BYTE | IS_FIXED | IS_DEREF, valueSize);
+    case BYTES_FIXED_SORTED:
+      return new TypePromoter(type, IS_BYTE | IS_FIXED | IS_SORTED, valueSize);
+    case BYTES_FIXED_STRAIGHT:
+      return new TypePromoter(type, IS_BYTE | IS_FIXED | IS_STRAIGHT, valueSize);
+    case BYTES_VAR_DEREF:
+      return new TypePromoter(type, IS_BYTE | IS_VAR | IS_DEREF, VAR_TYPE_VALUE_SIZE);
+    case BYTES_VAR_SORTED:
+      return new TypePromoter(type, IS_BYTE | IS_VAR | IS_SORTED, VAR_TYPE_VALUE_SIZE);
+    case BYTES_VAR_STRAIGHT:
+      return new TypePromoter(type, IS_BYTE | IS_VAR | IS_STRAIGHT, VAR_TYPE_VALUE_SIZE);
+    case FIXED_INTS_16:
+      return new TypePromoter(type,
+          IS_INT | IS_FIXED | IS_STRAIGHT | IS_16_BIT, valueSize);
+    case FIXED_INTS_32:
+      return new TypePromoter(type,
+          IS_INT | IS_FIXED | IS_STRAIGHT | IS_32_BIT, valueSize);
+    case FIXED_INTS_64:
+      return new TypePromoter(type,
+          IS_INT | IS_FIXED | IS_STRAIGHT | IS_64_BIT, valueSize);
+    case FIXED_INTS_8:
+      return new TypePromoter(type, IS_INT | IS_FIXED | IS_STRAIGHT | IS_8_BIT,
+          valueSize);
+    case FLOAT_32:
+      return new TypePromoter(type, IS_FLOAT | IS_FIXED | IS_STRAIGHT
+          | IS_32_BIT, valueSize);
+    case FLOAT_64:
+      return new TypePromoter(type, IS_FLOAT | IS_FIXED | IS_STRAIGHT
+          | IS_64_BIT, valueSize);
+    case VAR_INTS:
+      return new TypePromoter(type, IS_INT | IS_VAR | IS_STRAIGHT, VAR_TYPE_VALUE_SIZE);
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Returns a {@link TypePromoter} that always promotes to the type provided to
+   * {@link #promote(TypePromoter)}
+   */
+  public static TypePromoter getIdentityPromoter() {
+    return IDENTITY_PROMOTER;
+  }
+
+  private static class IdentityTypePromoter extends TypePromoter {
+
+    public IdentityTypePromoter() {
+      super(null, 0, -1);
+    }
+
+    @Override
+    public TypePromoter promote(TypePromoter promoter) {
+      return promoter;
+    }
+  }
+}
\ No newline at end of file
diff --git a/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java b/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
index f67009e..ba28142 100644
--- a/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
+++ b/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
@@ -93,7 +93,7 @@ class VarStraightBytesImpl {
     }
     
     @Override
-    protected void merge(MergeState state) throws IOException {
+    protected void merge(SingleSubMergeState state) throws IOException {
       merge = true;
       datOut = getOrCreateDataOut();
       boolean success = false;
diff --git a/lucene/src/java/org/apache/lucene/index/values/Writer.java b/lucene/src/java/org/apache/lucene/index/values/Writer.java
index 09eb2eb..000486a 100644
--- a/lucene/src/java/org/apache/lucene/index/values/Writer.java
+++ b/lucene/src/java/org/apache/lucene/index/values/Writer.java
@@ -138,7 +138,7 @@ public abstract class Writer extends DocValuesConsumer {
   public abstract void finish(int docCount) throws IOException;
 
   @Override
-  protected void merge(MergeState state) throws IOException {
+  protected void merge(SingleSubMergeState state) throws IOException {
     // This enables bulk copies in subclasses per MergeState, subclasses can
     // simply override this and decide if they want to merge
     // segments using this generic implementation or if a bulk merge is possible
diff --git a/lucene/src/test/org/apache/lucene/index/values/TestTypePromotion.java b/lucene/src/test/org/apache/lucene/index/values/TestTypePromotion.java
new file mode 100644
index 0000000..8b4c353
--- /dev/null
+++ b/lucene/src/test/org/apache/lucene/index/values/TestTypePromotion.java
@@ -0,0 +1,313 @@
+package org.apache.lucene.index.values;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.IndexDocValuesField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReader.ReaderContext;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.codecs.CodecProvider;
+import org.apache.lucene.index.values.IndexDocValues.Source;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.junit.Before;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class TestTypePromotion extends LuceneTestCase {
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    assumeFalse("cannot work with preflex codec", CodecProvider.getDefault()
+        .getDefaultFieldCodec().equals("PreFlex"));
+  }
+
+  private static EnumSet<ValueType> INTEGERS = EnumSet.of(ValueType.VAR_INTS,
+      ValueType.FIXED_INTS_16, ValueType.FIXED_INTS_32,
+      ValueType.FIXED_INTS_64, ValueType.FIXED_INTS_8);
+
+  private static EnumSet<ValueType> FLOATS = EnumSet.of(ValueType.FLOAT_32,
+      ValueType.FLOAT_64);
+
+  private static EnumSet<ValueType> UNSORTED_BYTES = EnumSet.of(
+      ValueType.BYTES_FIXED_DEREF, ValueType.BYTES_FIXED_STRAIGHT,
+      ValueType.BYTES_VAR_STRAIGHT, ValueType.BYTES_VAR_DEREF);
+
+  private static EnumSet<ValueType> SORTED_BYTES = EnumSet.of(
+      ValueType.BYTES_FIXED_SORTED, ValueType.BYTES_VAR_SORTED);
+  
+  public ValueType randomValueType(EnumSet<ValueType> typeEnum, Random random) {
+    ValueType[] array = typeEnum.toArray(new ValueType[0]);
+    return array[random.nextInt(array.length)];
+  }
+  
+  private static enum TestType {
+    Int, Float, Byte
+  }
+
+  private void runTest(EnumSet<ValueType> types, TestType type)
+      throws CorruptIndexException, IOException {
+    Directory dir = newDirectory();
+    IndexWriter writer = new IndexWriter(dir,
+        newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+    int num_1 = atLeast(200);
+    int num_2 = atLeast(200);
+    int num_3 = atLeast(200);
+    long[] values = new long[num_1 + num_2 + num_3];
+    index(writer, new IndexDocValuesField("promote"),
+        randomValueType(types, random), values, 0, num_1);
+    writer.commit();
+    
+    index(writer, new IndexDocValuesField("promote"),
+        randomValueType(types, random), values, num_1, num_2);
+    writer.commit();
+    
+    if (random.nextInt(4) == 0) {
+      // once in a while use addIndexes
+      writer.optimize();
+      
+      Directory dir_2 = newDirectory() ;
+      IndexWriter writer_2 = new IndexWriter(dir_2,
+          newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+      index(writer_2, new IndexDocValuesField("promote"),
+          randomValueType(types, random), values, num_1 + num_2, num_3);
+      writer_2.commit();
+      writer_2.close();
+      if (random.nextBoolean()) {
+        writer.addIndexes(dir_2);
+      } else {
+        // do a real merge here
+        IndexReader open = IndexReader.open(dir_2);
+        writer.addIndexes(open);
+        open.close();
+      }
+      dir_2.close();
+    } else {
+      index(writer, new IndexDocValuesField("promote"),
+          randomValueType(types, random), values, num_1 + num_2, num_3);
+    }
+
+    writer.optimize();
+    writer.close();
+    assertValues(type, dir, values);
+    dir.close();
+  }
+
+  private void assertValues(TestType type, Directory dir, long[] values)
+      throws CorruptIndexException, IOException {
+    IndexReader reader = IndexReader.open(dir);
+    assertTrue(reader.isOptimized());
+    ReaderContext topReaderContext = reader.getTopReaderContext();
+    ReaderContext[] children = topReaderContext.children();
+    IndexDocValues docValues = children[0].reader.docValues("promote");
+    assertEquals(1, children.length);
+    Source directSource = docValues.getDirectSource();
+    for (int i = 0; i < values.length; i++) {
+      int id = Integer.parseInt(reader.document(i).get("id"));
+      String msg = "id: " + id + " doc: " + i;
+      switch (type) {
+      case Byte:
+        BytesRef bytes = directSource.getBytes(i, new BytesRef());
+        long value = 0;
+        switch(bytes.length) {
+        case 1:
+          value = bytes.bytes[bytes.offset];
+          break;
+        case 2:
+          value = bytes.asShort();
+          break;
+        case 4:
+          value = bytes.asInt();
+          break;
+        case 8:
+          value = bytes.asLong();
+          break;
+          
+        default:
+          fail(msg + " bytessize: " + bytes.length);
+        }
+        
+        assertEquals(msg  + " byteSize: " + bytes.length, values[id], value);
+        break;
+      case Float:
+          assertEquals(msg, values[id], Double.doubleToRawLongBits(directSource.getFloat(i)));
+        break;
+      case Int:
+        assertEquals(msg, values[id], directSource.getInt(i));
+      default:
+        break;
+      }
+
+    }
+    docValues.close();
+    reader.close();
+  }
+
+  public void index(IndexWriter writer, IndexDocValuesField valField,
+      ValueType valueType, long[] values, int offset, int num)
+      throws CorruptIndexException, IOException {
+    BytesRef ref = new BytesRef(new byte[] { 1, 2, 3, 4 });
+    for (int i = offset; i < offset + num; i++) {
+      Document doc = new Document();
+      doc.add(new Field("id", i + "", TextField.TYPE_STORED));
+      switch (valueType) {
+      case VAR_INTS:
+        values[i] = random.nextInt();
+        valField.setInt(values[i]);
+        break;
+      case FIXED_INTS_16:
+        values[i] = random.nextInt(Short.MAX_VALUE);
+        valField.setInt((short) values[i], true);
+        break;
+      case FIXED_INTS_32:
+        values[i] = random.nextInt();
+        valField.setInt((int) values[i], true);
+        break;
+      case FIXED_INTS_64:
+        values[i] = random.nextLong();
+        valField.setInt(values[i], true);
+        break;
+      case FLOAT_64:
+        double nextDouble = random.nextDouble();
+        values[i] = Double.doubleToRawLongBits(nextDouble);
+        valField.setFloat(nextDouble);
+        break;
+      case FLOAT_32:
+        final float nextFloat = random.nextFloat();
+        values[i] = Double.doubleToRawLongBits(nextFloat);
+        valField.setFloat(nextFloat);
+        break;
+      case FIXED_INTS_8:
+         values[i] = (byte) i;
+        valField.setInt((byte)values[i], true);
+        break;
+      case BYTES_FIXED_DEREF:
+      case BYTES_FIXED_SORTED:
+      case BYTES_FIXED_STRAIGHT:
+        values[i] = random.nextLong();
+        ref.copy(values[i]);
+        valField.setBytes(ref, valueType);
+        break;
+      case BYTES_VAR_DEREF:
+      case BYTES_VAR_SORTED:
+      case BYTES_VAR_STRAIGHT:
+        if (random.nextBoolean()) {
+          ref.copy(random.nextInt());
+          values[i] = ref.asInt();
+        } else {
+          ref.copy(random.nextLong());
+          values[i] = ref.asLong();
+        }
+        valField.setBytes(ref, valueType);
+        break;
+
+      default:
+        fail("unexpected value " + valueType);
+
+      }
+      doc.add(valField);
+      writer.addDocument(doc);
+      if (random.nextInt(10) == 0) {
+        writer.commit();
+      }
+    }
+  }
+
+  public void testPromoteBytes() throws IOException {
+    runTest(UNSORTED_BYTES, TestType.Byte);
+  }
+  
+  public void testSortedPromoteBytes() throws IOException {
+    runTest(SORTED_BYTES, TestType.Byte);
+  }
+
+  public void testPromotInteger() throws IOException {
+    runTest(INTEGERS, TestType.Int);
+  }
+
+  public void testPromotFloatingPoint() throws CorruptIndexException,
+      IOException {
+    runTest(FLOATS, TestType.Float);
+  }
+  
+  public void testMergeIncompatibleTypes() throws IOException {
+    Directory dir = newDirectory();
+    IndexWriterConfig writerConfig = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+    writerConfig.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES); // no merges until we are done with adding values
+    IndexWriter writer = new IndexWriter(dir, writerConfig);
+    int num_1 = atLeast(200);
+    int num_2 = atLeast(200);
+    long[] values = new long[num_1 + num_2];
+    index(writer, new IndexDocValuesField("promote"),
+        randomValueType(INTEGERS, random), values, 0, num_1);
+    writer.commit();
+    
+    if (random.nextInt(4) == 0) {
+      // once in a while use addIndexes
+      Directory dir_2 = newDirectory() ;
+      IndexWriter writer_2 = new IndexWriter(dir_2,
+          newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+      index(writer_2, new IndexDocValuesField("promote"),
+          randomValueType(random.nextBoolean() ? UNSORTED_BYTES : SORTED_BYTES, random), values, num_1, num_2);
+      writer_2.commit();
+      writer_2.close();
+      if (random.nextBoolean()) {
+        writer.addIndexes(dir_2);
+      } else {
+        // do a real merge here
+        IndexReader open = IndexReader.open(dir_2);
+        writer.addIndexes(open);
+        open.close();
+      }
+      dir_2.close();
+    } else {
+      index(writer, new IndexDocValuesField("promote"),
+          randomValueType(random.nextBoolean() ? UNSORTED_BYTES : SORTED_BYTES, random), values, num_1, num_2);
+      writer.commit();
+    }
+    writer.close();
+    writerConfig = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+    if (writerConfig.getMergePolicy() instanceof NoMergePolicy) {
+      writerConfig.setMergePolicy(newLogMergePolicy()); // make sure we optimize to one segment (merge everything together)
+    }
+    writer = new IndexWriter(dir, writerConfig);
+    // now optimize
+    writer.optimize();
+    writer.close();
+    IndexReader reader = IndexReader.open(dir);
+    assertTrue(reader.isOptimized());
+    ReaderContext topReaderContext = reader.getTopReaderContext();
+    ReaderContext[] children = topReaderContext.children();
+    IndexDocValues docValues = children[0].reader.docValues("promote");
+    assertNotNull(docValues);
+    assertValues(TestType.Byte, dir, values);
+    assertEquals(ValueType.BYTES_VAR_STRAIGHT, docValues.type());
+    reader.close();
+    dir.close();
+  }
+
+}
