Index: src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java
===================================================================
--- src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java (revision 1240677)
+++ src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java (working copy)
@@ -122,7 +122,7 @@
    * @throws HCatException if call made on non-primitive types
    */
   public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatException {
-    assertTypeInCategory(type,Category.PRIMITIVE);
+    assertTypeInCategory(type,Category.PRIMITIVE,fieldName);
     this.fieldName = fieldName;
     this.type = type;
     this.category = Category.PRIMITIVE;
@@ -162,8 +162,8 @@
    * @throws HCatException if call made on non-Map types
    */
   public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException{
-    assertTypeInCategory(type,Category.MAP);
-    assertTypeInCategory(mapKeyType,Category.PRIMITIVE);
+    assertTypeInCategory(type,Category.MAP, fieldName);
+    assertTypeInCategory(mapKeyType,Category.PRIMITIVE, fieldName);
     this.fieldName = fieldName;
     this.type = Type.MAP;
     this.category = Category.MAP;
@@ -174,29 +174,29 @@
   }

   public HCatSchema getStructSubSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.STRUCT);
+    assertTypeInCategory(this.type,Category.STRUCT, this.fieldName);
     return subSchema;
   }

   public HCatSchema getArrayElementSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.ARRAY);
+    assertTypeInCategory(this.type,Category.ARRAY, this.fieldName);
     return subSchema;
   }

   public Type getMapKeyType() throws HCatException {
-    assertTypeInCategory(this.type,Category.MAP);
+    assertTypeInCategory(this.type,Category.MAP, this.fieldName);
     return mapKeyType;
   }

   public HCatSchema getMapValueSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.MAP);
+    assertTypeInCategory(this.type,Category.MAP, this.fieldName);
     return subSchema;
   }

-  private static void assertTypeInCategory(Type type, Category category) throws HCatException {
+  private static void assertTypeInCategory(Type type, Category category, String fieldName) throws HCatException {
     Category typeCategory = Category.fromType(type);
     if (typeCategory != category){
-      throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory);
+      throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory+" (field "+fieldName+")");
     }
   }
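The effect of carrying the field name into assertTypeInCategory() can be seen with a minimal sketch (not part of the patch; the demo class name is made up):

    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;

    public class FieldNameInErrorDemo {
      public static void main(String[] args) {
        try {
          // ARRAY is not in Category.PRIMITIVE, so the primitive-only constructor rejects it.
          new HCatFieldSchema("myfield", Type.ARRAY, null);
        } catch (HCatException e) {
          // Before the patch the message ended at "...in category ARRAY";
          // with the patch it is followed by " (field myfield)", so the
          // offending column can be identified directly.
          System.out.println(e.getMessage());
        }
      }
    }
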
Index: src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (revision 1240677)
+++ src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (working copy)
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
@@ -81,7 +82,7 @@
     for(String partKVP : partKVPs){
       String[] partKV = partKVP.split("=");
       if(partKV.length == 2) {
-        String partKey = partKV[0].trim();
+        String partKey = partKV[0].trim();
         partitionKeys.add(partKey);
         partitions.put(partKey, partKV[1].trim());
       } else {
@@ -118,58 +119,33 @@
    * schema of the table in metastore.
    */
   protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
-
     List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
     for(FieldSchema fSchema : pigSchema.getFields()){
-      byte type = fSchema.type;
-      HCatFieldSchema hcatFSchema;
-
       try {
+        HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
-        // Find out if we need to throw away the tuple or not.
-        if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
-          List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-          arrFields.add(getHCatFSFromPigFS(fSchema.schema.getField(0).schema.getField(0), tableSchema));
-          hcatFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
-        }
-        else{
-          hcatFSchema = getHCatFSFromPigFS(fSchema, tableSchema);
-        }
-        fieldSchemas.add(hcatFSchema);
+        fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
       } catch (HCatException he){
         throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
       }
     }
-
     return new HCatSchema(fieldSchemas);
   }

-  private void validateUnNested(Schema innerSchema) throws FrontendException{
-
-    for(FieldSchema innerField : innerSchema.getFields()){
-      validateAlias(innerField.alias);
-      if(DataType.isComplex(innerField.type)) {
-        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
+  public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException{
+    if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+      return true;
     }
-  }
-
-  private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
-
-    String colName = bagFieldSchema.alias;
-    for(HCatFieldSchema field : tableSchema.getFields()){
-      if(colName.equalsIgnoreCase(field.getName())){
-        return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true;
-      }
-    }
     // Column was not found in table schema. Its a new column
     List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
-    return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? true : false;
+    if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+      return true;
+    }
+    return false;
   }

-  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatSchema hcatTblSchema) throws FrontendException, HCatException{
-
+  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException{
     byte type = fSchema.type;
     switch(type){
@@ -191,19 +167,29 @@
     case DataType.BYTEARRAY:
       return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
+
     case DataType.BAG:
       Schema bagSchema = fSchema.schema;
       List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-      arrFields.add(getHCatFSFromPigFS(bagSchema.getField(0), hcatTblSchema));
+      FieldSchema field;
+      // Find out if we need to throw away the tuple or not.
+      if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+        field = bagSchema.getField(0).schema.getField(0);
+      } else {
+        field = bagSchema.getField(0);
+      }
+      arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
       return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");

     case DataType.TUPLE:
       List fieldNames = new ArrayList();
       List hcatFSs = new ArrayList();
-      for( FieldSchema fieldSchema : fSchema.schema.getFields()){
-        fieldNames.add( fieldSchema.alias);
-        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, hcatTblSchema));
+      HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+      List<FieldSchema> fields = fSchema.schema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        FieldSchema fieldSchema = fields.get(i);
+        fieldNames.add(fieldSchema.alias);
+        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
       }
       return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");

     case DataType.MAP:
@@ -211,27 +197,12 @@
       // Pig's schema contain no type information about map's keys and
       // values. So, if its a new column assume if its existing
       // return whatever is contained in the existing column.
-      HCatFieldSchema mapField = getTableCol(fSchema.alias, hcatTblSchema);
+
       HCatFieldSchema valFS;
       List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
-      if(mapField != null){
-        Type mapValType = mapField.getMapValueSchema().get(0).getType();
-
-        switch(mapValType){
-        case STRING:
-        case BIGINT:
-        case INT:
-        case FLOAT:
-        case DOUBLE:
-        case BINARY:
-          valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
-          break;
-        default:
-          throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        valFSList.add(valFS);
-        return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+      if(hcatFieldSchema != null){
+        return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
       }

       // Column not found in target table. Its a new column. Its schema is map<string,string>
@@ -267,55 +238,83 @@
     }
   }

-  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws ExecException, HCatException{
+  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException{
+    try {

-    // The real work-horse. Spend time and energy in this method if there is
-    // need to keep HCatStorer lean and go fast.
-    Type type = hcatFS.getType();
+      // The real work-horse. Spend time and energy in this method if there is
+      // need to keep HCatStorer lean and go fast.
+      Type type = hcatFS.getType();
+      switch(type){

-    switch(type){
+      case BINARY:
+        ByteArrayRef ba = new ByteArrayRef();
+        byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
+        ba.setData(bytes);
+        return ba;

-    case BINARY:
-      ByteArrayRef ba = new ByteArrayRef();
-      byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
-      ba.setData(bytes);
-      return ba;
-
-    case STRUCT:
-      // Unwrap the tuple.
-      return ((Tuple)pigObj).getAll();
-      // Tuple innerTup = (Tuple)pigObj;
-      //
-      // List innerList = new ArrayList(innerTup.size());
-      // int i = 0;
-      // for(HCatTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
-      //   innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
-      // }
-      // return innerList;
-    case ARRAY:
-      // Unwrap the bag.
-      DataBag pigBag = (DataBag)pigObj;
-      HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
-      boolean needTuple = tupFS.getType() == Type.STRUCT;
-      List bagContents = new ArrayList((int)pigBag.size());
-      Iterator bagItr = pigBag.iterator();
+      case STRUCT:
+        if (pigObj == null) {
+          return null;
+        }
+        HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+        // Unwrap the tuple.
+        List all = ((Tuple)pigObj).getAll();
+        ArrayList converted = new ArrayList(all.size());
+        for (int i = 0; i < all.size(); i++) {
+          converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+        }
+        return converted;

-      while(bagItr.hasNext()){
-        // If there is only one element in tuple contained in bag, we throw away the tuple.
-        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
+      case ARRAY:
+        if (pigObj == null) {
+          return null;
+        }
+        // Unwrap the bag.
+        DataBag pigBag = (DataBag)pigObj;
+        HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+        boolean needTuple = tupFS.getType() == Type.STRUCT;
+        List bagContents = new ArrayList((int)pigBag.size());
+        Iterator bagItr = pigBag.iterator();

-      }
-      return bagContents;
+        while(bagItr.hasNext()){
+          // If there is only one element in tuple contained in bag, we throw away the tuple.
+          bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));

-      // case MAP:
-      //   Map pigMap = (Map)pigObj;
-      //   Map typeMap = new HashMap();
-      //   for(Entry entry: pigMap.entrySet()){
-      //     typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
-      //   }
-      //   return typeMap;
-    default:
-      return pigObj;
+        }
+        return bagContents;
+      case MAP:
+        if (pigObj == null) {
+          return null;
+        }
+        Map pigMap = (Map)pigObj;
+        Map typeMap = new HashMap();
+        for(Entry entry: pigMap.entrySet()){
+          // the value has a schema and not a FieldSchema
+          typeMap.put(
+              // Schema validation enforces that the Key is a String
+              (String)entry.getKey(),
+              getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+        }
+        return typeMap;
+      case STRING:
+      case INT:
+      case BIGINT:
+      case FLOAT:
+      case DOUBLE:
+        return pigObj;
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        // would not pass schema validation anyway
+        throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+      default:
+        throw new BackendException("Unexpected type "+type+" for value "+pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    } catch (BackendException e) {
+      // provide the path to the field in the error message
+      throw new BackendException(
+          (hcatFS.getName() == null ? " " : hcatFS.getName()+".") + e.getMessage(),
+          e.getCause() == null ? e : e.getCause());
     }
   }
@@ -339,84 +338,51 @@
     // dictated by semantics, consult HCatSchema of table when need be.
     for(FieldSchema pigField : pigSchema.getFields()){
-      byte type = pigField.type;
-      String alias = pigField.alias;
-      validateAlias(alias);
-      HCatFieldSchema hcatField = getTableCol(alias, tblSchema);
+      HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);

-      if(DataType.isComplex(type)){
-        switch(type){
+      validateSchema(pigField, hcatField);
+    }

-        case DataType.MAP:
-          if(hcatField != null){
-            if(hcatField.getMapKeyType() != Type.STRING){
-              throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            if(hcatField.getMapValueSchema().get(0).isComplex()){
-              throw new FrontendException("Value type of map cannot be complex" + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
+    try {
+      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+    } catch (IOException e) {
+      throw new FrontendException("HCatalog schema is not compatible with Pig: "+e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+  }

-        case DataType.BAG:
-          // Only map is allowed as complex type in tuples inside bag.
-          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
-            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
-              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            validateAlias(innerField.alias);
-          }
-          if(hcatField != null){
-            // Do the same validation for HCatSchema.
-            HCatFieldSchema arrayFieldScehma = hcatField.getArrayElementSchema().get(0);
-            Type hType = arrayFieldScehma.getType();
-            if(hType == Type.STRUCT){
-              for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
-                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
-                  throw new FrontendException("Nested Complex types not allowed "+ hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-                }
-              }
-            }
-            if(hType == Type.MAP){
-              if(arrayFieldScehma.getMapKeyType() != Type.STRING){
-                throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-              if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
-                throw new FrontendException("Value type of map cannot be complex "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-            if(hType == Type.ARRAY) {
-              throw new FrontendException("Arrays cannot contain array within it. "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
-        case DataType.TUPLE:
-          validateUnNested(pigField.schema);
-          if(hcatField != null){
-            for(HCatFieldSchema structFieldSchema : hcatField.getStructSubSchema().getFields()){
-              if(structFieldSchema.isComplex()){
-                throw new FrontendException("Nested Complex types are not allowed."+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
+  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+      throws HCatException, FrontendException {
+    validateAlias(pigField.alias);
+    byte type = pigField.type;
+    if(DataType.isComplex(type)){
+      switch(type){
+
+      case DataType.MAP:
+        if(hcatField != null){
+          if(hcatField.getMapKeyType() != Type.STRING){
+            throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
          }
-          break;
+          // Map values can be primitive or complex
+        }
+        break;

-        default:
-          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+      case DataType.BAG:
+        HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+        for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
+          validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
         }
-      }
-    }
+        break;

-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
+      case DataType.TUPLE:
+        HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+        for(FieldSchema innerField : pigField.schema.getFields()){
+          validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+        }
+        break;

-      // We dont do type promotion/demotion.
-      Type hType = hcatField.getType();
-      switch(hType){
-      case SMALLINT:
-      case TINYINT:
-      case BOOLEAN:
-        throw new FrontendException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      default:
+        throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
       }
     }
   }
@@ -431,11 +397,12 @@
   }

   // Finds column by name in HCatSchema, if not found returns null.
-  private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
-
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
-      if(hcatField.getName().equalsIgnoreCase(alias)){
-        return hcatField;
+  private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema){
+    if (tblSchema != null) {
+      for(HCatFieldSchema hcatField : tblSchema.getFields()){
+        if(hcatField!=null && hcatField.getName()!= null && hcatField.getName().equalsIgnoreCase(alias)){
+          return hcatField;
+        }
       }
     }
     // Its a new column
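To make the new tuple-dropping rule concrete, here is a hedged sketch (not part of the patch; the demo class and field names are invented, and it relies on removeTupleFromBag() being public static as shown above). A Pig bag of single-field tuples, b:{(x:long)}, stored into a hive column of type array<bigint>, drops the wrapper tuple:

    import java.util.Arrays;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.pig.HCatBaseStorer;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

    public class RemoveTupleDemo {
      public static void main(String[] args) throws Exception {
        // Pig side: a bag containing a tuple with a single long field.
        Schema tupleSchema = new Schema(new FieldSchema("x", DataType.LONG));
        Schema bagSchema = new Schema(new FieldSchema("t", tupleSchema, DataType.TUPLE));
        FieldSchema bagField = new FieldSchema("b", bagSchema, DataType.BAG);

        // HCat side: array<bigint> -- the element is a primitive, not a struct.
        HCatSchema elementSchema = new HCatSchema(Arrays.asList(
            new HCatFieldSchema("x", Type.BIGINT, null)));
        HCatFieldSchema arrayField = new HCatFieldSchema("b", Type.ARRAY, elementSchema, null);

        // Prints true: the single-field wrapper tuple is discarded on write.
        System.out.println(HCatBaseStorer.removeTupleFromBag(arrayField, bagField));
      }
    }

Had the column been array<struct<x: bigint>> instead, the method would return false and the tuple would be preserved, which is exactly what TestHCatLoaderComplexSchema exercises further down.
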
Index: src/java/org/apache/hcatalog/pig/PigHCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/pig/PigHCatUtil.java (revision 1240677)
+++ src/java/org/apache/hcatalog/pig/PigHCatUtil.java (working copy)
@@ -20,9 +20,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;

 import org.apache.hadoop.conf.Configuration;
@@ -32,19 +34,18 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatArrayBag;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -92,7 +93,7 @@
   static HiveMetaStoreClient client = null;

   private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
-      String serverKerberosPrincipal, Class clazz) throws Exception {
+      String serverKerberosPrincipal, Class clazz) throws Exception {
     if (client != null){
       return client;
     }
@@ -102,12 +103,12 @@
       hiveConf.set("hive.metastore.local", "false");
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
     }
-
+
     if (serverKerberosPrincipal != null){
       hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
-      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
     }
-
+
     try {
       client = new HiveMetaStoreClient(hiveConf,null);
     } catch (Exception e){
@@ -209,6 +210,11 @@
     HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
     if(arrayElementFieldSchema.getType() == Type.STRUCT) {
       bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+    } else if(arrayElementFieldSchema.getType() == Type.ARRAY) {
+      ResourceSchema s = new ResourceSchema();
+      List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
+      s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+      bagSubFieldSchemas[0].setSchema(s);
     } else {
       ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
       innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName("innerfield")
@@ -217,7 +223,8 @@
           .setSchema(null); // the element type is not a tuple - so no subschema
       bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
     }
-    return new ResourceSchema().setFields(bagSubFieldSchemas);
+    ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
+    return s;
   }

@@ -279,7 +286,7 @@
     if (type == Type.BINARY){
       return DataType.BYTEARRAY;
     }
-
+
     if (type == Type.BOOLEAN){
       errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
           "Pig as a column type";
@@ -291,27 +298,34 @@
   }

   public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
-    if (hr == null){
-      return null;
-    }
-    return transformToTuple(hr.getAll(),hs);
+    if (hr == null){
+      return null;
     }
+    return transformToTuple(hr.getAll(),hs);
+  }

   @SuppressWarnings("unchecked")
   public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
-    Type itemType = hfs.getType();
-    switch (itemType){
-    case BINARY:
-      return new DataByteArray(((ByteArrayRef)o).getData());
-    case STRUCT:
-      return transformToTuple((List)o,hfs);
-    case ARRAY:
-      return transformToBag((List) o,hfs);
-    case MAP:
-      return transformToPigMap((Map)o,hfs);
-    default:
-      return o;
-    }
+    Object result;
+    Type itemType = hfs.getType();
+    switch (itemType){
+    case BINARY:
+      result = new DataByteArray(((ByteArrayRef)o).getData());
+      break;
+    case STRUCT:
+      result = transformToTuple((List)o,hfs);
+      break;
+    case ARRAY:
+      result = transformToBag((List) o,hfs);
+      break;
+    case MAP:
+      result = transformToPigMap((Map)o,hfs);
+      break;
+    default:
+      result = o;
+      break;
+    }
+    return result;
   }

   public static Tuple transformToTuple(List objList, HCatFieldSchema hfs) throws Exception {
@@ -319,7 +333,7 @@
       return transformToTuple(objList,hfs.getStructSubSchema());
     } catch (Exception e){
       if (hfs.getType() != Type.STRUCT){
-        throw new Exception("Expected Struct type, got "+hfs.getType());
+        throw new Exception("Expected Struct type, got "+hfs.getType(), e);
       } else {
         throw e;
       }
@@ -327,21 +341,29 @@
   }

   public static Tuple transformToTuple(List objList, HCatSchema hs) throws Exception {
-    if (objList == null){
-      return null;
-    }
-    Tuple t = tupFac.newTuple(objList.size());
-    List<HCatFieldSchema> subFields = hs.getFields();
-    for (int i = 0; i < subFields.size(); i++){
-      t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
-    }
-    return t;
+    if (objList == null){
+      return null;
+    }
+
+    Tuple t = tupFac.newTuple(objList.size());
+    List<HCatFieldSchema> subFields = hs.getFields();
+    for (int i = 0; i < subFields.size(); i++){
+      t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
+    }
+    return t;
   }

   public static Map transformToPigMap(Map map, HCatFieldSchema hfs) throws Exception {
-    return map;
+    if (map == null) {
+      return null;
     }
+    Map result = new HashMap();
+    for (Entry entry : map.entrySet()) {
+      result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
+    }
+    return result;
+  }

+  @SuppressWarnings("unchecked")
   public static DataBag transformToBag(List list, HCatFieldSchema hfs) throws Exception {
     if (list == null){
       return null;
     }

     HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-    if (elementSubFieldSchema.getType() == Type.STRUCT){
-      DataBag db = new DefaultDataBag();
-      for (Object o : list){
-        db.add(transformToTuple((List)o,elementSubFieldSchema));
+    DataBag db = new DefaultDataBag();
+    for (Object o : list){
+      Tuple tuple;
+      if (elementSubFieldSchema.getType() == Type.STRUCT){
+        tuple = transformToTuple((List)o, elementSubFieldSchema);
+      } else {
+        // bags always contain tuples
+        tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
       }
-      return db;
-    } else {
-      return new HCatArrayBag(list);
+      db.add(tuple);
     }
+    return db;
   }

-  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
-    for (HCatFieldSchema hfs : hcatTableSchema.getFields()){
-      Type htype = hfs.getType();
-      if (htype == Type.ARRAY){
-        validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(hfs);
-      }else if (htype == Type.STRUCT){
-        validateIsPigCompatibleStructWithPrimitives(hfs);
-      }else if (htype == Type.MAP){
-        validateIsPigCompatibleMapWithPrimitives(hfs);
-      }else {
-        validateIsPigCompatiblePrimitive(hfs);
-      }
-    }
+  private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+    for(HCatFieldSchema hcatField : tblSchema.getFields()){
+      validateHcatFieldFollowsPigRules(hcatField);
+    }
   }

-  private static void validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(
-      HCatFieldSchema hfs) throws IOException {
-    HCatFieldSchema subFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-    if (subFieldSchema.getType() == Type.STRUCT){
-      validateIsPigCompatibleStructWithPrimitives(subFieldSchema);
-    }else if (subFieldSchema.getType() == Type.MAP) {
-      validateIsPigCompatiblePrimitive(subFieldSchema.getMapValueSchema().getFields().get(0));
-    }else {
-      validateIsPigCompatiblePrimitive(subFieldSchema);
+  private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+    try {
+      Type hType = hcatField.getType();
+      switch(hType){
+      // We don't do type promotion/demotion.
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      case ARRAY:
+        validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+        break;
+      case STRUCT:
+        validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+        break;
+      case MAP:
+        // key is only string
+        validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+        break;
       }
+    } catch (HCatException e) {
+      throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
   }

-  private static void validateIsPigCompatibleMapWithPrimitives(HCatFieldSchema hfs) throws IOException{
-    if (hfs.getMapKeyType() != Type.STRING){
-      throw new PigException("Incompatible type in schema, found map with " +
-          "non-string key type in :"+hfs.getTypeString(), PIG_EXCEPTION_CODE);
-    }
-    validateIsPigCompatiblePrimitive(hfs.getMapValueSchema().getFields().get(0));
-  }

-  private static void validateIsPigCompatibleStructWithPrimitives(HCatFieldSchema hfs) throws IOException {
-    for ( HCatFieldSchema subField : hfs.getStructSubSchema().getFields()){
-      validateIsPigCompatiblePrimitive(subField);
-    }
+  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+    validateHCatSchemaFollowsPigRules(hcatTableSchema);
   }

-  private static void validateIsPigCompatiblePrimitive(HCatFieldSchema hfs) throws IOException {
-    Type htype = hfs.getType();
-    if (
-        (hfs.isComplex()) ||
-        (htype == Type.TINYINT) ||
-        (htype == Type.SMALLINT)
-        ){
-      throw new PigException("Incompatible type in schema, expected pig " +
-          "compatible primitive for:" + hfs.getTypeString());
-    }
-
-  }
-
   public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
     if(p.getProperty(propName) != null){
       config.set(propName, p.getProperty(propName));
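The reworked transformToBag() above now always emits proper tuples, even for primitive array elements, instead of returning an HCatArrayBag. A minimal sketch of the observable behavior (not part of the patch; assumes PigHCatUtil and its static methods are accessible, and the demo class name is invented):

    import java.util.Arrays;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.pig.PigHCatUtil;
    import org.apache.pig.data.DataBag;

    public class BagConversionDemo {
      public static void main(String[] args) throws Exception {
        // hive column type: array<bigint> -- the element is a primitive, not a struct.
        HCatSchema elementSchema = new HCatSchema(Arrays.asList(
            new HCatFieldSchema("i", Type.BIGINT, null)));
        HCatFieldSchema arrayField = new HCatFieldSchema("a", Type.ARRAY, elementSchema, null);

        // Each element gets wrapped in its own single-field tuple,
        // so this should print something like {(100),(101)}.
        DataBag bag = PigHCatUtil.transformToBag(Arrays.asList((Object) 100L, 101L), arrayField);
        System.out.println(bag);
      }
    }
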
Index: src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java (revision 1240677)
+++ src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java (working copy)
@@ -156,6 +156,10 @@
     // with crisp Java objects inside it. We have to do it because higher layer
     // may not know how to do it.

+    if (data == null) {
+      return null;
+    }
+
     switch(oi.getCategory()){

     case PRIMITIVE:
Index: src/test/org/apache/hcatalog/pig/MockLoader.java
===================================================================
--- src/test/org/apache/hcatalog/pig/MockLoader.java (revision 0)
+++ src/test/org/apache/hcatalog/pig/MockLoader.java (revision 0)
@@ -0,0 +1,159 @@
+package org.apache.hcatalog.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+public class MockLoader extends LoadFunc {
+  private static final class MockRecordReader extends RecordReader {
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return "mockKey";
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return "mockValue";
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0.5f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return true;
+    }
+  }
+
+  private static final class MockInputSplit extends InputSplit implements Writable {
+    private String location;
+    public MockInputSplit() {
+    }
+    public MockInputSplit(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[] { location };
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 10000000;
+    }
+
+    @Override
+    public boolean equals(Object arg0) {
+      return arg0==this;
+    }
+
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+      location = arg0.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+      arg0.writeUTF(location);
+    }
+  }
+
+  private static final class MockInputFormat extends InputFormat {
+
+    private final String location;
+
+    public MockInputFormat(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+      return new MockRecordReader();
+    }
+
+    @Override
+    public List getSplits(JobContext arg0) throws IOException, InterruptedException {
+      return Arrays.asList(new MockInputSplit(location));
+    }
+  }
+
+  private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
+
+  public static void setData(String location, Iterable<Tuple> data) {
+    locationToData.put(location, data);
+  }
+
+  private String location;
+
+  private Iterator<Tuple> data;
+
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    return location;
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    this.location = location;
+    if (location == null) {
+      throw new IOException("null location passed to MockLoader");
+    }
+    // Look up the rows before taking an iterator, so an unknown location
+    // reports a clear error instead of a NullPointerException.
+    Iterable<Tuple> rows = locationToData.get(location);
+    if (rows == null) {
+      throw new IOException("No data configured for location: "+location);
+    }
+    this.data = rows.iterator();
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    if (data == null) {
+      throw new IOException("data was not correctly initialized in MockLoader");
+    }
+    return data.hasNext() ? data.next() : null;
+  }
+
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    return new MockInputFormat(location);
+  }
+
+  @Override
+  public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
+  }
+
+}
\ No newline at end of file
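For orientation, this is how the tests drive MockLoader: rows are registered under an arbitrary location key, then loaded by that key from a Pig script (a hypothetical standalone snippet, mirroring what verifyWriteRead() does in the test below; "demoInput" is made up):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hcatalog.pig.MockLoader;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class MockLoaderDemo {
      public static void main(String[] args) {
        TupleFactory tf = TupleFactory.getInstance();
        List<Tuple> rows = Arrays.asList(
            tf.newTuple(Arrays.<Object>asList(1L)),
            tf.newTuple(Arrays.<Object>asList(2L)));
        // Register the rows under a location key...
        MockLoader.setData("demoInput", rows);
        // ...then in a Pig script:
        //   A = load 'demoInput' using org.apache.hcatalog.pig.MockLoader() AS (a: long);
      }
    }
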
Index: src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
===================================================================
--- src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (revision 0)
+++ src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (revision 0)
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatLoaderComplexSchema {
+
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+  private static Properties props;
+
+  private void dropTable(String tablename) throws IOException, CommandNeedRetryException{
+    driver.run("drop table "+tablename);
+  }
+
+  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException{
+    String createTable;
+    createTable = "create table "+tablename+"("+schema+") ";
+    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+      createTable = createTable + "partitioned by ("+partitionedBy+") ";
+    }
+    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    System.out.println("Creating table:\n"+createTable);
+    CommandProcessorResponse result = driver.run(createTable);
+    int retCode = result.getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+" "+result.getErrorMessage()+"]");
+    }
+  }
+
+  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException{
+    createTable(tablename,schema,null);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class );
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+
+  }
+
+  private static final TupleFactory tf = TupleFactory.getInstance();
+  private static final BagFactory bf = BagFactory.getInstance();
+
+  private Tuple t(Object... objects) {
+    return tf.newTuple(Arrays.asList(objects));
+  }
+  private DataBag b(Tuple... objects) {
+    return bf.newDefaultBag(Arrays.asList(objects));
+  }
+
+  /**
+   * artificially complex nested schema to test nested schema conversion
+   * @throws Exception
+   */
+  @Test
+  public void testSyntheticComplexSchema() throws Exception {
+    String pigSchema =
+        "(" +
+        "a: " +
+          "(" +
+          "aa: chararray, " +
+          "ab: long, " +
+          "ac: map[], " +
+          "ad: { t: (ada: long) }, " +
+          "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+          "af: (afa: chararray, afb: long) " +
+          ")," +
+        "b: chararray, " +
+        "c: long, " +
+        "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
+        ")";
+
+    // with extra structs
+    String tableSchema =
+        "a struct<" +
+          "aa: string, " +
+          "ab: bigint, " +
+          "ac: map<string, string>, " +
+          "ad: array<struct<ada: bigint>>, " +
+          "ae: array<struct<aea: bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+          "af: struct<afa: string, afb: bigint> " +
+          ">, " +
+        "b string, " +
+        "c bigint, " +
+        "d array<struct<da: bigint, db: struct<dba: string, dbb: bigint>, dc: array<struct<dca: bigint>>>>";
+
+    // without extra structs
+    String tableSchema2 =
+        "a struct<" +
+          "aa: string, " +
+          "ab: bigint, " +
+          "ac: map<string, string>, " +
+          "ad: array<bigint>, " +
+          "ae: array<struct<aea: bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+          "af: struct<afa: string, afb: bigint> " +
+          ">, " +
+        "b string, " +
+        "c bigint, " +
+        "d array<struct<da: bigint, db: struct<dba: string, dbb: bigint>, dc: array<bigint>>>";
+
+    List<Tuple> data = new ArrayList<Tuple>();
+    for (int i = 0; i < 10; i++) {
+      Tuple t = t(
+          t(
+              "aa test",
+              2l,
+              new HashMap<String, String>() {{put("ac test1", "test 1");put("ac test2", "test 2");}},
+              b(t(3l), t(4l)),
+              b(t(5l, t("aeba test", 6l))),
+              t("afa test", 7l)
+          ),
+          "b test",
+          (long)i,
+          b(t(8l, t("dba test", 9l), b(t(10l)))));
+
+      data.add(t);
+    }
+    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
+    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
+    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
+    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
+
+  }
+
+  private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
+      throws IOException, CommandNeedRetryException, ExecException, FrontendException {
+    MockLoader.setData(tablename+"Input", data);
+    try {
+      createTable(tablename, tableSchema);
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      server.setBatchOn();
+      server.registerQuery("A = load '"+tablename+"Input' using org.apache.hcatalog.pig.MockLoader() AS "+pigSchema+";");
+      Schema dumpedASchema = server.dumpSchema("A");
+      server.registerQuery("STORE A into '"+tablename+"' using org.apache.hcatalog.pig.HCatStorer("
"'', '"+pigSchema+"'" : "") + + ");"); + ExecJob execJob = server.executeBatch().get(0); + if (!execJob.getStatistics().isSuccessful()) { + throw new RuntimeException("Import failed", execJob.getException()); + } + + // test that schema was loaded correctly + server.registerQuery("X = load '"+tablename+"' using org.apache.hcatalog.pig.HCatLoader();"); + Iterator it = server.openIterator("X"); + int i = 0; + while (it.hasNext()) { + Tuple input = data.get(i++); + Tuple output = it.next(); + Assert.assertEquals(input.toString(), output.toString()); + System.out.println(output); + } + Schema dumpedXSchema = server.dumpSchema("X"); + + Assert.assertEquals( + "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)", + "", + compareIgnoreFiledNames(dumpedASchema, dumpedXSchema)); + + } finally { + dropTable(tablename); + } + } + + private String compareIgnoreFiledNames(Schema expected, Schema got) throws FrontendException { + if (expected == null || got == null) { + if (expected == got) { + return ""; + } else { + return "\nexpected "+expected+" got "+got; + } + } + if (expected.size() != got.size()) { + return "\nsize expected "+expected.size()+" ("+expected+") got "+got.size()+" ("+got+")"; + } + String message = ""; + for (int i = 0; i < expected.size(); i++) { + FieldSchema expectedField = expected.getField(i); + FieldSchema gotField = got.getField(i); + if (expectedField.type != gotField.type) { + message += "\ntype expected "+expectedField.type+" ("+expectedField+") got "+gotField.type+" ("+gotField+")"; + } else { + message += compareIgnoreFiledNames(expectedField.schema, gotField.schema); + } + } + return message; + } + + /** + * tests that unnecessary tuples are drop while converting schema + * (Pig requires Tuples in Bags) + * @throws Exception + */ + @Test + public void testTupleInBagInTupleInBag() throws Exception { + String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })"; + + String tableSchema = "a array< array< bigint > >"; + + List data = new ArrayList(); + data.add(t(b(t(b(t(100l),t(101l))), t(b(t(110l)))))); + data.add(t(b(t(b(t(200l))), t(b(t(210l))), t(b(t(220l)))))); + data.add(t(b(t(b(t(300l),t(301l)))))); + data.add(t(b(t(b(t(400l))), t(b(t(410l),t(411l),t(412l)))))); + + + verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true); + verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false); + + // test that we don't drop the unnecessary tuple if the table has the corresponding Struct + String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >"; + + verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true); + verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false); + + } + + @Test + public void testMapWithComplexData() throws Exception { + String pigSchema = "(a: long, b: map[])"; + String tableSchema = "a bigint, b map>"; + + List data = new ArrayList(); + for (int i = 0; i < 10; i++) { + Tuple t = t( + (long)i, + new HashMap() {{put("b test 1", t(1l,"test 1"));put("b test 2", t(2l, "test 2"));}}); + + data.add(t); + } + verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true); + verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false); + + } + } Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java =================================================================== --- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 1240677) +++ 
Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
===================================================================
--- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 1240677)
+++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (working copy)
@@ -48,7 +48,7 @@
       Class<? extends HCatHadoopShims> clasz =
           Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
       return clasz.newInstance();
     } catch (Exception e) {
-      throw new RuntimeException("Failed to instantiate: " + shimFQN);
+      throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
     }
   }
 }
Index: src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
===================================================================
--- src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (revision 1240677)
+++ src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (working copy)
@@ -18,6 +18,7 @@
 package org.apache.hcatalog.data.schema;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;

 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -35,26 +36,24 @@

 public class HCatSchemaUtils {

-  private static HCatSchemaUtils ref = new HCatSchemaUtils();
-
   public static CollectionBuilder getStructSchemaBuilder(){
-    return ref.new CollectionBuilder();
+    return new CollectionBuilder();
   }

   public static CollectionBuilder getListSchemaBuilder(){
-    return ref.new CollectionBuilder();
+    return new CollectionBuilder();
   }

   public static MapBuilder getMapSchemaBuilder(){
-    return ref.new MapBuilder();
+    return new MapBuilder();
   }

-  public abstract class HCatSchemaBuilder {
+  public static abstract class HCatSchemaBuilder {
     public abstract HCatSchema build() throws HCatException;
   }

-  public class CollectionBuilder extends HCatSchemaBuilder { // for STRUCTS(multiple-add-calls) and LISTS(single-add-call)
+  public static class CollectionBuilder extends HCatSchemaBuilder { // for STRUCTS(multiple-add-calls) and LISTS(single-add-call)
     List fieldSchemas = null;

     CollectionBuilder(){
@@ -77,7 +76,7 @@
   }

-  public class MapBuilder extends HCatSchemaBuilder {
+  public static class MapBuilder extends HCatSchemaBuilder {

     Type keyType = null;
     HCatSchema valueSchema = null;
@@ -116,21 +115,23 @@
   private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fieldTypeInfo) throws HCatException {
     Category typeCategory = fieldTypeInfo.getCategory();
+    HCatFieldSchema hCatFieldSchema;
     if (Category.PRIMITIVE == typeCategory){
-      return new HCatFieldSchema(fieldName,getPrimitiveHType(fieldTypeInfo),null);
+      hCatFieldSchema = new HCatFieldSchema(fieldName,getPrimitiveHType(fieldTypeInfo),null);
     } else if (Category.STRUCT == typeCategory) {
       HCatSchema subSchema = constructHCatSchema((StructTypeInfo)fieldTypeInfo);
-      return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.STRUCT,subSchema,null);
+      hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.STRUCT,subSchema,null);
     } else if (Category.LIST == typeCategory) {
       HCatSchema subSchema = getHCatSchema(((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo());
-      return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.ARRAY,subSchema,null);
+      hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.ARRAY,subSchema,null);
     } else if (Category.MAP == typeCategory) {
       HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo());
       HCatSchema subSchema = getHCatSchema(((MapTypeInfo)fieldTypeInfo).getMapValueTypeInfo());
-      return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.MAP,mapKeyType,subSchema,null);
+      hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.MAP,mapKeyType,subSchema,null);
     } else{
       throw new TypeNotPresentException(fieldTypeInfo.getTypeName(),null);
     }
+    return hCatFieldSchema;
   }
   private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) {
@@ -180,23 +181,25 @@

   public static HCatSchema getHCatSchema(TypeInfo typeInfo) throws HCatException {
     Category typeCategory = typeInfo.getCategory();
+    HCatSchema hCatSchema;
     if (Category.PRIMITIVE == typeCategory){
-      return getStructSchemaBuilder().addField(new HCatFieldSchema(null,getPrimitiveHType(typeInfo),null)).build();
+      hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,getPrimitiveHType(typeInfo),null)).build();
     } else if (Category.STRUCT == typeCategory) {
       HCatSchema subSchema = constructHCatSchema((StructTypeInfo) typeInfo);
-      return getStructSchemaBuilder().addField(new HCatFieldSchema(null,Type.STRUCT,subSchema,null)).build();
+      hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,Type.STRUCT,subSchema,null)).build();
     } else if (Category.LIST == typeCategory) {
-      CollectionBuilder builder = getStructSchemaBuilder();
+      CollectionBuilder builder = getListSchemaBuilder();
       builder.addField(getHCatFieldSchema(null,((ListTypeInfo)typeInfo).getListElementTypeInfo()));
-      return builder.build();
+      hCatSchema = new HCatSchema(Arrays.asList(new HCatFieldSchema("",Type.ARRAY, builder.build(), "")));
     } else if (Category.MAP == typeCategory) {
       HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo)typeInfo).getMapKeyTypeInfo());
       HCatSchema subSchema = getHCatSchema(((MapTypeInfo)typeInfo).getMapValueTypeInfo());
       MapBuilder builder = getMapSchemaBuilder();
-      return builder.withKeyType(mapKeyType).withValueSchema(subSchema).build();
+      hCatSchema = builder.withKeyType(mapKeyType).withValueSchema(subSchema).build();
     } else{
       throw new TypeNotPresentException(typeInfo.getTypeName(),null);
     }
+    return hCatSchema;
   }

   public static HCatSchema getHCatSchemaFromTypeString(String typeString) throws HCatException {
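The LIST branch of getHCatSchema() now wraps the element schema in an ARRAY field instead of returning the bare element schema. A small sketch of the observable behavior (not part of the patch; the demo class is made up, and it assumes getHCatSchemaFromTypeString(), shown above, accepts a bare array type string):

    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.data.schema.HCatSchemaUtils;

    public class ListSchemaDemo {
      public static void main(String[] args) throws Exception {
        HCatSchema s = HCatSchemaUtils.getHCatSchemaFromTypeString("array<bigint>");
        HCatFieldSchema f = s.get(0);
        System.out.println(f.getType());               // ARRAY
        System.out.println(f.getArrayElementSchema()); // the bigint element schema
      }
    }
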
Index: src/java/org/apache/hcatalog/data/HCatArrayBag.java
===================================================================
--- src/java/org/apache/hcatalog/data/HCatArrayBag.java (revision 1240677)
+++ src/java/org/apache/hcatalog/data/HCatArrayBag.java (working copy)
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.data;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.Tuple;
-
-public class HCatArrayBag<T> implements DataBag {
-
-  private static final long DUMMY_SIZE = 40;
-  List<T> rawItemList = null;
-  DataBag convertedBag = null;
-//  List<Tuple> tupleList = null;
-
-  public class HCatArrayBagIterator implements Iterator<Tuple> {
-
-    Iterator<T> iter = null;
-
-    public HCatArrayBagIterator(List<T> rawItemList) {
-      iter = rawItemList.iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iter.hasNext();
-    }
-
-    @Override
-    public Tuple next() {
-      Tuple t = new DefaultTuple();
-      t.append(iter.next());
-      return t;
-    }
-
-    @Override
-    public void remove() {
-      iter.remove();
-    }
-
-  }
-
-  public HCatArrayBag(List<T> list) {
-    rawItemList = list;
-  }
-
-  private void convertFromRawToTupleForm(){
-    if (convertedBag == null){
-      List<Tuple> ltuples = new ArrayList<Tuple>();
-      for (T item : rawItemList){
-        Tuple t = new DefaultTuple();
-        t.append(item);
-        ltuples.add(t);
-      }
-      convertedBag = DefaultBagFactory.getInstance().newDefaultBag(ltuples);
-    }else{
-      // TODO : throw exception or be silent? Currently going with silence, but needs revisiting.
-    }
-  }
-
-  @Override
-  public void add(Tuple t) {
-    if (convertedBag == null){
-      convertFromRawToTupleForm();
-    }
-    convertedBag.add(t);
-  }
-
-  @Override
-  public void addAll(DataBag db) {
-    Tuple t;
-    for (Iterator<Tuple> dbi = db.iterator() ; dbi.hasNext();){
-      this.add(dbi.next());
-    }
-  }
-
-  @Override
-  public void clear() {
-    rawItemList = null;
-    if (convertedBag != null){
-      convertedBag.clear();
-      convertedBag = null;
-    }
-  }
-
-  @Override
-  public boolean isDistinct() {
-    return false;
-  }
-
-  @Override
-  public boolean isSorted() {
-    return false;
-  }
-
-  @Override
-  public Iterator<Tuple> iterator() {
-    if (convertedBag != null){
-      return convertedBag.iterator();
-    }else{
-      return new HCatArrayBagIterator(rawItemList);
-    }
-  }
-
-  @Override
-  public void markStale(boolean arg0) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public long size() {
-    return (convertedBag == null ? (rawItemList == null ? 0 : rawItemList.size()) : convertedBag.size() );
-  }
-
-  @Override
-  public long getMemorySize() {
-    // FIXME: put in actual impl
-    if (convertedBag != null){
-      return convertedBag.getMemorySize() + DUMMY_SIZE;
-    }else {
-      return DUMMY_SIZE;
-    }
-  }
-
-  @Override
-  public long spill() {
-    // FIXME: put in actual spill impl even for the list case
-    if (convertedBag != null){
-      return convertedBag.spill();
-    }
-    return 0;
-  }
-
-  @Override
-  public void readFields(DataInput arg0) throws IOException {
-    convertedBag = new DefaultDataBag();
-    convertedBag.readFields(arg0);
-  }
-
-  @Override
-  public void write(DataOutput arg0) throws IOException {
-    convertFromRawToTupleForm();
-    convertedBag.write(arg0);
-  }
-
-  @Override
-  public int compareTo(Object arg0) {
-    // TODO Auto-generated method stub - really need to put in a better implementation here, also, equality case not considered yet
-    return arg0.hashCode() < this.hashCode() ? -1 : 1;
-  }
-
-}
Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1240677)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy)
@@ -83,12 +83,14 @@
           partitions,
           PigHCatUtil.getHCatServerUri(job),
           PigHCatUtil.getHCatServerPrincipal(job));
-    } else {
+    } else if(userStr.length == 1) {
       outputJobInfo = OutputJobInfo.create(null,
           userStr[0],
           partitions,
           PigHCatUtil.getHCatServerUri(job),
           PigHCatUtil.getHCatServerPrincipal(job));
+    } else {
+      throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
     }
@@ -119,24 +121,24 @@
       computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
       HCatOutputFormat.setSchema(job, computedSchema);
       p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
+
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-
+
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
     }else{
       config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
+
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-
+
     }
   }