Index: src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
===================================================================
--- src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java	(revision 0)
+++ src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java	(revision 0)
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatLoaderComplexSchema {
+
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+  private static Properties props;
+
+  private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+    driver.run("drop table " + tablename);
+  }
+
+  private void createTable(String tablename, String schema, String partitionedBy)
+      throws IOException, CommandNeedRetryException {
+    String createTable = "create table " + tablename + "(" + schema + ") ";
+    if (partitionedBy != null && !partitionedBy.trim().isEmpty()) {
+      createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+    }
+    createTable = createTable
+        + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    System.out.println("Creating table:\n" + createTable);
+    CommandProcessorResponse result = driver.run(createTable);
+    int retCode = result.getResponseCode();
+    if (retCode != 0) {
+      throw new IOException("Failed to create table. [" + createTable
+          + "], return code from hive driver : [" + retCode + " " + result.getErrorMessage() + "]");
+    }
+  }
+
+  private void createTable(String tablename, String schema)
+      throws IOException, CommandNeedRetryException {
+    createTable(tablename, schema, null);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+  }
+
+  private static final TupleFactory tf = TupleFactory.getInstance();
+  private static final BagFactory bf = BagFactory.getInstance();
+
+  private Tuple t(Object... objects) {
+    return tf.newTuple(Arrays.asList(objects));
+  }
+
+  private DataBag b(Tuple... objects) {
+    return bf.newDefaultBag(Arrays.asList(objects));
+  }
+
+  @Test
+  public void testSyntheticComplexSchema() throws Exception {
+    String pigSchema =
+        "(" +
+        "a: " +
+          "(" +
+          "aa: chararray, " +
+          "ab: long, " +
+          "ac: map[], " +
+          "ad: { t: (ada: long) }, " +
+          "ae: { t: (aea: long, aeb: (aeba: chararray, aebb: long)) }," +
+          "af: (afa: chararray, afb: long) " +
+          ")," +
+        "b: chararray, " +
+        "c: long, " +
+        "d: { t: (da: long, db: (dba: chararray, dbb: long), dc: { t: (dca: long) }) } " +
+        ")";
+
+    String tableSchema =
+        "a struct<" +
+          "aa: string, " +
+          "ab: bigint, " +
+          "ac: map<string, string>, " +
+          "ad: array<struct<ada: bigint>>, " +
+          "ae: array<struct<aea: bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+          "af: struct<afa: string, afb: bigint> " +
+          ">, " +
+        "b string, " +
+        "c bigint, " +
+        "d array<struct<da: bigint, db: struct<dba: string, dbb: bigint>, dc: array<struct<dca: bigint>>>>";
+
+    List<Tuple> data = new ArrayList<Tuple>();
+    for (int i = 0; i < 10; i++) {
+      Tuple t = t(
+          t(
+              "aa test",
+              2L,
+              new HashMap<String, String>() {{ put("ac test1", "test 1"); put("ac test2", "test 2"); }},
+              b(t(3L), t(4L)),
+              b(t(5L, t("aeba test", 6L))),
+              t("afa test", 7L)
+          ),
+          "b test",
+          (long) i,
+          b(t(8L, t("dba test", 9L), b(t(10L)))));
+      data.add(t);
+    }
+    MockLoader.setData("mockData", data);
+
+    String tablename = "testSyntheticComplexSchema";
+    try {
+      createTable(tablename, tableSchema);
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      server.setBatchOn();
+      server.registerQuery("A = load 'mockData' using org.apache.hcatalog.pig.MockLoader() AS " + pigSchema + ";");
+      Schema dumpedASchema = server.dumpSchema("A");
+      server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer('', '" + pigSchema + "');");
+      ExecJob execJob = server.executeBatch().get(0);
+      if (!execJob.getStatistics().isSuccessful()) {
+        throw new RuntimeException("Import failed", execJob.getException());
+      }
+
+      // Test that the schema was loaded correctly.
+      server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
+      Iterator<Tuple> it = server.openIterator("X");
+      int i = 0;
+      while (it.hasNext()) {
+        Tuple input = data.get(i++);
+        Tuple output = it.next();
+        Assert.assertEquals(input, output);
+        System.out.println(output);
+      }
+      Schema dumpedXSchema = server.dumpSchema("X");
+      Assert.assertEquals(dumpedASchema, dumpedXSchema);
+    } finally {
+      dropTable(tablename);
+    }
+  }
+}
Index: src/test/org/apache/hcatalog/pig/MockLoader.java
===================================================================
--- src/test/org/apache/hcatalog/pig/MockLoader.java	(revision 0)
+++ src/test/org/apache/hcatalog/pig/MockLoader.java	(revision 0)
@@ -0,0 +1,159 @@
+package org.apache.hcatalog.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+public class MockLoader extends LoadFunc {
+  private static final class MockRecordReader extends RecordReader<Object, Object> {
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return "mockKey";
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return "mockValue";
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0.5f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return true;
+    }
+  }
+
+  private static final class MockInputSplit extends InputSplit implements Writable {
+    private String location;
+
+    public MockInputSplit() {
+    }
+
+    public MockInputSplit(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[] { location };
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 10000000;
+    }
+
+    @Override
+    public boolean equals(Object arg0) {
+      return arg0 == this;
+    }
+
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+      location = arg0.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+      arg0.writeUTF(location);
+    }
+  }
+
+  private static final class MockInputFormat extends InputFormat<Object, Object> {
+
+    private final String location;
+
+    public MockInputFormat(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public RecordReader<Object, Object> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+      return new MockRecordReader();
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
+      return Arrays.asList((InputSplit) new MockInputSplit(location));
+    }
+  }
+
+  private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
+
+  public static void setData(String location, Iterable<Tuple> data) {
+    locationToData.put(location, data);
+  }
+
+  private String location;
+
+  private Iterator<Tuple> data;
+
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    return location;
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    this.location = location;
+    if (location == null) {
+      throw new IOException("null location passed to MockLoader");
+    }
+    // Check before calling iterator() so a missing location fails with a
+    // clear message instead of a NullPointerException.
+    Iterable<Tuple> iterable = locationToData.get(location);
+    if (iterable == null) {
+      throw new IOException("No data configured for location: " + location);
+    }
+    this.data = iterable.iterator();
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    if (data == null) {
+      throw new IOException("data was not correctly initialized in MockLoader");
+    }
+    return data.hasNext() ? data.next() : null;
+  }
+
+  @Override
+  public InputFormat<?, ?> getInputFormat() throws IOException {
+    return new MockInputFormat(location);
+  }
+
+  @Override
+  public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
+  }
+
+}
\ No newline at end of file
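For context, MockLoader serves tuples from an in-memory map keyed by the load location, so tests can feed Pig without touching HDFS. A minimal sketch of driving it from a LOCAL-mode PigServer follows; the location key "testdata" and the two-column schema are illustrative only, not part of the patch:

package org.apache.hcatalog.pig;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MockLoaderUsageSketch {
  public static void main(String[] args) throws Exception {
    TupleFactory tf = TupleFactory.getInstance();
    // Register tuples under a location key; MockLoader.setLocation() looks the
    // key up in its static map instead of reading from the file system.
    List<Tuple> rows = Arrays.asList(
        tf.newTuple(Arrays.<Object>asList("row 1", 1L)),
        tf.newTuple(Arrays.<Object>asList("row 2", 2L)));
    MockLoader.setData("testdata", rows);

    PigServer server = new PigServer(ExecType.LOCAL);
    server.registerQuery("A = load 'testdata' using org.apache.hcatalog.pig.MockLoader() "
        + "AS (s: chararray, n: long);");
    // openIterator() executes the plan and streams the registered tuples back.
    Iterator<Tuple> it = server.openIterator("A");
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}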
Index: src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatBaseStorer.java	(revision 1228666)
+++ src/java/org/apache/hcatalog/pig/HCatBaseStorer.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
@@ -81,7 +82,7 @@
       for(String partKVP : partKVPs){
         String[] partKV = partKVP.split("=");
         if(partKV.length == 2) {
-          String partKey = partKV[0].trim();
+          String partKey = partKV[0].trim();
           partitionKeys.add(partKey);
           partitions.put(partKey, partKV[1].trim());
         } else {
@@ -144,16 +145,6 @@
     return new HCatSchema(fieldSchemas);
   }
 
-  private void validateUnNested(Schema innerSchema) throws FrontendException{
-
-    for(FieldSchema innerField : innerSchema.getFields()){
-      validateAlias(innerField.alias);
-      if(DataType.isComplex(innerField.type)) {
-        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
-  }
-
   private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
 
     String colName = bagFieldSchema.alias;
@@ -191,7 +182,7 @@
 
     case DataType.BYTEARRAY:
       return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
+
     case DataType.BAG:
       Schema bagSchema = fSchema.schema;
      List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
@@ -267,31 +258,33 @@
     }
   }
 
-  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws ExecException, HCatException{
-
+  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException{
+    if (pigObj == null) {
+      return null;
+    }
     // The real work-horse. Spend time and energy in this method if there is
     // need to keep HCatStorer lean and go fast.
     Type type = hcatFS.getType();
-
+    // TODO: we should use lazy wrappers instead to convert only if the fields are accessed.
     switch(type){
 
     case BINARY:
       ByteArrayRef ba = new ByteArrayRef();
-      byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
+      // TODO: check this. It can not be null since I added a null check above
+      byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
       ba.setData(bytes);
       return ba;
-
+
     case STRUCT:
+      HCatSchema structSubSchema = hcatFS.getStructSubSchema();
       // Unwrap the tuple.
-      return ((Tuple)pigObj).getAll();
-//      Tuple innerTup = (Tuple)pigObj;
-//
-//      List innerList = new ArrayList(innerTup.size());
-//      int i = 0;
-//      for(HCatTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
-//        innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
-//      }
-//      return innerList;
+      List<Object> all = ((Tuple)pigObj).getAll();
+      ArrayList<Object> converted = new ArrayList<Object>(all.size());
+      for (int i = 0; i < all.size(); i++) {
+        converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+      }
+      return converted;
+
     case ARRAY:
       // Unwrap the bag.
       DataBag pigBag = (DataBag)pigObj;
@@ -302,20 +295,37 @@
       while(bagItr.hasNext()){
         // If there is only one element in tuple contained in bag, we throw away the tuple.
-        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
+        bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
       }
       return bagContents;
-
-//    case MAP:
-//      Map pigMap = (Map)pigObj;
-//      Map typeMap = new HashMap();
-//      for(Entry entry: pigMap.entrySet()){
-//        typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
-//      }
-//      return typeMap;
-
-    default:
+    case MAP:
+      Map<Object, Object> pigMap = (Map<Object, Object>)pigObj;
+      Map<String, Object> typeMap = new HashMap<String, Object>();
+      for(Entry<Object, Object> entry: pigMap.entrySet()){
+        // TODO: figure out why the value has a schema and not a FieldSchema!!!
+        typeMap.put(
+            // Schema validation enforces that the Key is a String
+            (String)entry.getKey(),
+            // Schema validation enforces that the Value is not Complex
+            getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+      }
+      return typeMap;
+    case STRING:
+      // a String is not always a String
+      return pigObj.toString();
+    case INT:
+    case BIGINT:
+    case FLOAT:
+    case DOUBLE:
       return pigObj;
+    case SMALLINT:
+    case TINYINT:
+    case BOOLEAN:
+      // would not pass schema validation anyway
+      throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+    default:
+      throw new BackendException("Unexpected type "+type+" for value "+pigObj+" of class "+pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE);
     }
   }
@@ -339,84 +349,53 @@
     // dictated by semantics, consult HCatSchema of table when need be.
 
     for(FieldSchema pigField : pigSchema.getFields()){
-      byte type = pigField.type;
-      String alias = pigField.alias;
-      validateAlias(alias);
-      HCatFieldSchema hcatField = getTableCol(alias, tblSchema);
+      HCatFieldSchema hcatField = getTableCol(pigField.alias, tblSchema);
 
-      if(DataType.isComplex(type)){
-        switch(type){
+      validateSchema(pigField, hcatField);
+    }
 
-        case DataType.MAP:
-          if(hcatField != null){
-            if(hcatField.getMapKeyType() != Type.STRING){
-              throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            if(hcatField.getMapValueSchema().get(0).isComplex()){
-              throw new FrontendException("Value type of map cannot be complex" + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
+    try {
+      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+    } catch (IOException e) {
+      throw new FrontendException("HCatalog schema is not compatible with Pig: "+e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+  }
 
-        case DataType.BAG:
-          // Only map is allowed as complex type in tuples inside bag.
-          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
-            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
-              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            validateAlias(innerField.alias);
+
+  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+      throws HCatException, FrontendException {
+    validateAlias(pigField.alias);
+    byte type = pigField.type;
+    if(DataType.isComplex(type)){
+      switch(type){
+
+      case DataType.MAP:
+        if(hcatField != null){
+          if(hcatField.getMapKeyType() != Type.STRING){
+            throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
           }
-          if(hcatField != null){
-            // Do the same validation for HCatSchema.
-            HCatFieldSchema arrayFieldScehma = hcatField.getArrayElementSchema().get(0);
-            Type hType = arrayFieldScehma.getType();
-            if(hType == Type.STRUCT){
-              for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
-                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
-                  throw new FrontendException("Nested Complex types not allowed "+ hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-                }
-              }
-            }
-            if(hType == Type.MAP){
-              if(arrayFieldScehma.getMapKeyType() != Type.STRING){
-                throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-              if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
-                throw new FrontendException("Value type of map cannot be complex "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-            if(hType == Type.ARRAY) {
-              throw new FrontendException("Arrays cannot contain array within it. "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
+          if(hcatField.getMapValueSchema().get(0).isComplex()){
+            throw new FrontendException("Value type of map cannot be complex" + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
           }
-          break;
+        }
+        break;
 
-        case DataType.TUPLE:
-          validateUnNested(pigField.schema);
-          if(hcatField != null){
-            for(HCatFieldSchema structFieldSchema : hcatField.getStructSubSchema().getFields()){
-              if(structFieldSchema.isComplex()){
-                throw new FrontendException("Nested Complex types are not allowed."+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-          }
-          break;
+      case DataType.BAG:
+        HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+        for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
+          validateSchema(innerField, getTableCol(pigField.alias, arrayElementSchema));
+        }
+        break;
 
-        default:
-          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+      case DataType.TUPLE:
+        HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+        for(FieldSchema innerField : pigField.schema.getFields()){
+          validateSchema(innerField, getTableCol(pigField.alias, structSubSchema));
         }
-      }
-    }
+        break;
 
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
-
-      // We dont do type promotion/demotion.
-      Type hType = hcatField.getType();
-      switch(hType){
-      case SMALLINT:
-      case TINYINT:
-      case BOOLEAN:
-        throw new FrontendException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      default:
+        throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
       }
     }
   }
@@ -432,10 +411,11 @@
   // Finds column by name in HCatSchema, if not found returns null.
   private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
-
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
-      if(hcatField.getName().equalsIgnoreCase(alias)){
-        return hcatField;
+    if (tblSchema != null) {
+      for(HCatFieldSchema hcatField : tblSchema.getFields()){
+        if(hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)){
+          return hcatField;
+        }
       }
     }
     // Its a new column
Index: src/java/org/apache/hcatalog/pig/PigHCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(revision 1228666)
+++ src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(working copy)
@@ -32,18 +32,18 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.data.HCatArrayBag;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -94,7 +94,7 @@
   static HiveMetaStoreClient client = null;
 
   private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
-      String serverKerberosPrincipal, Class clazz) throws Exception {
+      String serverKerberosPrincipal, Class<?> clazz) throws Exception {
     if (client != null){
       return client;
     }
@@ -104,12 +104,12 @@
       hiveConf.set("hive.metastore.local", "false");
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
     }
-
+
     if (serverKerberosPrincipal != null){
       hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
-      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
     }
-
+
     try {
       client = new HiveMetaStoreClient(hiveConf,null);
     } catch (Exception e){
@@ -281,7 +281,7 @@
     if (type == Type.BINARY){
       return DataType.BYTEARRAY;
     }
-
+
     if (type == Type.BOOLEAN){
       errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
           "Pig as a column type";
@@ -358,65 +358,47 @@
       }
       return db;
     } else {
-      return new HCatArrayBag(list);
+      return new HCatArrayBag(list);
     }
   }
 
-  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
-    for (HCatFieldSchema hfs : hcatTableSchema.getFields()){
-      Type htype = hfs.getType();
-      if (htype == Type.ARRAY){
-        validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(hfs);
-      }else if (htype == Type.STRUCT){
-        validateIsPigCompatibleStructWithPrimitives(hfs);
-      }else if (htype == Type.MAP){
-        validateIsPigCompatibleMapWithPrimitives(hfs);
-      }else {
-        validateIsPigCompatiblePrimitive(hfs);
-      }
-    }
+  private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+    for(HCatFieldSchema hcatField : tblSchema.getFields()){
+      validateHcatFieldFollowsPigRules(hcatField);
+    }
   }
 
-  private static void validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(
-      HCatFieldSchema hfs) throws IOException {
-    HCatFieldSchema subFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-    if (subFieldSchema.getType() == Type.STRUCT){
-      validateIsPigCompatibleStructWithPrimitives(subFieldSchema);
-    }else if (subFieldSchema.getType() == Type.MAP) {
-      validateIsPigCompatiblePrimitive(subFieldSchema.getMapValueSchema().getFields().get(0));
-    }else {
-      validateIsPigCompatiblePrimitive(subFieldSchema);
+  private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+    try {
+      Type hType = hcatField.getType();
+      switch(hType){
+      // We don't do type promotion/demotion.
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      case ARRAY:
+        validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+        break;
+      case STRUCT:
+        validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+        break;
+      case MAP:
+        // key is only string
+        validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+        break;
+      }
+    } catch (HCatException e) {
+      throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
     }
   }
 
-  private static void validateIsPigCompatibleMapWithPrimitives(HCatFieldSchema hfs) throws IOException{
-    if (hfs.getMapKeyType() != Type.STRING){
-      throw new PigException("Incompatible type in schema, found map with " +
-          "non-string key type in :"+hfs.getTypeString(), PIG_EXCEPTION_CODE);
-    }
-    validateIsPigCompatiblePrimitive(hfs.getMapValueSchema().getFields().get(0));
-  }
-
-  private static void validateIsPigCompatibleStructWithPrimitives(HCatFieldSchema hfs) throws IOException {
-    for ( HCatFieldSchema subField : hfs.getStructSubSchema().getFields()){
-      validateIsPigCompatiblePrimitive(subField);
-    }
+  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+    validateHCatSchemaFollowsPigRules(hcatTableSchema);
   }
 
-  private static void validateIsPigCompatiblePrimitive(HCatFieldSchema hfs) throws IOException {
-    Type htype = hfs.getType();
-    if (
-        (hfs.isComplex()) ||
-        (htype == Type.TINYINT) ||
-        (htype == Type.SMALLINT)
-       ){
-      throw new PigException("Incompatible type in schema, expected pig " +
-          "compatible primitive for:" + hfs.getTypeString());
-    }
-
-  }
-
   public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
     if(p.getProperty(propName) != null){
       config.set(propName, p.getProperty(propName));
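The rewritten PigHCatUtil check above collapses the old per-shape validators into one recursive rule: reject SMALLINT, TINYINT, and BOOLEAN anywhere in the table schema, and recurse into array element, struct sub-, and map value schemas. A hypothetical test-style sketch of what that buys; the constructors follow the HCatFieldSchema signatures in the hunks below:

import java.util.Arrays;

import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.pig.PigHCatUtil;

public class ValidationSketch {
  public static void main(String[] args) throws Exception {
    // A string/bigint table is Pig-compatible and passes silently.
    HCatSchema ok = new HCatSchema(Arrays.asList(
        new HCatFieldSchema("name", Type.STRING, null),
        new HCatFieldSchema("count", Type.BIGINT, null)));
    PigHCatUtil.validateHCatTableSchemaFollowsPigRules(ok);

    // A tinyint column is rejected; the same rule now applies at any depth.
    HCatSchema bad = new HCatSchema(Arrays.asList(
        new HCatFieldSchema("tiny", Type.TINYINT, null)));
    try {
      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(bad);
    } catch (java.io.IOException expected) {
      System.out.println("rejected as expected: " + expected.getMessage());
    }
  }
}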
Index: src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java	(revision 1228666)
+++ src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java	(working copy)
@@ -156,6 +156,11 @@
     // with crisp Java objects inside it. We have to do it because higher layer
     // may not know how to do it.
+    if (data == null) {
+      return null;
+    }
+    // TODO: we probably want to try returning lazy wrappers instead
+
     switch(oi.getCategory()){
 
     case PRIMITIVE:
Index: src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java
===================================================================
--- src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java	(revision 1228666)
+++ src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java	(working copy)
@@ -122,7 +122,7 @@
    * @throws HCatException if call made on non-primitive types
    */
   public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatException {
-    assertTypeInCategory(type,Category.PRIMITIVE);
+    assertTypeInCategory(type,Category.PRIMITIVE,fieldName);
     this.fieldName = fieldName;
     this.type = type;
     this.category = Category.PRIMITIVE;
@@ -162,8 +162,8 @@
    * @throws HCatException if call made on non-Map types
    */
   public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException{
-    assertTypeInCategory(type,Category.MAP);
-    assertTypeInCategory(mapKeyType,Category.PRIMITIVE);
+    assertTypeInCategory(type,Category.MAP, fieldName);
+    assertTypeInCategory(mapKeyType,Category.PRIMITIVE, fieldName);
     this.fieldName = fieldName;
     this.type = Type.MAP;
     this.category = Category.MAP;
@@ -174,29 +174,29 @@
   }
 
   public HCatSchema getStructSubSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.STRUCT);
+    assertTypeInCategory(this.type,Category.STRUCT, this.fieldName);
     return subSchema;
   }
 
   public HCatSchema getArrayElementSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.ARRAY);
+    assertTypeInCategory(this.type,Category.ARRAY, this.fieldName);
     return subSchema;
   }
 
   public Type getMapKeyType() throws HCatException {
-    assertTypeInCategory(this.type,Category.MAP);
+    assertTypeInCategory(this.type,Category.MAP, this.fieldName);
     return mapKeyType;
   }
 
   public HCatSchema getMapValueSchema() throws HCatException {
-    assertTypeInCategory(this.type,Category.MAP);
+    assertTypeInCategory(this.type,Category.MAP, this.fieldName);
     return subSchema;
   }
 
-  private static void assertTypeInCategory(Type type, Category category) throws HCatException {
+  private static void assertTypeInCategory(Type type, Category category, String fieldName) throws HCatException {
     Category typeCategory = Category.fromType(type);
     if (typeCategory != category){
-      throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory);
+      throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory+" (field "+fieldName+")");
    }
  }
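Finally, the extra fieldName argument threaded through assertTypeInCategory makes type-category errors self-describing. A small sketch of the observable difference, using only the constructors and accessors shown in the hunks above:

import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;

public class ErrorMessageSketch {
  public static void main(String[] args) throws Exception {
    HCatFieldSchema field = new HCatFieldSchema("user_name", Type.STRING, null);
    try {
      field.getMapKeyType(); // wrong accessor: STRING is not in the MAP category
    } catch (HCatException e) {
      // With this patch the message ends with "(field user_name)", so the
      // offending column can be identified without a debugger.
      System.out.println(e.getMessage());
    }
  }
}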