Index: src/test/org/apache/hcatalog/data/TestJacksonSerDe.java
===================================================================
--- src/test/org/apache/hcatalog/data/TestJacksonSerDe.java (revision 0)
+++ src/test/org/apache/hcatalog/data/TestJacksonSerDe.java (revision 0)
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestJacksonSerDe extends TestCase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestJacksonSerDe.class);
+
+  public List<Pair<Properties, HCatRecord>> getData() {
+    List<Pair<Properties, HCatRecord>> data = new ArrayList<Pair<Properties, HCatRecord>>();
+
+    List<Object> rlist = new ArrayList<Object>(13);
+    rlist.add(new Byte("123"));
+    rlist.add(new Short("456"));
+    rlist.add(new Integer(789));
+    rlist.add(new Long(1000L));
+    rlist.add(new Double(5.3D));
+    rlist.add(new Float(2.39F));
+    rlist.add(new String("hcat and hadoop"));
+    rlist.add(null);
+
+    List<Object> innerStruct = new ArrayList<Object>(2);
+    innerStruct.add(new String("abc"));
+    innerStruct.add(new String("def"));
+    rlist.add(innerStruct);
+
+    List<Integer> innerList = new ArrayList<Integer>();
+    innerList.add(314);
+    innerList.add(007);
+    rlist.add(innerList);
+
+    Map<Short, String> map = new HashMap<Short, String>(3);
+    map.put(new Short("2"), "hcat is cool");
+    map.put(new Short("3"), "is it?");
+    map.put(new Short("4"), "or is it not?");
+    rlist.add(map);
+
+    rlist.add(new Boolean(true));
+
+    List<Object> c1 = new ArrayList<Object>();
+    List<Object> c1_1 = new ArrayList<Object>();
+    c1_1.add(new Integer(12));
+    List<Object> i2 = new ArrayList<Object>();
+    List<Integer> ii1 = new ArrayList<Integer>();
+    ii1.add(new Integer(13));
+    ii1.add(new Integer(14));
+    i2.add(ii1);
+    Map<String, List<Integer>> ii2 = new HashMap<String, List<Integer>>();
+    List<Integer> iii1 = new ArrayList<Integer>();
+    iii1.add(new Integer(15));
+    ii2.put("phew", iii1);
+    i2.add(ii2);
+    c1_1.add(i2);
+    c1.add(c1_1);
+    rlist.add(c1);
+
+    List<Object> nlist = new ArrayList<Object>(13);
+    nlist.add(null); // tinyint
+    nlist.add(null); // smallint
+    nlist.add(null); // int
+    nlist.add(null); // bigint
+    nlist.add(null); // double
+    nlist.add(null); // float
+    nlist.add(null); // string
+    nlist.add(null); // string
+    nlist.add(null); // struct
+    nlist.add(null); // array
+    nlist.add(null); // map
+    nlist.add(null); // bool
+    nlist.add(null); // complex
+
+    String typeString =
+        "tinyint,smallint,int,bigint,double,float,string,string,"
+        + "struct<a:string,b:string>,array<int>,map<smallint,string>,boolean,"
+        + "array<struct<i1:int,i2:struct<ii1:array<int>,ii2:map<string,array<int>>>>>";
+    Properties props = new Properties();
+
+    props.put(Constants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1");
+    props.put(Constants.LIST_COLUMN_TYPES, typeString);
+//    props.put(Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+//    props.put(Constants.SERIALIZATION_FORMAT, "1");
+
+    data.add(new Pair<Properties, HCatRecord>(props, new DefaultHCatRecord(rlist)));
+    data.add(new Pair<Properties, HCatRecord>(props, new DefaultHCatRecord(nlist)));
+    return data;
+  }
+
+  public void testRW() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    for (Pair<Properties, HCatRecord> e : getData()) {
+      Properties tblProps = e.first;
+      HCatRecord r = e.second;
+
+      HCatRecordSerDe hrsd = new HCatRecordSerDe();
+      hrsd.initialize(conf, tblProps);
+
+      JacksonSerDe jsde = new JacksonSerDe();
+      jsde.initialize(conf, tblProps);
+
+      LOG.info("ORIG:{}", r);
+
+      Writable s = hrsd.serialize(r, hrsd.getObjectInspector());
+      LOG.info("ONE:{}", s);
+
+      Object o1 = hrsd.deserialize(s);
+      assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o1));
+
+      Writable s2 = jsde.serialize(o1, hrsd.getObjectInspector());
+      LOG.info("TWO:{}", s2);
+      Object o2 = jsde.deserialize(s2);
+      LOG.info("deserialized TWO : {}", o2);
+
+      assertTrue(HCatDataCheckUtil.recordsEqual(r, new DefaultHCatRecord((List<Object>) o2)));
+    }
+  }
+
+  public void testRobustRead() throws Exception {
+    /*
+     * This test has been added to account for HCATALOG-436.
+     * We write out columns with "internal column names" such
+     * as "_col0", but try to read with regular column names.
+     */
+
+    Configuration conf = new Configuration();
+
+    for (Pair<Properties, HCatRecord> e : getData()) {
+      Properties tblProps = e.first;
+      HCatRecord r = e.second;
+
+      Properties internalTblProps = new Properties();
+      for (Map.Entry<Object, Object> pe : tblProps.entrySet()) {
+        if (!pe.getKey().equals(Constants.LIST_COLUMNS)) {
+          internalTblProps.put(pe.getKey(), pe.getValue());
+        } else {
+          internalTblProps.put(pe.getKey(), getInternalNames((String) pe.getValue()));
+        }
+      }
+
+      LOG.info("orig tbl props:{}", tblProps);
+      LOG.info("modif tbl props:{}", internalTblProps);
+
+      JacksonSerDe wjsd = new JacksonSerDe();
+      wjsd.initialize(conf, internalTblProps);
+
+      JacksonSerDe rjsd = new JacksonSerDe();
+      rjsd.initialize(conf, tblProps);
+
+      LOG.info("ORIG:{}", r);
+
+      Writable s = wjsd.serialize(r.getAll(), wjsd.getObjectInspector());
+      LOG.info("ONE:{}", s);
+
+      Object o1 = wjsd.deserialize(s);
+      LOG.info("deserialized ONE : {}", o1);
+
+      Object o2 = rjsd.deserialize(s);
+      LOG.info("deserialized TWO : {}", o2);
+      assertTrue(HCatDataCheckUtil.recordsEqual(r, new DefaultHCatRecord((List<Object>) o2)));
+    }
+  }
+
+  String getInternalNames(String columnNames) {
+    if (columnNames == null) {
+      return null;
+    }
+    if (columnNames.isEmpty()) {
+      return "";
+    }
+
+    StringBuffer sb = new StringBuffer();
+    int numStrings = columnNames.split(",").length;
+    sb.append("_col0");
+    for (int i = 1; i < numStrings; i++) {
+      sb.append(",");
+      sb.append(HiveConf.getColumnInternalName(i));
+    }
+    return sb.toString();
+  }
+}
Index: src/java/org/apache/hcatalog/data/JacksonSerDe.java
===================================================================
--- src/java/org/apache/hcatalog/data/JacksonSerDe.java (revision 0)
+++ src/java/org/apache/hcatalog/data/JacksonSerDe.java (revision 0)
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
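+/**
+ * A SerDe that uses the Jackson streaming parser to map between Hive rows
+ * and one-JSON-object-per-line text.
+ *
+ * Illustrative sketch (not part of the original patch): for a hypothetical
+ * schema (a:int, b:string, l:array&lt;int&gt;), the row [1, "x", [2, 3]] is
+ * expected to round-trip through the text
+ *
+ *   {"a":1,"b":"x","l":[2,3]}
+ *
+ * with serialize() producing that Text and deserialize() rebuilding a
+ * List of standard Java objects readable by getObjectInspector().
+ */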
+public class JacksonSerDe implements SerDe {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JacksonSerDe.class);
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+
+  private StructTypeInfo rowTypeInfo;
+
+  private JsonFactory jsonFactory = null;
+
+  private JacksonSerDeObjectInspector cachedObjectInspector;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+
+    LOG.debug("Initializing JacksonSerDe");
+    LOG.debug("props to serde: {}", tbl.entrySet());
+
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    // all table column names
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    // all column types
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    LOG.debug("columns: {}, {}", columnNameProperty, columnNames);
+    LOG.debug("types: {}, {}", columnTypeProperty, columnTypes);
+
+    assert (columnNames.size() == columnTypes.size());
+
+    rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+
+    cachedObjectInspector = JacksonSerDeObjectInspectorFactory.getJacksonSerDeObjectInspector(rowTypeInfo);
+
+    jsonFactory = new JsonFactory();
+  }
+
+  /**
+   * Takes a JSON string in Text form and returns an object representation
+   * of it that is readable by the corresponding object inspector.
+   *
+   * Since this implementation uses the Jackson parser, we can construct
+   * our own object representation: a List containing the appropriate
+   * standard Java inner types.
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+
+    Text t = (Text) blob;
+    JsonParser p;
+    List<Object> r = new ArrayList<Object>(Collections.nCopies(columnNames.size(), null));
+    try {
+      p = jsonFactory.createJsonParser(new ByteArrayInputStream(t.getBytes()));
+      if (p.nextToken() != JsonToken.START_OBJECT) {
+        throw new IOException("Start token not found where expected");
+      }
+      JsonToken token;
+      while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) {
+        // iterate through each token, and create the appropriate object here
+        populateRecord(r, token, p, rowTypeInfo);
+      }
+    } catch (JsonParseException e) {
+      LOG.warn("Error [{}] parsing json text [{}].", e, t);
+      LOG.debug(null, e);
+      throw new SerDeException(e);
+    } catch (IOException e) {
+      LOG.warn("Error [{}] parsing json text [{}].", e, t);
+      LOG.debug(null, e);
+      throw new SerDeException(e);
+    }
+
+    return r;
+  }
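+
+  // Illustration (not part of the original patch): per HCATALOG-436, data may
+  // arrive with Hive's internal column names, e.g.
+  //   {"_col0":123,"_col1":456}
+  // while the reading schema calls those columns "ti" and "si". populateRecord()
+  // below first looks the field name up in the struct's type info and, failing
+  // that, decodes the position straight out of the "_col<N>" name, so both
+  // spellings land in the same slot of the row list.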
+
+  private void populateRecord(List<Object> r, JsonToken token, JsonParser p,
+      StructTypeInfo stype) throws IOException {
+    if (token != JsonToken.FIELD_NAME) {
+      LOG.error("Field name expected, got token [{}] with text [{}]", token.name(), p.getText());
+      throw new IOException("Field name expected");
+    }
+    String fieldName = p.getText();
+    TypeInfo fieldTypeInfo = null;
+    int fpos = stype.getAllStructFieldNames().indexOf(fieldName);
+
+    if (fpos == -1) {
+      LOG.debug("Was unable to find position for field [{}] in typeinfo "
+          + "[{}]. Probably using an internal name. Attempting to figure it out.", fieldName, stype);
+      fpos = getPositionFromHiveInternalColumnName(fieldName);
+      if (!fieldName.equalsIgnoreCase(getHiveInternalColumnName(fpos))) {
+        LOG.error("Hive internal column name {} and position "
+            + "encoding {} for the column name are at odds", fieldName, fpos);
+      }
+      if (fpos == -1) {
+        throw new IOException("Unable to find position for fieldname [" + fieldName + "]");
+      }
+    }
+
+    fieldTypeInfo = stype.getAllStructFieldTypeInfos().get(fpos);
+    Object currField = extractCurrentField(p, fieldTypeInfo, false);
+
+    r.set(fpos, currField);
+  }
+
+  public String getHiveInternalColumnName(int fpos) {
+    return HiveConf.getColumnInternalName(fpos);
+  }
+
+  public int getPositionFromHiveInternalColumnName(String internalName) {
+//    return HiveConf.getPositionFromInternalName(internalName);
+    // The above line should have been all the implementation that
+    // we need, but due to a bug in that impl which recognizes
+    // only single-digit columns, we need another impl here.
+    Pattern internalPattern = Pattern.compile("_col([0-9]+)");
+    Matcher m = internalPattern.matcher(internalName);
+    if (!m.matches()) {
+      return -1;
+    } else {
+      return Integer.parseInt(m.group(1));
+    }
+  }
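+
+  // For instance (illustrative): getPositionFromHiveInternalColumnName("_col12")
+  // returns 12, whereas HiveConf.getPositionFromInternalName() in the affected
+  // Hive versions only recognized single-digit suffixes such as "_col1".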
+
+  /**
+   * Utility method to extract the current expected field from the given JsonParser.
+   *
+   * To get the field, we need the underlying typeInfo.
+   *
+   * isTokenCurrent is a boolean that determines whether the JsonParser is
+   * already positioned at the token we expect to read next, or needs
+   * advancing to the next token before we read.
+   */
+  private Object extractCurrentField(JsonParser p, TypeInfo ftype,
+      boolean isTokenCurrent) throws IOException, JsonParseException {
+    Object val = null;
+    JsonToken valueToken;
+    if (isTokenCurrent) {
+      valueToken = p.getCurrentToken();
+    } else {
+      valueToken = p.nextToken();
+    }
+
+    Category typeCategory = ftype.getCategory();
+
+    switch (typeCategory) {
+    case PRIMITIVE:
+      switch (((PrimitiveTypeInfo) ftype).getPrimitiveCategory()) {
+      case BOOLEAN:
+        String bval = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
+        if (bval != null) {
+          val = Boolean.valueOf(bval);
+        } else {
+          val = null;
+        }
+        break;
+      case BYTE:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getByteValue();
+        break;
+      case SHORT:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getShortValue();
+        break;
+      case INT:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue();
+        break;
+      case LONG:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getLongValue();
+        break;
+      case FLOAT:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getFloatValue();
+        break;
+      case DOUBLE:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getDoubleValue();
+        break;
+      case STRING:
+        val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
+        break;
+      case BINARY:
+        // TODO : maybe base64-encode or some such.
+        throw new IOException("JacksonSerDe does not support BINARY type");
+      default:
+        throw new TypeNotPresentException(((PrimitiveTypeInfo) ftype).getTypeName(), null);
+      }
+      break;
+    case LIST:
+      if (valueToken == JsonToken.VALUE_NULL) {
+        val = null;
+        break;
+      }
+      if (valueToken != JsonToken.START_ARRAY) {
+        throw new IOException("Start of Array expected");
+      }
+      List<Object> arr = new ArrayList<Object>();
+      while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) {
+        arr.add(extractCurrentField(p,
+            ((ListTypeInfo) ftype).getListElementTypeInfo(), true));
+      }
+      val = arr;
+      break;
+    case MAP:
+      if (valueToken == JsonToken.VALUE_NULL) {
+        val = null;
+        break;
+      }
+      if (valueToken != JsonToken.START_OBJECT) {
+        throw new IOException("Start of Object expected");
+      }
+      Map<Object, Object> map = new LinkedHashMap<Object, Object>();
+
+      TypeInfo keyTypeInfo = ((MapTypeInfo) ftype).getMapKeyTypeInfo();
+      TypeInfo valueTypeInfo = ((MapTypeInfo) ftype).getMapValueTypeInfo();
+
+      while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
+        Object k = getObjectOfCorrespondingPrimitiveTypeInfo(p.getCurrentName(), keyTypeInfo);
+        Object v;
+        if (valueTypeInfo.getCategory() == ObjectInspector.Category.STRUCT) {
+          v = extractCurrentField(p, valueTypeInfo, false);
+        } else {
+          v = extractCurrentField(p, valueTypeInfo, true);
+        }
+
+        map.put(k, v);
+      }
+      val = map;
+      break;
+    case STRUCT:
+      if (valueToken == JsonToken.VALUE_NULL) {
+        val = null;
+        break;
+      }
+      if (valueToken != JsonToken.START_OBJECT) {
+        throw new IOException("Start of Object expected");
+      }
+
+      int sz = (((StructTypeInfo) ftype).getAllStructFieldTypeInfos()).size();
+
+      List<Object> struct = new ArrayList<Object>(Collections.nCopies(sz, null));
+      while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
+        populateRecord(struct, valueToken, p, (StructTypeInfo) ftype);
+      }
+      val = struct;
+      break;
+    default:
+      // UNION is not handled here; it falls through and returns null.
+      break;
+    }
+
+    return val;
+  }
+
+  private Object getObjectOfCorrespondingPrimitiveTypeInfo(String s, TypeInfo t) throws IOException {
+    switch (((PrimitiveTypeInfo) t).getPrimitiveCategory()) {
+    case INT:
+      return Integer.valueOf(s);
+    case BYTE:
+      return Byte.valueOf(s);
+    case SHORT:
+      return Short.valueOf(s);
+    case LONG:
+      return Long.valueOf(s);
+    case BOOLEAN:
+      return (s.equalsIgnoreCase("true"));
+    case FLOAT:
+      return Float.valueOf(s);
+    case DOUBLE:
+      return Double.valueOf(s);
+    case STRING:
+      return s;
+    case BINARY:
+      throw new IOException("JacksonSerDe does not support BINARY type");
+    }
+    throw new IOException("Could not convert a string map key to type " + t);
+  }
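+
+  // Illustration (not part of the original patch): JSON object keys are always
+  // strings on the wire, so for the test column m, typed map<smallint,string>,
+  // the text {"2":"hcat is cool"} must come back with the Short key 2 rather
+  // than the String "2"; getObjectOfCorrespondingPrimitiveTypeInfo above does
+  // that conversion from the parser's field name.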
+
+  /**
+   * Given an object and object inspector pair, traverse the object
+   * and generate a Text representation of the object.
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector)
+      throws SerDeException {
+    StringBuilder sb = new StringBuilder();
+    try {
+
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> structFields = soi.getAllStructFieldRefs();
+      assert (columnNames.size() == structFields.size());
+      if (obj == null) {
+        sb.append("null");
+      } else {
+        sb.append(SerDeUtils.LBRACE);
+        for (int i = 0; i < structFields.size(); i++) {
+          if (i > 0) {
+            sb.append(SerDeUtils.COMMA);
+          }
+          sb.append(SerDeUtils.QUOTE);
+          sb.append(columnNames.get(i));
+          sb.append(SerDeUtils.QUOTE);
+          sb.append(SerDeUtils.COLON);
+          buildJSONString(sb, soi.getStructFieldData(obj, structFields.get(i)),
+              structFields.get(i).getFieldObjectInspector());
+        }
+        sb.append(SerDeUtils.RBRACE);
+      }
+
+    } catch (IOException e) {
+      LOG.warn("Error generating json text from object.", e);
+      throw new SerDeException(e);
+    }
+    return new Text(sb.toString());
+  }
+
+  // TODO : this section is copied from SerDeUtils because the json production
+  // there is non-standard: it should use quotes for all field names. We should
+  // fix it there and then remove this copy.
+  // See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES
+  // for details - configuring Jackson to tolerate unquoted field names did not
+  // work (compilation failure when attempting to use that feature), so we
+  // change the production itself.
+  // Also, this copy throws IOException when BINARY is detected.
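+  // Illustration (not part of the original patch): for a map keyed by smallint,
+  // the SerDeUtils production would emit an unquoted key such as
+  //   {2:"hcat is cool"}
+  // which a strict JSON parser like Jackson rejects; the copy below quotes any
+  // field name or map key that is not already quoted, yielding
+  //   {"2":"hcat is cool"}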
"true" : "false"); + break; + } + case BYTE: { + sb.append(((ByteObjectInspector) poi).get(o)); + break; + } + case SHORT: { + sb.append(((ShortObjectInspector) poi).get(o)); + break; + } + case INT: { + sb.append(((IntObjectInspector) poi).get(o)); + break; + } + case LONG: { + sb.append(((LongObjectInspector) poi).get(o)); + break; + } + case FLOAT: { + sb.append(((FloatObjectInspector) poi).get(o)); + break; + } + case DOUBLE: { + sb.append(((DoubleObjectInspector) poi).get(o)); + break; + } + case STRING: { + sb.append('"'); + sb.append(SerDeUtils.escapeString(((StringObjectInspector) poi) + .getPrimitiveJavaObject(o))); + sb.append('"'); + break; + } + case TIMESTAMP: { + sb.append('"'); + sb.append(((TimestampObjectInspector) poi) + .getPrimitiveWritableObject(o)); + sb.append('"'); + break; + } + case BINARY: { + throw new IOException("JacksonSerDe does not support BINARY type"); + } + default: + throw new RuntimeException("Unknown primitive type: " + + poi.getPrimitiveCategory()); + } + } + break; + } + case LIST: { + ListObjectInspector loi = (ListObjectInspector) oi; + ObjectInspector listElementObjectInspector = loi + .getListElementObjectInspector(); + List olist = loi.getList(o); + if (olist == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACKET); + for (int i = 0; i < olist.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); + } + buildJSONString(sb, olist.get(i), listElementObjectInspector); + } + sb.append(SerDeUtils.RBRACKET); + } + break; + } + case MAP: { + MapObjectInspector moi = (MapObjectInspector) oi; + ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector(); + ObjectInspector mapValueObjectInspector = moi + .getMapValueObjectInspector(); + Map omap = moi.getMap(o); + if (omap == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + boolean first = true; + for (Object entry : omap.entrySet()) { + if (first) { + first = false; + } else { + sb.append(SerDeUtils.COMMA); + } + Map.Entry e = (Map.Entry) entry; + StringBuilder keyBuilder = new StringBuilder(); + buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector); + String keyString = keyBuilder.toString().trim(); + boolean doQuoting = (!keyString.isEmpty()) && (keyString.charAt(0)!= SerDeUtils.QUOTE); + if (doQuoting ){ + sb.append(SerDeUtils.QUOTE); + } + sb.append(keyString); + if (doQuoting ){ + sb.append(SerDeUtils.QUOTE); + } + sb.append(SerDeUtils.COLON); + buildJSONString(sb, e.getValue(), mapValueObjectInspector); + } + sb.append(SerDeUtils.RBRACE); + } + break; + } + case STRUCT: { + StructObjectInspector soi = (StructObjectInspector) oi; + List structFields = soi.getAllStructFieldRefs(); + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + for (int i = 0; i < structFields.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); + } + sb.append(SerDeUtils.QUOTE); + sb.append(structFields.get(i).getFieldName()); + sb.append(SerDeUtils.QUOTE); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)), + structFields.get(i).getFieldObjectInspector()); + } + sb.append(SerDeUtils.RBRACE); + } + break; + } + case UNION: { + UnionObjectInspector uoi = (UnionObjectInspector) oi; + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + sb.append(uoi.getTag(o)); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, uoi.getField(o), + uoi.getObjectInspectors().get(uoi.getTag(o))); + sb.append(SerDeUtils.RBRACE); + } + break; + } + default: + 
+      throw new RuntimeException("Unknown type in ObjectInspector!");
+    }
+  }
+
+  /**
+   * Returns an object inspector for the specified schema that
+   * is capable of reading in the object representation of the JSON string
+   */
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return Text.class;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // no support for statistics yet
+    return null;
+  }
+
+}
Index: src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspectorFactory.java
===================================================================
--- src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspectorFactory.java (revision 0)
+++ src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspectorFactory.java (revision 0)
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ObjectInspectorFactory for JacksonSerDeObjectInspectors (and associated helper inspectors)
+ */
+public class JacksonSerDeObjectInspectorFactory {
+
+  private final static Logger LOG = LoggerFactory.getLogger(JacksonSerDeObjectInspectorFactory.class);
+
+  static HashMap<TypeInfo, JacksonSerDeObjectInspector> cachedJacksonSerDeObjectInspectors =
+      new HashMap<TypeInfo, JacksonSerDeObjectInspector>();
+  static HashMap<TypeInfo, ObjectInspector> cachedObjectInspectors =
+      new HashMap<TypeInfo, ObjectInspector>();
+
+  /**
+   * Returns a JacksonSerDeObjectInspector given a StructTypeInfo type definition for the record to look into
+   * @param typeInfo Type definition for the record to look into
+   * @return appropriate JacksonSerDeObjectInspector
+   * @throws SerDeException
+   */
+  public static JacksonSerDeObjectInspector getJacksonSerDeObjectInspector(
+      StructTypeInfo typeInfo) throws SerDeException {
+    JacksonSerDeObjectInspector oi = cachedJacksonSerDeObjectInspectors.get(typeInfo);
+    if (oi == null) {
+
+      LOG.debug("Got asked for OI for {} [{}]", typeInfo.getCategory(), typeInfo.getTypeName());
+      switch (typeInfo.getCategory()) {
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+        for (int i = 0; i < fieldTypeInfos.size(); i++) {
+          fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+        }
+        oi = new JacksonSerDeObjectInspector(fieldNames, fieldObjectInspectors);
+        break;
+      default:
+        // The only type expected here is STRUCT; anything else is an error.
+        throw new SerDeException("TypeInfo [" + typeInfo.getTypeName()
+            + "] was not of struct type - expected struct type, got ["
+            + typeInfo.getCategory().toString() + "]");
+      }
+      cachedJacksonSerDeObjectInspectors.put(typeInfo, oi);
+    }
+    return oi;
+  }
+
+  public static ObjectInspector getStandardObjectInspectorFromTypeInfo(TypeInfo typeInfo) {
+
+    ObjectInspector oi = cachedObjectInspectors.get(typeInfo);
+    if (oi == null) {
+
+      LOG.debug("Got asked for OI for {}, [{}]", typeInfo.getCategory(), typeInfo.getTypeName());
+      switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        oi = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+        break;
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        List<ObjectInspector> fieldObjectInspectors =
+            new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+        for (int i = 0; i < fieldTypeInfos.size(); i++) {
+          fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+        }
+        oi = ObjectInspectorFactory.getStandardStructObjectInspector(
+            fieldNames, fieldObjectInspectors);
+        break;
+      case LIST:
+        ObjectInspector elementObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((ListTypeInfo) typeInfo).getListElementTypeInfo());
+        oi = ObjectInspectorFactory.getStandardListObjectInspector(elementObjectInspector);
+        break;
+      case MAP:
+        ObjectInspector keyObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+        ObjectInspector valueObjectInspector = getStandardObjectInspectorFromTypeInfo(
+            ((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+        oi = ObjectInspectorFactory.getStandardMapObjectInspector(keyObjectInspector, valueObjectInspector);
+        break;
+      default:
+        oi = null;
+      }
+      cachedObjectInspectors.put(typeInfo, oi);
+    }
+    return oi;
+  }
+
+}
Index: src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspector.java
===================================================================
--- src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspector.java (revision 0)
+++ src/java/org/apache/hcatalog/data/JacksonSerDeObjectInspector.java (revision 0)
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+public class JacksonSerDeObjectInspector extends StandardStructObjectInspector {
+
+  protected JacksonSerDeObjectInspector(List<String> structFieldNames,
+      List<ObjectInspector> structFieldObjectInspectors) {
+    super(structFieldNames, structFieldObjectInspectors);
+  }
+
+  @Override
+  public Object getStructFieldData(Object data, StructField fieldRef) {
+    if (data == null) {
+      throw new IllegalArgumentException("Data passed in to get field from was null!");
+    }
+
+    int fieldID = ((MyField) fieldRef).getFieldID();
+    if (!(fieldID >= 0 && fieldID < fields.size())) {
+      throw new IllegalArgumentException("Invalid field index [" + fieldID + "]");
+    }
+
+    return ((List<Object>) data).get(fieldID);
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(Object o) {
+    return (List<Object>) o;
+  }
+
+}
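
For review convenience, a minimal usage sketch of the new SerDe outside the JUnit harness. This is not part of the patch: the two-column schema and the wrapper class JacksonSerDeExample are hypothetical, and it assumes the three classes above are on the classpath together with Hive's serde libraries.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.Constants;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hcatalog.data.JacksonSerDe;

    public class JacksonSerDeExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical two-column schema, mirroring the table properties
        // that TestJacksonSerDe builds in getData().
        Properties props = new Properties();
        props.put(Constants.LIST_COLUMNS, "id,name");
        props.put(Constants.LIST_COLUMN_TYPES, "int,string");

        JacksonSerDe serde = new JacksonSerDe();
        serde.initialize(new Configuration(), props);

        // One JSON object per Text line, field names matching the columns.
        Object row = serde.deserialize(new Text("{\"id\":7,\"name\":\"hcat\"}"));
        System.out.println(row);   // expected: [7, hcat]

        // Round-trip back to JSON text.
        Writable json = serde.serialize(row, serde.getObjectInspector());
        System.out.println(json);  // expected: {"id":7,"name":"hcat"}
      }
    }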