Review notes (text ahead of the "Index:" line is ignored by patch tools):
  * This patch widens HbaseObjectWritable class codes from byte to int and
    encodes them as Hadoop vints; it also adds java.util.Queue support.
  * readObject: the narrowing (byte) cast on the vint class code was dropped,
    since it would truncate any code above 127 (256 would even collapse to
    NOT_ENCODED).
  * writeClassCode: the Queue fallback now precedes the Serializable fallback;
    queue implementations that are also Serializable would otherwise be
    tagged Serializable while writeObject emits a Queue payload.
  * writeObject: the Queue branch writes a toArray() snapshot instead of
    calling queue.remove(), so serializing no longer empties the queue.
  * NOTE(review): line breaks, indentation, blank context lines and the
    generic parameters of the two maps were reconstructed from a collapsed
    copy of this patch; verify context lines against revision 1182640.

Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java	(revision 1182640)
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java	(working copy)
@@ -32,6 +32,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,14 +49,14 @@
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
-import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -83,8 +85,8 @@
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,6 +95,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * This is a customized version of the polymorphic hadoop
@@ -115,15 +118,15 @@
   // Here we maintain two static maps of classes to code and vice versa.
   // Add new classes+codes as wanted or figure way to auto-generate these
   // maps from the HMasterInterface.
-  static final Map<Byte, Class<?>> CODE_TO_CLASS =
-    new HashMap<Byte, Class<?>>();
-  static final Map<Class<?>, Byte> CLASS_TO_CODE =
-    new HashMap<Class<?>, Byte>();
+  static final Map<Integer, Class<?>> CODE_TO_CLASS =
+    new HashMap<Integer, Class<?>>();
+  static final Map<Class<?>, Integer> CLASS_TO_CODE =
+    new HashMap<Class<?>, Integer>();
   // Special code that means 'not-encoded'; in this case we do old school
   // sending of the class name using reflection, etc.
   private static final byte NOT_ENCODED = 0;
   static {
-    byte code = NOT_ENCODED + 1;
+    int code = NOT_ENCODED + 1;
     // Primitive types.
     addToMap(Boolean.TYPE, code++);
     addToMap(Byte.TYPE, code++);
@@ -241,6 +244,7 @@
 
     addToMap(Append.class, code++);
 
+    addToMap(Queue.class, code++);
   }
 
   private Class declaredClass;
@@ -320,7 +324,7 @@
     }
 
     public void readFields(DataInput in) throws IOException {
-      this.declaredClass = CODE_TO_CLASS.get(in.readByte());
+      this.declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
     }
 
     public void write(DataOutput out) throws IOException {
@@ -336,7 +340,7 @@
    */
   static void writeClassCode(final DataOutput out, final Class c)
   throws IOException {
-    Byte code = CLASS_TO_CODE.get(c);
+    Integer code = CLASS_TO_CODE.get(c);
     if (code == null ) {
       if ( List.class.isAssignableFrom(c)) {
         code = CLASS_TO_CODE.get(List.class);
@@ -347,6 +351,9 @@
+      else if (Queue.class.isAssignableFrom(c)) {
+        code = CLASS_TO_CODE.get(Queue.class);
+      }
       else if (Serializable.class.isAssignableFrom(c)){
         code = CLASS_TO_CODE.get(Serializable.class);
       }
     }
     if (code == null) {
       LOG.error("Unsupported type " + c);
@@ -354,11 +361,9 @@
       for(StackTraceElement elem : els) {
         LOG.error(elem.getMethodName());
       }
-// new Exception().getStackTrace()[0].getMethodName());
-// throw new IOException(new Exception().getStackTrace()[0].getMethodName());
       throw new UnsupportedOperationException("No code for unexpected " + c);
     }
-    out.writeByte(code);
+    WritableUtils.writeVInt(out, code);
   }
 
 
@@ -428,6 +433,15 @@
         writeObject(out, list.get(i),
                   list.get(i).getClass(), conf);
       }
+    } else if (Queue.class.isAssignableFrom(declClass)) {
+      // Write a snapshot: serializing must not drain the caller's queue, and
+      // the length prefix must match the number of elements that follow.
+      Queue queue = (Queue)instanceObj;
+      Object[] elements = queue.toArray();
+      out.writeInt(elements.length);
+      for (Object element : elements) {
+        writeObject(out, element, element.getClass(), conf);
+      }
     } else if (declClass == String.class) {   // String
       Text.writeString(out, (String)instanceObj);
     } else if (declClass.isPrimitive()) {     // primitive type
@@ -455,7 +469,7 @@
       Text.writeString(out, ((Enum)instanceObj).name());
     } else if (Writable.class.isAssignableFrom(declClass)) { // Writable
       Class c = instanceObj.getClass();
-      Byte code = CLASS_TO_CODE.get(c);
+      Integer code = CLASS_TO_CODE.get(c);
       if (code == null) {
         out.writeByte(NOT_ENCODED);
         Text.writeString(out, c.getName());
@@ -465,7 +479,7 @@
       ((Writable)instanceObj).write(out);
     } else if (Serializable.class.isAssignableFrom(declClass)) {
       Class c = instanceObj.getClass();
-      Byte code = CLASS_TO_CODE.get(c);
+      Integer code = CLASS_TO_CODE.get(c);
       if (code == null) {
         out.writeByte(NOT_ENCODED);
         Text.writeString(out, c.getName());
@@ -517,7 +531,7 @@
   public static Object readObject(DataInput in,
       HbaseObjectWritable objectWritable, Configuration conf)
   throws IOException {
-    Class declaredClass = CODE_TO_CLASS.get(in.readByte());
+    Class declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
     Object instance;
     if (declaredClass.isPrimitive()) {            // primitive types
       if (declaredClass == Boolean.TYPE) {             // boolean
@@ -553,12 +567,18 @@
           Array.set(instance, i, readObject(in, conf));
         }
       }
-    } else if (List.class.isAssignableFrom(declaredClass)) {              // List
+    } else if (List.class.isAssignableFrom(declaredClass)) { // List
       int length = in.readInt();
       instance = new ArrayList(length);
       for (int i = 0; i < length; i++) {
         ((ArrayList)instance).add(readObject(in, conf));
       }
+    } else if (Queue.class.isAssignableFrom(declaredClass)) {
+      int length = in.readInt();
+      instance = new ConcurrentLinkedQueue();
+      for (int i = 0; i < length; i++) {
+        ((Queue)instance).add(readObject(in, conf));
+      }
     } else if (declaredClass == String.class) {        // String
       instance = Text.readString(in);
     } else if (declaredClass.isEnum()) {         // enum
@@ -566,8 +586,8 @@
         Text.readString(in));
     } else {                                      // Writable or Serializable
       Class instanceClass = null;
-      Byte b = in.readByte();
-      if (b.byteValue() == NOT_ENCODED) {
+      int b = WritableUtils.readVInt(in);
+      if (b == NOT_ENCODED) {
         String className = Text.readString(in);
         try {
           instanceClass = getClassByName(conf, className);
@@ -630,7 +650,7 @@
     return Class.forName(className, true, cl);
   }
 
-  private static void addToMap(final Class clazz, final byte code) {
+  private static void addToMap(final Class clazz, final int code) {
     CLASS_TO_CODE.put(clazz, code);
     CODE_TO_CLASS.put(code, clazz);
   }