Index: src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java =================================================================== --- src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (revision 1242037) +++ src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (working copy) @@ -126,7 +126,7 @@ // serialize using another serde, and read out that object repr. LazySimpleSerDe testSD = new LazySimpleSerDe(); testSD.initialize(conf, tblProps); - + Writable s3 = testSD.serialize(s, hrsd.getObjectInspector()); System.out.println("THREE:"+s3.toString()); Object o3 = testSD.deserialize(s3); @@ -135,8 +135,17 @@ // then serialize again using hrsd, and compare results HCatRecord s4 = (HCatRecord) hrsd.serialize(o3, testSD.getObjectInspector()); System.out.println("FOUR:"+s4.toString()); - Assert.assertFalse(r.equals(s4)); - + + // Test LazyHCatRecord init and read + LazyHCatRecord s5 = new LazyHCatRecord(o3,testSD.getObjectInspector()); + System.out.println("FIVE:"+s5.toString()); + + LazyHCatRecord s6 = new LazyHCatRecord(s4,hrsd.getObjectInspector()); + System.out.println("SIX:"+s6.toString()); + + LazyHCatRecord s7 = new LazyHCatRecord(r.getAll()); + System.out.println("SEVEN:"+s7.toString()); + Assert.assertTrue(s7.equals(r)); } } Index: src/java/org/apache/hcatalog/common/HCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1242037) +++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy) @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; @@ -46,6 +47,8 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; @@ -57,6 +60,9 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.data.DataType; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.HCatRecordSerDe; import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; @@ -67,7 +73,7 @@ public class HCatUtil { - // static final private Log LOG = LogFactory.getLog(HCatUtil.class); +// static final private Log LOG = LogFactory.getLog(HCatUtil.class); public static boolean checkJobContextIfRunningFromBackend(JobContext j) { if (j.getConfiguration().get("mapred.task.id", "").equals("")) { @@ -391,7 +397,7 @@ public static void logStackTrace(Log logger) { StackTraceElement[] stackTrace = new Exception().getStackTrace(); for (int i = 1; i < stackTrace.length; i++) { - logger.info("\t" + stackTrace[i].toString()); + logger.debug("\t" + stackTrace[i].toString()); } } @@ -408,9 +414,9 @@ public static void logList(Log logger, String itemName, List list) { - logger.info(itemName + ":"); + logger.debug(itemName + ":"); for (Object item : list) { - logger.info("\t[" + item + "]"); + logger.debug("\t[" + item + "]"); } } @@ -478,4 +484,47 @@ +". or
. Got " + tableName); } } + + public static int compareRecords(HCatRecord first, HCatRecord second) { + try { + return compareRecordContents(first.getAll(), second.getAll()); + + } catch (HCatException e) { + // uh-oh - we've hit a deserialization error in all likelihood + // we're likely not going to recover from this. Alright, the best + // we can do is throw a ClassCastException (the only thing throwable + // from a compareTo, and in a sense, inability to read the object + // is what got us here. + throw new ClassCastException(e.getMessage()); + } + } + + public static int compareRecordContents(List first, List second) { + int mySz = first.size(); + int urSz = second.size(); + if(mySz != urSz) { + return mySz - urSz; + } else { + for (int i = 0; i < first.size(); i++) { + int c = DataType.compare(first.get(i), second.get(i)); + if (c != 0) { + return c; + } + } + return 0; + } + } + + public static ObjectInspector getObjectInspector(String serdeClassName, + Configuration conf, Properties tbl) throws Exception { + SerDe s = (SerDe) Class.forName(serdeClassName).newInstance(); + s.initialize(conf, tbl); + return s.getObjectInspector(); + } + + public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception{ + HCatRecordSerDe hrsd = new HCatRecordSerDe(); + hrsd.initialize(hsch); + return hrsd.getObjectInspector(); + } } Index: src/java/org/apache/hcatalog/data/HCatRecordSerDe.java =================================================================== --- src/java/org/apache/hcatalog/data/HCatRecordSerDe.java (revision 1242037) +++ src/java/org/apache/hcatalog/data/HCatRecordSerDe.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatSchema; /** * SerDe class for serializing to and from HCatRecord @@ -108,7 +109,19 @@ cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo); } + + public void initialize(HCatSchema hsch) throws SerDeException { + if (LOG.isDebugEnabled()){ + LOG.debug("Initializing HCatRecordSerDe through HCatSchema" + hsch.toString()); + } + + rowTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hsch.toString()); + cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo); + + } + + /** * The purpose of a deserialize method is to turn a data blob * which is a writable representation of the data into an @@ -156,7 +169,7 @@ * @param soi : StructObjectInspector * @return HCatRecord */ - private List serializeStruct(Object obj, StructObjectInspector soi) + private static List serializeStruct(Object obj, StructObjectInspector soi) throws SerDeException { List fields = soi.getAllStructFieldRefs(); @@ -181,7 +194,7 @@ * Return underlying Java Object from an object-representation * that is readable by a provided ObjectInspector. */ - private Object serializeField(Object field, + public static Object serializeField(Object field, ObjectInspector fieldObjectInspector) throws SerDeException { Object res = null; if (fieldObjectInspector.getCategory() == Category.PRIMITIVE){ @@ -193,7 +206,7 @@ } else if (fieldObjectInspector.getCategory() == Category.MAP){ res = serializeMap(field,(MapObjectInspector)fieldObjectInspector); } else { - throw new SerDeException(getClass().toString() + throw new SerDeException(HCatRecordSerDe.class.toString() + " does not know what to do with fields of unknown category: " + fieldObjectInspector.getCategory() + " , type: " + fieldObjectInspector.getTypeName()); } @@ -205,7 +218,7 @@ * an object-representation that is readable by a provided * MapObjectInspector */ - private Map serializeMap(Object f, MapObjectInspector moi) throws SerDeException { + private static Map serializeMap(Object f, MapObjectInspector moi) throws SerDeException { ObjectInspector koi = moi.getMapKeyObjectInspector(); ObjectInspector voi = moi.getMapValueObjectInspector(); Map m = new TreeMap(); @@ -221,7 +234,7 @@ return m; } - private List serializeList(Object f, ListObjectInspector loi) throws SerDeException { + private static List serializeList(Object f, ListObjectInspector loi) throws SerDeException { List l = loi.getList(f); ObjectInspector eloi = loi.getListElementObjectInspector(); if (eloi.getCategory() == Category.PRIMITIVE){ @@ -244,7 +257,7 @@ } throw new SerDeException("HCatSerDe map type unimplemented"); } else { - throw new SerDeException(getClass().toString() + throw new SerDeException(HCatRecordSerDe.class.toString() + " does not know what to do with fields of unknown category: " + eloi.getCategory() + " , type: " + eloi.getTypeName()); } @@ -274,4 +287,5 @@ return null; } + } Index: src/java/org/apache/hcatalog/data/HCatRecordable.java =================================================================== --- src/java/org/apache/hcatalog/data/HCatRecordable.java (revision 1242037) +++ src/java/org/apache/hcatalog/data/HCatRecordable.java (working copy) @@ -20,6 +20,7 @@ import java.util.List; import org.apache.hadoop.io.WritableComparable; +import org.apache.hcatalog.common.HCatException; /** * Interface that determines whether we can implement a HCatRecord on top of it @@ -30,21 +31,22 @@ * Gets the field at the specified index. * @param fieldNum the field number * @return the object at the specified index + * @throws HCatException */ - Object get(int fieldNum); + Object get(int fieldNum) throws HCatException; /** * Gets all the fields of the hcat record. * @return the list of fields */ - List getAll(); + List getAll() throws HCatException; /** * Sets the field at the specified index. * @param fieldNum the field number * @param value the value to set */ - void set(int fieldNum, Object value); + void set(int fieldNum, Object value) throws HCatException; /** * Gets the size of the hcat record. Index: src/java/org/apache/hcatalog/data/DefaultHCatRecord.java =================================================================== --- src/java/org/apache/hcatalog/data/DefaultHCatRecord.java (revision 1242037) +++ src/java/org/apache/hcatalog/data/DefaultHCatRecord.java (working copy) @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; public class DefaultHCatRecord extends HCatRecord { @@ -92,34 +93,6 @@ } @Override - public int compareTo(Object that) { - - if(that instanceof HCatRecord) { - HCatRecord other = (HCatRecord)that; - int mySz = this.size(); - int urSz = other.size(); - if(mySz != urSz) { - return mySz - urSz; - } else{ - for (int i = 0; i < mySz;i++) { - int c = DataType.compare(get(i), other.get(i)); - if (c != 0) { - return c; - } - } - } - return 0; - } else { - return DataType.compare(this, that); - } - } - - @Override - public boolean equals(Object other) { - return (compareTo(other) == 0); - } - - @Override public int hashCode() { int hash = 1; for (Object o : contents) { Index: src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java =================================================================== --- src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java (revision 1242037) +++ src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java (working copy) @@ -19,12 +19,19 @@ import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; public class HCatRecordObjectInspector extends StandardStructObjectInspector { + public static final Log LOG = LogFactory + .getLog(HCatRecordObjectInspector.class.getName()); + protected HCatRecordObjectInspector(List structFieldNames, List structFieldObjectInspectors) { super(structFieldNames, structFieldObjectInspectors); @@ -39,12 +46,24 @@ int fieldID = ((MyField) fieldRef).getFieldID(); assert (fieldID >= 0 && fieldID < fields.size()); - return ((HCatRecord) data).get(fieldID); + try { + return ((HCatRecord) data).get(fieldID); + } catch (HCatException e) { + LOG.debug(e.getMessage()); + HCatUtil.logStackTrace(LOG); + return null; + } } @Override public List getStructFieldsDataAsList(Object o) { - return ((HCatRecord) o).getAll(); + try { + return ((HCatRecord) o).getAll(); + } catch (HCatException e) { + LOG.debug(e.getMessage()); + HCatUtil.logStackTrace(LOG); + return null; + } } } Index: src/java/org/apache/hcatalog/data/HCatRecord.java =================================================================== --- src/java/org/apache/hcatalog/data/HCatRecord.java (revision 1242037) +++ src/java/org/apache/hcatalog/data/HCatRecord.java (working copy) @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; /** @@ -134,4 +135,18 @@ set(fieldName,recordSchema,value); } + @Override + public int compareTo(Object that) { + if(that instanceof HCatRecord) { + return HCatUtil.compareRecords(this,(HCatRecord)that); + } else { + return DataType.compare(this, that); + } + } + + @Override + public boolean equals(Object other) { + return (compareTo(other) == 0); + } + } Index: src/java/org/apache/hcatalog/data/LazyHCatRecord.java =================================================================== --- src/java/org/apache/hcatalog/data/LazyHCatRecord.java (revision 0) +++ src/java/org/apache/hcatalog/data/LazyHCatRecord.java (revision 0) @@ -0,0 +1,202 @@ +package org.apache.hcatalog.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatSchema; + +public class LazyHCatRecord extends HCatRecord { + + public static final Log LOG = LogFactory + .getLog(LazyHCatRecord.class.getName()); + + private List isDeserialized; + private boolean allDeserialized; + + private List contents; + private List fieldObjectInspectors; + + + @Override + public Object get(int fieldNum) throws HCatException{ + try { + deserialize(fieldNum); + } catch (Exception e) { + throw new HCatException("SerDe Exception deserializing",e); + } + return contents.get(fieldNum); + } + + public void deserialize(int fieldNum) throws Exception { + if ((!allDeserialized) && (! isDeserialized.get(fieldNum))){ + Object o = contents.get(fieldNum); + contents.set(fieldNum,deserialize(o,fieldObjectInspectors.get(fieldNum))); + isDeserialized.set(fieldNum, true); + } + } + + private Object deserialize(Object o, ObjectInspector objectInspector) throws Exception { + return HCatRecordSerDe.serializeField(o, objectInspector); + } + + @Override + public List getAll() throws HCatException { + try { + deserializeAll(); + } catch (Exception e) { + throw new HCatException("SerDe Exception deserializing",e); + } + return contents; + } + + private void deserializeAll() throws Exception { + if (!allDeserialized){ + for (int i = 0; i < fieldObjectInspectors.size(); i++){ + deserialize(i); + } + allDeserialized = true; + } + } + + @Override + public void set(int fieldNum, Object value) { + contents.set(fieldNum,value); + isDeserialized.set(fieldNum,true); + } + + @Override + public int size() { + return contents.size(); + } + + @Override + public void readFields(DataInput in) throws IOException { + + contents.clear(); + int len = in.readInt(); + for(int i = 0; i < len; i++){ + contents.add(ReaderWriter.readDatum(in)); + } + + this.isDeserialized = new ArrayList(this.contents.size()); + this.setAllDeserialized(true); + + } + + @Override + public void write(DataOutput out) throws IOException { + try { + deserializeAll(); + } catch (Exception e) { + throw new IOException(e); + } + int sz = size(); + out.writeInt(sz); + for (int i = 0; i < sz; i++) { + ReaderWriter.writeDatum(out, contents.get(i)); + } + + } + + @Override + public Object get(String fieldName, HCatSchema recordSchema) + throws HCatException { + int idx = recordSchema.getPosition(fieldName); + return get(idx); + } + + @Override + public void set(String fieldName, HCatSchema recordSchema, Object value) + throws HCatException { + int idx = recordSchema.getPosition(fieldName); + set(idx,value); + isDeserialized.set(idx,true); + } + + @Override + public void remove(int idx) throws HCatException { + // playing with fire here, potential for errors in usage high. + contents.remove(idx); + isDeserialized.remove(idx); + fieldObjectInspectors.remove(idx); + } + + @Override + public void copy(HCatRecord r) throws HCatException { + this.contents = r.getAll(); + setAllDeserialized(true); + } + + private void setAllDeserialized(boolean state) { + allDeserialized = state; + } + + public LazyHCatRecord(Object o, ObjectInspector oi) throws Exception{ + + if (oi.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only make a lazy hcat record from objects of struct types, but we got: " + + oi.getTypeName()); + } + + List fields = ((StructObjectInspector)oi).getAllStructFieldRefs(); + this.fieldObjectInspectors = new ArrayList(); + + if (fields != null){ + for (int i = 0; i < fields.size(); i++) { + fieldObjectInspectors.add(i, fields.get(i).getFieldObjectInspector()); + } + } + + List list = ((StructObjectInspector)oi).getStructFieldsDataAsList(o); + + this.isDeserialized = new ArrayList(fieldObjectInspectors.size()); + this.contents = new ArrayList(fieldObjectInspectors.size()); + + if (fieldObjectInspectors != null){ + for (int i = 0; i < fields.size(); i++) { + // Get the field objectInspector and the field object. + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? null : list.get(i)); + contents.add(i, f); + isDeserialized.add(i,false); + } + } + + this.allDeserialized = false; + } + + public LazyHCatRecord(List contents){ + this.contents = contents; + this.isDeserialized = new ArrayList(this.contents.size()); + this.setAllDeserialized(true); + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + for(int i = 0; i"); + LOG.debug("Exception " + e.getMessage()); + HCatUtil.logStackTrace(LOG); + } + } + return sb.toString(); + } + +}