Index: src/java/org/apache/hcatalog/data/HCatRecordSerDe.java
===================================================================
--- src/java/org/apache/hcatalog/data/HCatRecordSerDe.java (revision 0)
+++ src/java/org/apache/hcatalog/data/HCatRecordSerDe.java (revision 0)
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * SerDe class for serializing to and from HCatRecord
+ */
+public class HCatRecordSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory
+      .getLog(HCatRecordSerDe.class.getName());
+
+  public HCatRecordSerDe() throws SerDeException {
+  }
+
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+  private StructTypeInfo rowTypeInfo;
+
+  private HCatRecordObjectInspector cachedObjectInspector;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+    LOG.debug("Initializing HCatRecordSerDe");
+
+    // Get column names and types from the table properties.
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    // all table column names
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    // all column types
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    assert (columnNames.size() == columnTypes.size());
+
+    // This debug block must run after the columns have been parsed;
+    // iterating over columnNames/columnTypes before they are assigned
+    // would throw a NullPointerException.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("columns:" + columnNameProperty);
+      for (String s : columnNames) {
+        LOG.debug("cn:" + s);
+      }
+      LOG.debug("types: " + columnTypeProperty);
+      for (TypeInfo t : columnTypes) {
+        LOG.debug("ct:" + t.getTypeName() + ",type:" + t.getCategory());
+      }
+    }
+
+    rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+
+    cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo);
+  }
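+
+  // A minimal sketch of the table properties initialize() expects; the
+  // two-column schema below is illustrative only, not part of any real
+  // table definition:
+  //
+  //   Properties tbl = new Properties();
+  //   tbl.put(Constants.LIST_COLUMNS, "id,tags");
+  //   tbl.put(Constants.LIST_COLUMN_TYPES, "int,array<string>");
+  //   HCatRecordSerDe serde = new HCatRecordSerDe();
+  //   serde.initialize(new Configuration(), tbl);
+  //   // getObjectInspector() now describes struct<id:int,tags:array<string>>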
+
+  /**
+   * The purpose of a deserialize method is to turn a data blob,
+   * which is a Writable representation of the data, into an
+   * object that can then be parsed using the appropriate
+   * ObjectInspector. In this case, since HCatRecord is itself
+   * already the Writable object, there is no extra work to be
+   * done here. Most of the logic resides in the ObjectInspector,
+   * which returns values from within the HCatRecord to Hive when
+   * Hive asks for them.
+   */
+  @Override
+  public Object deserialize(Writable data) throws SerDeException {
+    if (!(data instanceof HCatRecord)) {
+      throw new SerDeException(getClass().getName() + ": expects HCatRecord!");
+    }
+
+    return (HCatRecord) data;
+  }
+
+  /**
+   * The purpose of the serialize method is to turn an object
+   * representation, with a provided ObjectInspector, into a Writable
+   * format that the underlying layer can then write out.
+   *
+   * In this case, Hive calls this method with an object and the
+   * ObjectInspector that describes it, and we write out an HCatRecord.
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector)
+      throws SerDeException {
+    if (objInspector.getCategory() != Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+          + " can only serialize struct types, but we got: "
+          + objInspector.getTypeName());
+    }
+    return new DefaultHCatRecord(serializeStruct(obj, (StructObjectInspector) objInspector));
+  }
+
+  /**
+   * Return the serialized form of a struct (a list of its field
+   * values), given an underlying object representation and the
+   * StructObjectInspector that can read it.
+   * @param obj underlying object representation
+   * @param soi StructObjectInspector for obj
+   * @return the field values, in field order
+   */
+  private List<Object> serializeStruct(Object obj, StructObjectInspector soi)
+      throws SerDeException {
+
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+
+    List<Object> l = new ArrayList<Object>(fields.size());
+
+    for (int i = 0; i < fields.size(); i++) {
+      // Get the field objectInspector and the field object.
+      ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+      Object f = (list == null ? null : list.get(i));
+      Object res = serializeField(f, foi);
+      l.add(i, res);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(l.toString());
+    }
+    return l;
+  }
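+
+  // A minimal sketch (schema and names assumed for illustration) of
+  // serialize() being handed a non-HCatRecord representation: any
+  // struct-shaped object with a matching ObjectInspector is accepted,
+  // and an HCatRecord comes back out. Assumes "serde" was initialized
+  // as in the earlier sketch:
+  //
+  //   List<String> names = Arrays.asList("id", "tags");
+  //   List<ObjectInspector> ois = Arrays.asList(
+  //       (ObjectInspector) PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+  //       ObjectInspectorFactory.getStandardListObjectInspector(
+  //           PrimitiveObjectInspectorFactory.javaStringObjectInspector));
+  //   StructObjectInspector soi =
+  //       ObjectInspectorFactory.getStandardStructObjectInspector(names, ois);
+  //   Object row = Arrays.asList((Object) 7, Arrays.asList("a", "b"));
+  //   Writable w = serde.serialize(row, soi);   // w is a DefaultHCatRecord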
+
+  /**
+   * Return the underlying Java object for a field, given an object
+   * representation that is readable by the provided ObjectInspector.
+   */
+  private Object serializeField(Object field,
+      ObjectInspector fieldObjectInspector) throws SerDeException {
+    Object res;
+    if (fieldObjectInspector.getCategory() == Category.PRIMITIVE) {
+      res = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field);
+    } else if (fieldObjectInspector.getCategory() == Category.STRUCT) {
+      res = serializeStruct(field, (StructObjectInspector) fieldObjectInspector);
+    } else if (fieldObjectInspector.getCategory() == Category.LIST) {
+      res = serializeList(field, (ListObjectInspector) fieldObjectInspector);
+    } else if (fieldObjectInspector.getCategory() == Category.MAP) {
+      res = serializeMap(field, (MapObjectInspector) fieldObjectInspector);
+    } else {
+      throw new SerDeException(getClass().toString()
+          + " does not know what to do with fields of unknown category: "
+          + fieldObjectInspector.getCategory() + " , type: " + fieldObjectInspector.getTypeName());
+    }
+    return res;
+  }
+
+  /**
+   * Helper method to return the underlying Java Map from
+   * an object representation that is readable by the provided
+   * MapObjectInspector.
+   */
+  private Map<Object, Object> serializeMap(Object f, MapObjectInspector moi)
+      throws SerDeException {
+    ObjectInspector koi = moi.getMapKeyObjectInspector();
+    ObjectInspector voi = moi.getMapValueObjectInspector();
+
+    Map<?, ?> readMap = moi.getMap(f);
+    if (readMap == null) {
+      return null;
+    }
+    Map<Object, Object> m = new TreeMap<Object, Object>();
+    for (Map.Entry<?, ?> entry : readMap.entrySet()) {
+      m.put(serializeField(entry.getKey(), koi), serializeField(entry.getValue(), voi));
+    }
+    return m;
+  }
+
+  /**
+   * Helper method to return the underlying Java List from an object
+   * representation that is readable by the provided ListObjectInspector.
+   */
+  private List<?> serializeList(Object f, ListObjectInspector loi)
+      throws SerDeException {
+    List<?> l = loi.getList(f);
+    if (l == null) {
+      return null;
+    }
+    ObjectInspector eloi = loi.getListElementObjectInspector();
+    if (eloi.getCategory() == Category.PRIMITIVE) {
+      return l;
+    } else if (eloi.getCategory() == Category.STRUCT) {
+      List<List<Object>> list = new ArrayList<List<Object>>(l.size());
+      for (int i = 0; i < l.size(); i++) {
+        list.add(serializeStruct(l.get(i), (StructObjectInspector) eloi));
+      }
+      return list;
+    } else if (eloi.getCategory() == Category.LIST) {
+      List<List<?>> list = new ArrayList<List<?>>(l.size());
+      for (int i = 0; i < l.size(); i++) {
+        list.add(serializeList(l.get(i), (ListObjectInspector) eloi));
+      }
+      return list;
+    } else if (eloi.getCategory() == Category.MAP) {
+      List<Map<Object, Object>> list = new ArrayList<Map<Object, Object>>(l.size());
+      for (int i = 0; i < l.size(); i++) {
+        list.add(serializeMap(l.get(i), (MapObjectInspector) eloi));
+      }
+      return list;
+    } else {
+      throw new SerDeException(getClass().toString()
+          + " does not know what to do with fields of unknown category: "
+          + eloi.getCategory() + " , type: " + eloi.getTypeName());
+    }
+  }
+
+  /**
+   * Return an ObjectInspector that can read through the object
+   * we return from deserialize(), i.e. an ObjectInspector for
+   * HCatRecord, built from the type info provided during
+   * initialize(). This also means that this method cannot, and
+   * should not, be called before initialize().
+   */
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return HCatRecord.class;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // no support for statistics yet
+    return null;
+  }
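+
+  // A minimal end-to-end sketch, under the same assumed two-column schema
+  // as the earlier sketches: an HCatRecord survives a serialize/deserialize
+  // round trip unchanged, because both sides share the same Writable
+  // representation.
+  //
+  //   HCatRecord r = new DefaultHCatRecord(new ArrayList<Object>(
+  //       Arrays.asList((Object) 7, Arrays.asList("a", "b"))));
+  //   Writable w = serde.serialize(r, serde.getObjectInspector());
+  //   HCatRecord r2 = (HCatRecord) serde.deserialize(w);
+  //   assert r.equals(r2);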
+
+}
Index: src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java
===================================================================
--- src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (revision 0)
+++ src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (revision 0)
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * ObjectInspectorFactory for HCatRecordObjectInspectors (and associated helper inspectors)
+ */
+public class HCatRecordObjectInspectorFactory {
+
+  public static final Log LOG = LogFactory
+      .getLog(HCatRecordObjectInspectorFactory.class.getName());
+
+  static HashMap<StructTypeInfo, HCatRecordObjectInspector> cachedHCatRecordObjectInspectors =
+      new HashMap<StructTypeInfo, HCatRecordObjectInspector>();
+  static HashMap<TypeInfo, ObjectInspector> cachedObjectInspectors =
+      new HashMap<TypeInfo, ObjectInspector>();
+
+  /**
+   * Returns an HCatRecordObjectInspector, given a StructTypeInfo type definition for the record to look into.
+   * @param typeInfo type definition for the record to look into
+   * @return the appropriate HCatRecordObjectInspector
+   * @throws SerDeException
+   */
+  public static HCatRecordObjectInspector getHCatRecordObjectInspector(
+      StructTypeInfo typeInfo) throws SerDeException {
+    HCatRecordObjectInspector oi = cachedHCatRecordObjectInspectors.get(typeInfo);
+    if (oi == null) {
+      LOG.debug("Got asked for OI for " + typeInfo.getCategory() + "[" + typeInfo.getTypeName() + "]");
+
+      switch (typeInfo.getCategory()) {
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        List<ObjectInspector> fieldObjectInspectors =
+            new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+        for (int i = 0; i < fieldTypeInfos.size(); i++) {
+          fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+        }
+        oi = new HCatRecordObjectInspector(fieldNames, fieldObjectInspectors);
+        break;
+      default:
+        // The only type expected here is STRUCT, which maps to HCatRecord;
+        // anything else is an error.
+        throw new SerDeException("TypeInfo [" + typeInfo.getTypeName()
+            + "] was not of struct type - HCatRecord expected struct type, got ["
+            + typeInfo.getCategory().toString() + "]");
+      }
+      cachedHCatRecordObjectInspectors.put(typeInfo, oi);
+    }
+    return oi;
+  }
+
+  public static ObjectInspector getStandardObjectInspectorFromTypeInfo(TypeInfo typeInfo) {
+
+    ObjectInspector oi = cachedObjectInspectors.get(typeInfo);
+    if (oi == null) {
+      LOG.debug("Got asked for OI for " + typeInfo.getCategory() + "[" + typeInfo.getTypeName() + "]");
+
+      switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        oi = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+        break;
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        List<ObjectInspector> fieldObjectInspectors =
+            new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+        for (int i = 0; i < fieldTypeInfos.size(); i++) {
+          fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+        }
+        oi = ObjectInspectorFactory.getStandardStructObjectInspector(
+            fieldNames, fieldObjectInspectors);
+        break;
+      case LIST:
+        ObjectInspector elementObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((ListTypeInfo) typeInfo).getListElementTypeInfo());
+        oi = ObjectInspectorFactory.getStandardListObjectInspector(elementObjectInspector);
+        break;
+      case MAP:
+        ObjectInspector keyObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+        ObjectInspector valueObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+        oi = ObjectInspectorFactory.getStandardMapObjectInspector(keyObjectInspector, valueObjectInspector);
+        break;
+      default:
+        oi = null;
+      }
+      cachedObjectInspectors.put(typeInfo, oi);
+    }
+    return oi;
+  }
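+
+  // A minimal sketch of obtaining an inspector from a Hive type string
+  // (the schema is illustrative):
+  //
+  //   StructTypeInfo ti = (StructTypeInfo) TypeInfoUtils
+  //       .getTypeInfoFromTypeString("struct<id:int,tags:array<string>>");
+  //   HCatRecordObjectInspector oi =
+  //       HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(ti);
+  //   // repeated calls with an equal TypeInfo return the cached instance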
+
+}
Index: src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java
===================================================================
--- src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java (revision 0)
+++ src/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java (revision 0)
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+public class HCatRecordObjectInspector extends StandardStructObjectInspector {
+
+  protected HCatRecordObjectInspector(List<String> structFieldNames,
+      List<ObjectInspector> structFieldObjectInspectors) {
+    super(structFieldNames, structFieldObjectInspectors);
+  }
+
+  @Override
+  public Object getStructFieldData(Object data, StructField fieldRef) {
+    if (data == null) {
+      return null;
+    }
+
+    int fieldID = ((MyField) fieldRef).getFieldID();
+    assert (fieldID >= 0 && fieldID < fields.size());
+
+    return ((HCatRecord) data).get(fieldID);
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(Object o) {
+    return ((HCatRecord) o).getAll();
+  }
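+
+  // A minimal sketch of field access through this inspector, assuming the
+  // illustrative struct<id:int,tags:array<string>> schema used earlier:
+  //
+  //   StructField idRef = oi.getStructFieldRef("id");
+  //   Object id = oi.getStructFieldData(record, idRef);      // record.get(0)
+  //   List<Object> all = oi.getStructFieldsDataAsList(record);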
+
+}
Index: src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java
===================================================================
--- src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (revision 0)
+++ src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java (revision 0)
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.Writable;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestHCatRecordSerDe extends TestCase {
+
+  public Map<Properties, HCatRecord> getData() {
+    Map<Properties, HCatRecord> data = new HashMap<Properties, HCatRecord>();
+
+    List<Object> rlist = new ArrayList<Object>(11);
+    rlist.add(new Byte("123"));
+    rlist.add(new Short("456"));
+    rlist.add(new Integer(789));
+    rlist.add(new Long(1000L));
+    rlist.add(new Double(5.3D));
+    rlist.add(new Float(2.39F));
+    rlist.add(new String("hcat and hadoop"));
+    rlist.add(null);
+
+    List<Object> innerStruct = new ArrayList<Object>(2);
+    innerStruct.add(new String("abc"));
+    innerStruct.add(new String("def"));
+    rlist.add(innerStruct);
+
+    List<Integer> innerList = new ArrayList<Integer>();
+    innerList.add(314);
+    innerList.add(007);
+    rlist.add(innerList);
+
+    Map<Short, String> map = new HashMap<Short, String>(3);
+    map.put(new Short("2"), "hcat is cool");
+    map.put(new Short("3"), "is it?");
+    map.put(new Short("4"), "or is it not?");
+    rlist.add(map);
+
+    rlist.add(new Boolean(true));
+
+    List<Object> c1 = new ArrayList<Object>();   // array<struct<...>>
+    List<Object> c1_1 = new ArrayList<Object>(); // struct<i1:int,i2:struct<...>>
+    c1_1.add(new Integer(12));
+    List<Object> i2 = new ArrayList<Object>();   // struct<ii1:array<int>,ii2:map<...>>
+    List<Integer> ii1 = new ArrayList<Integer>();
+    ii1.add(new Integer(13));
+    ii1.add(new Integer(14));
+    i2.add(ii1);
+    Map<String, List<?>> ii2 = new HashMap<String, List<?>>();
+    List<Integer> iii1 = new ArrayList<Integer>();
+    iii1.add(new Integer(15));
+    ii2.put("phew", iii1);
+    i2.add(ii2);
+    c1_1.add(i2);
+    c1.add(c1_1);
+    rlist.add(c1);
+
+    String typeString =
+        "tinyint,smallint,int,bigint,double,float,string,string,"
+        + "struct<a:string,b:string>,array<int>,map<smallint,string>,boolean,"
+        + "array<struct<i1:int,i2:struct<ii1:array<int>,ii2:map<string,array<int>>>>>";
+    Properties props = new Properties();
+
+    props.put(Constants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1");
+    props.put(Constants.LIST_COLUMN_TYPES, typeString);
+
+    data.put(props, new DefaultHCatRecord(rlist));
+    return data;
+  }
+
+  public void testRW() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    for (Entry<Properties, HCatRecord> e : getData().entrySet()) {
+      Properties tblProps = e.getKey();
+      HCatRecord r = e.getValue();
+
+      HCatRecordSerDe hrsd = new HCatRecordSerDe();
+      hrsd.initialize(conf, tblProps);
+
+      System.out.println("ORIG:" + r.toString());
+
+      Writable s = hrsd.serialize(r, hrsd.getObjectInspector());
+      System.out.println("ONE:" + s.toString());
+
+      HCatRecord r2 = (HCatRecord) hrsd.deserialize(s);
+      Assert.assertTrue(r.equals(r2));
+
+      // If serialization went through correctly, then s is also an
+      // HCatRecord, equal to the original and a deep copy of it, and
+      // this holds through multiple further levels of serialization
+      // as well.
+      Writable s2 = hrsd.serialize(s, hrsd.getObjectInspector());
+      System.out.println("TWO:" + s2.toString());
+      Assert.assertTrue(r.equals((HCatRecord) s));
+      Assert.assertTrue(r.equals((HCatRecord) s2));
+
+      // Serialize using another serde, and read out that object representation.
+      LazySimpleSerDe testSD = new LazySimpleSerDe();
+      testSD.initialize(conf, tblProps);
+
+      Writable s3 = testSD.serialize(s, hrsd.getObjectInspector());
+      System.out.println("THREE:" + s3.toString());
+      Object o3 = testSD.deserialize(s3);
+      Assert.assertFalse(r.getClass().equals(o3.getClass()));
+
+      // Then serialize again using hrsd, and compare results.
+      HCatRecord s4 = (HCatRecord) hrsd.serialize(o3, testSD.getObjectInspector());
+      System.out.println("FOUR:" + s4.toString());
+      Assert.assertFalse(r.equals(s4));
+    }
+
+  }
+
+}