Index: src/java/org/apache/hcatalog/data/JsonSerDe.java
===================================================================
--- src/java/org/apache/hcatalog/data/JsonSerDe.java	(revision 0)
+++ src/java/org/apache/hcatalog/data/JsonSerDe.java	(revision 0)
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+public class JsonSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory
+      .getLog(JsonSerDe.class.getName());
+
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+
+  private StructTypeInfo rowTypeInfo;
+  private HCatSchema schema;
+
+  private JsonFactory jsonFactory = null;
+
+  private HCatRecordObjectInspector cachedObjectInspector;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing JsonSerDe");
+      HCatUtil.logEntrySet(LOG, "props to serde", tbl.entrySet());
+    }
+
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    // all table column names
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    // all column types
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("columns:" + columnNameProperty);
+      for (String s : columnNames) {
+        LOG.debug("cn:" + s);
+      }
+      LOG.debug("types: " + columnTypeProperty);
+      for (TypeInfo t : columnTypes) {
+        LOG.debug("ct:" + t.getTypeName() + ",type:" + t.getCategory());
+      }
+    }
+
+    assert (columnNames.size() == columnTypes.size());
+
+    rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+
+    cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo);
+    try {
+      schema = HCatSchemaUtils.getHCatSchema(rowTypeInfo).get(0).getStructSubSchema();
+    } catch (HCatException e) {
+      throw new SerDeException(e);
+    }
+
+    jsonFactory = new JsonFactory();
+  }
+
+  /**
+   * Takes a JSON string in Text form and returns an object representation of
+   * it that is readable by the corresponding object inspector.
+   *
+   * Since this implementation uses the Jackson parser, we can construct our
+   * own object representation, and we use HCatRecord for it.
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+
+    Text t = (Text) blob;
+    JsonParser p;
+    List<Object> r = new ArrayList<Object>(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      r.add(i, null);
+    }
+    try {
+      // only the first getLength() bytes of the Text backing array are valid
+      p = jsonFactory.createJsonParser(
+          new ByteArrayInputStream(t.getBytes(), 0, t.getLength()));
+      if (p.nextToken() != JsonToken.START_OBJECT) {
+        throw new IOException("Start token not found where expected");
+      }
+      JsonToken token;
+      while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) {
+        // iterate through each token, and create an appropriate object here
+        populateRecord(r, token, p, schema);
+      }
+    } catch (JsonParseException e) {
+      LOG.warn("Error [" + e.getMessage() + "] parsing json text [" + t + "]");
+      throw new SerDeException(e);
+    } catch (IOException e) {
+      LOG.warn("Error [" + e.getMessage() + "] parsing json text [" + t + "]");
+      throw new SerDeException(e);
+    }
+
+    return new DefaultHCatRecord(r);
+  }
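+
+  /*
+   * An illustrative sketch (for a hypothetical table declared with columns
+   * (a int, b string), not part of this patch's test data) of what
+   * deserialize() consumes and produces:
+   *
+   *   input Text : {"a":1,"b":"hello"}
+   *   output     : DefaultHCatRecord holding [1, "hello"]
+   *
+   * Fields absent from the JSON object stay null in the record, since the
+   * backing list is pre-filled with nulls above.
+   */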
+// System.err.println("Token : "+token.toString()); + populateRecord(r,token,p,schema); + } + } catch (JsonParseException e) { + LOG.warn("Error ["+ e.getMessage()+"] parsing json text ["+t+"]"); + throw new SerDeException(e); + } catch (IOException e) { + LOG.warn("Error ["+ e.getMessage()+"] parsing json text ["+t+"]"); + throw new SerDeException(e); + } + + return new DefaultHCatRecord(r); + } + + private void populateRecord(List r, JsonToken token, JsonParser p, HCatSchema s) throws IOException { + if (token != JsonToken.FIELD_NAME){ + throw new IOException("Field name expected"); + } +// System.err.println("p.getText " + p.getText()); + String fieldName = p.getText(); + int fpos = s.getPosition(fieldName); + HCatFieldSchema hcatFieldSchema = s.getFields().get(fpos); + + r.set(fpos,extractCurrentField(p, null, hcatFieldSchema,false)); + } + + /** + * Utility method to extract current expected field from given JsonParser + * + * To get the field, we need either a type or a hcatFieldSchema(necessary for complex types) + * It is possible that one of them can be null, and so, if so, the other is instantiated + * from the other + * + * isTokenCurrent is a boolean variable also passed in, which determines + * if the JsonParser is already at the token we expect to read next, or + * needs advancing to the next before we read. + */ + private Object extractCurrentField(JsonParser p, Type t, + HCatFieldSchema hcatFieldSchema, boolean isTokenCurrent) throws IOException, JsonParseException, + HCatException { + Object val = null; + JsonToken valueToken; + if (isTokenCurrent){ + valueToken = p.getCurrentToken(); + } else { + valueToken = p.nextToken(); + } + + if (hcatFieldSchema != null){ + t = hcatFieldSchema.getType(); + } + switch(t) { + case INT: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getIntValue(); + break; + case TINYINT: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getByteValue(); + break; + case SMALLINT: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getShortValue(); + break; + case BIGINT: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getLongValue(); + break; + case BOOLEAN: + String bval = (valueToken == JsonToken.VALUE_NULL)?null:p.getText(); + val = (bval.equalsIgnoreCase("true")); + break; + case FLOAT: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getFloatValue(); + break; + case DOUBLE: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getDoubleValue(); + break; + case STRING: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getText(); + break; + case BINARY: + val = (valueToken == JsonToken.VALUE_NULL)?null:p.getByteValue(); + break; + case ARRAY: +// System.err.println("Begin array"); + if (valueToken != JsonToken.START_ARRAY){ + throw new IOException("Start of Array expected"); + } + List arr = new ArrayList(); + while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) { +// System.out.println(valueToken.toString()); + arr.add(extractCurrentField(p, null,hcatFieldSchema.getArrayElementSchema().get(0),true)); + } + val = arr; + break; + case MAP: +// System.err.println("Begin map"); + if (valueToken != JsonToken.START_OBJECT){ + throw new IOException("Start of Object expected"); + } + Map map = new LinkedHashMap(); + Type keyType = hcatFieldSchema.getMapKeyType(); + HCatFieldSchema valueSchema = hcatFieldSchema.getMapValueSchema().get(0); + while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) { +// System.err.println(" getting mapkey"); + Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(),keyType); +// 
System.err.println(" getting mapvalue"); + Object v; + if (valueSchema.getType() == HCatFieldSchema.Type.STRUCT){ + v = extractCurrentField(p,null, valueSchema,false); + } else { + v = extractCurrentField(p,null, valueSchema,true); + } +// System.err.println(" got mapval"); + + map.put(k, v); + } + val = map; + break; + case STRUCT: +// System.err.println("Begin struct"); + if (valueToken != JsonToken.START_OBJECT){ + throw new IOException("Start of Object expected"); + } + HCatSchema subSchema = hcatFieldSchema.getStructSubSchema(); + int sz = subSchema.getFieldNames().size(); +// System.err.println("expected size "+sz); + List struct = new ArrayList(sz); + for (int i = 0; i olist = loi.getList(o); + if (olist == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACKET); + for (int i = 0; i < olist.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); + } + buildJSONString(sb, olist.get(i), listElementObjectInspector); + } + sb.append(SerDeUtils.RBRACKET); + } + break; + } + case MAP: { + MapObjectInspector moi = (MapObjectInspector) oi; + ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector(); + ObjectInspector mapValueObjectInspector = moi + .getMapValueObjectInspector(); + Map omap = moi.getMap(o); + if (omap == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + boolean first = true; + for (Object entry : omap.entrySet()) { + if (first) { + first = false; + } else { + sb.append(SerDeUtils.COMMA); + } + Map.Entry e = (Map.Entry) entry; + StringBuilder keyBuilder = new StringBuilder(); + buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector); + String keyString = keyBuilder.toString().trim(); + boolean doQuoting = (!keyString.isEmpty()) && (keyString.charAt(0)!= SerDeUtils.QUOTE); + if (doQuoting ){ + sb.append(SerDeUtils.QUOTE); + } + sb.append(keyString); + if (doQuoting ){ + sb.append(SerDeUtils.QUOTE); + } + sb.append(SerDeUtils.COLON); + buildJSONString(sb, e.getValue(), mapValueObjectInspector); + } + sb.append(SerDeUtils.RBRACE); + } + break; + } + case STRUCT: { + StructObjectInspector soi = (StructObjectInspector) oi; + List structFields = soi.getAllStructFieldRefs(); + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + for (int i = 0; i < structFields.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); + } + sb.append(SerDeUtils.QUOTE); + sb.append(structFields.get(i).getFieldName()); + sb.append(SerDeUtils.QUOTE); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)), + structFields.get(i).getFieldObjectInspector()); + } + sb.append(SerDeUtils.RBRACE); + } + break; + } + case UNION: { + UnionObjectInspector uoi = (UnionObjectInspector) oi; + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + sb.append(uoi.getTag(o)); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, uoi.getField(o), + uoi.getObjectInspectors().get(uoi.getTag(o))); + sb.append(SerDeUtils.RBRACE); + } + break; + } + default: + throw new RuntimeException("Unknown type in ObjectInspector!"); + } + } + + + /** + * Returns an object inspector for the specified schema that + * is capable of reading in the object representation of the JSON string + */ + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return cachedObjectInspector; + } + + @Override + public Class getSerializedClass() { + return Text.class; + } + + @Override + public SerDeStats getSerDeStats() { + // no support for statistics yet + 
+
+}
Index: src/test/org/apache/hcatalog/data/TestJsonSerDe.java
===================================================================
--- src/test/org/apache/hcatalog/data/TestJsonSerDe.java	(revision 0)
+++ src/test/org/apache/hcatalog/data/TestJsonSerDe.java	(revision 0)
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+
+import junit.framework.TestCase;
+
+public class TestJsonSerDe extends TestCase {
+
+  public Map<Properties, HCatRecord> getData() {
+    Map<Properties, HCatRecord> data = new HashMap<Properties, HCatRecord>();
+
+    List<Object> rlist = new ArrayList<Object>(13);
+    rlist.add(new Byte("123"));
+    rlist.add(new Short("456"));
+    rlist.add(new Integer(789));
+    rlist.add(new Long(1000L));
+    rlist.add(new Double(5.3D));
+    rlist.add(new Float(2.39F));
+    rlist.add(new String("hcat and hadoop"));
+    rlist.add(null);
+
+    List<Object> innerStruct = new ArrayList<Object>(2);
+    innerStruct.add(new String("abc"));
+    innerStruct.add(new String("def"));
+    rlist.add(innerStruct);
+
+    List<Integer> innerList = new ArrayList<Integer>();
+    innerList.add(314);
+    innerList.add(007);
+    rlist.add(innerList);
+
+    Map<Short, String> map = new HashMap<Short, String>(3);
+    map.put(new Short("2"), "hcat is cool");
+    map.put(new Short("3"), "is it?");
+    map.put(new Short("4"), "or is it not?");
+    rlist.add(map);
+
+    rlist.add(new Boolean(true));
+
+    List<Object> c1 = new ArrayList<Object>();
+    List<Object> c1_1 = new ArrayList<Object>();
+    c1_1.add(new Integer(12));
+    List<Object> i2 = new ArrayList<Object>();
+    List<Integer> ii1 = new ArrayList<Integer>();
+    ii1.add(new Integer(13));
+    ii1.add(new Integer(14));
+    i2.add(ii1);
+    Map<String, List<Integer>> ii2 = new HashMap<String, List<Integer>>();
+    List<Integer> iii1 = new ArrayList<Integer>();
+    iii1.add(new Integer(15));
+    ii2.put("phew", iii1);
+    i2.add(ii2);
+    c1_1.add(i2);
+    c1.add(c1_1);
+    rlist.add(c1);
+
+    String typeString =
+        "tinyint,smallint,int,bigint,double,float,string,string,"
+            + "struct<a:string,b:string>,array<int>,map<smallint,string>,boolean,"
+            + "array<struct<i1:int,i2:struct<ii1:array<int>,ii2:map<string,array<int>>>>>";
+    Properties props = new Properties();
+
+    props.put(Constants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1");
+    props.put(Constants.LIST_COLUMN_TYPES, typeString);
+
+    data.put(props, new DefaultHCatRecord(rlist));
+    return data;
+  }
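+
+  /*
+   * For orientation, an approximate (hand-written, not asserted) rendering of
+   * the JSON that the record above should serialize to, keys in column order;
+   * note that the smallint map keys come out as quoted strings:
+   *
+   *   {"ti":123,"si":456,"i":789,"bi":1000,"d":5.3,"f":2.39,
+   *    "s":"hcat and hadoop","n":null,"r":{"a":"abc","b":"def"},
+   *    "l":[314,7],"m":{"2":"hcat is cool",...},"b":true,"c1":[...]}
+   */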
+
+  public void testRW() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    for (Entry<Properties, HCatRecord> e : getData().entrySet()) {
+      Properties tblProps = e.getKey();
+      HCatRecord r = e.getValue();
+
+      HCatRecordSerDe hrsd = new HCatRecordSerDe();
+      hrsd.initialize(conf, tblProps);
+
+      JsonSerDe jsde = new JsonSerDe();
+      jsde.initialize(conf, tblProps);
+
+      System.out.println("ORIG:" + r.toString());
+
+      Writable s = hrsd.serialize(r, hrsd.getObjectInspector());
+      System.out.println("ONE:" + s.toString());
+
+      Object o1 = hrsd.deserialize(s);
+      assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o1));
+
+      Writable s2 = jsde.serialize(o1, hrsd.getObjectInspector());
+      System.out.println("TWO:" + s2.toString());
+      Object o2 = jsde.deserialize(s2);
+      System.out.println("deserialized TWO : " + o2);
+
+      assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2));
+    }
+  }
+
+}
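
For quick orientation, a minimal standalone round-trip sketch of the API the
test above exercises (the two-column schema here is hypothetical, and this is
a sketch under those assumptions rather than part of the patch):

    Properties props = new Properties();
    props.put(Constants.LIST_COLUMNS, "a,b");
    props.put(Constants.LIST_COLUMN_TYPES, "int,string");

    JsonSerDe jsde = new JsonSerDe();
    jsde.initialize(new Configuration(), props);

    Text json = new Text("{\"a\":1,\"b\":\"hello\"}");
    HCatRecord rec = (HCatRecord) jsde.deserialize(json);           // holds [1, "hello"]
    Writable out = jsde.serialize(rec, jsde.getObjectInspector());  // {"a":1,"b":"hello"}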