diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataTransformer.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataTransformer.java new file mode 100644 index 0000000..bac788e --- /dev/null +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataTransformer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.data; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hcatalog.data.schema.HCatFieldSchema; + +/** + * DataTransformer is used to transform data back and forth from a + * TransformedHCatRecord and an underlying HCatRecord. + * + * In this conversion, TransformedHCatRecord is seen to be a "higher" + * layer, and HCatRecord is seen to be a lower layer, whereby the + * TransformedHCatRecord conforms to datatype requirements of layers + * above HCat, such as pig. + * + * So, in that case, Pig would convert int to underlying tinyint/smallints + * while writing, and convert smallints/tinyints to ints when reading, since + * it cannot handle those data types. 
+ * + */ +public class DataTransformer { + + public void initiailize(Configuration conf) { + // do nothing default + } + + public Object convertToWrite(Object object, HCatFieldSchema hCatFieldSchema) throws Exception { + return object; // base implementation is to do no transformation + } + + public Object convertToRead(Object object, HCatFieldSchema hCatFieldSchema) throws Exception { + return object; // base implementation is to do no transformation + } + +} diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java index 6328b50..09bf26c 100644 --- a/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java @@ -42,8 +42,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatContext; import org.apache.hcatalog.data.schema.HCatSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -265,28 +263,6 @@ public static Object serializeField(Object field, ObjectInspector fieldObjectIns private static Object serializePrimitiveField(Object field, ObjectInspector fieldObjectInspector) { - - if (field != null && HCatContext.INSTANCE.getConf().isPresent()) { - Configuration conf = HCatContext.INSTANCE.getConf().get(); - - if (field instanceof Boolean && - conf.getBoolean( - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) { - return ((Boolean) field) ? 
1 : 0; - } else if (field instanceof Short && - conf.getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - return new Integer((Short) field); - } else if (field instanceof Byte && - conf.getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - return new Integer((Byte) field); - } - } - return ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); } diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/TransformedHCatRecord.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/TransformedHCatRecord.java new file mode 100644 index 0000000..db626db --- /dev/null +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/TransformedHCatRecord.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hcatalog.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +/** + * TransformedHCatRecord is a HCatRecord wrapper that wraps + * a given HCatRecord, its schema and a DataTransformer + * implementation so as to perform data conversions for + * layers above it. + * + * For eg., Pig does not have support for SmallInt/TinyInt + * and would need to do a conversion to Int to read from a + * HCatRecord, and it would need to store an appropriate + * value as SmallInt/TinyInt into an underlying HCatRecord + * when writing to it. + * + * In this case, it would implement an appropriate DataTransformer + * and use TransformedHCatRecords in conjunction with that, so that + * as far as Pig is concerned, there exist no SmallInts/TinyInts. + */ + +public class TransformedHCatRecord extends HCatRecord { + + private HCatRecord r; + private HCatSchema s; + private DataTransformer dt; + + private static final String UTF8 = "UTF-8"; + + public TransformedHCatRecord(HCatRecord r, HCatSchema s, DataTransformer dt){ + this.r = r; + this.s = s; + this.dt = dt; + } + + @Override + public Object get(int fieldNum) { + try { + return dt.convertToRead(r.get(fieldNum),s.get(fieldNum)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public List getAll() { + // Performance hit if used more than once. We do not + // cache - that'd make other semantics like set harder. 
+ List o = new ArrayList(); + if (r != null){ + for(int i = 0; i < r.size() ; i++){ + o.add(get(i)); + } + } + return o; + } + + @Override + public void set(int fieldNum, Object value) { + try { + r.set(fieldNum, dt.convertToWrite(value,s.get(fieldNum))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public int size() { + return r.size(); + } + + @Override + public void readFields(DataInput in) throws IOException { + String dtClassName = (String) ReaderWriter.readDatum(in); + String rClassName = (String) ReaderWriter.readDatum(in); + s = HCatSchemaUtils.getHCatSchema((String) ReaderWriter.readDatum(in)); + try { + dt = (DataTransformer) ReflectionUtils.newInstance(Class.forName(dtClassName), null); + r = (HCatRecord) ReflectionUtils.newInstance(Class.forName(rClassName), null); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + r.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + ReaderWriter.writeDatum(out, dt.getClass().getName()); + ReaderWriter.writeDatum(out, r.getClass().getName()); + ReaderWriter.writeDatum(out, s.getSchemaAsTypeString()); + r.write(out); + } + + @Override + public Object get(String fieldName, HCatSchema recordSchema) throws HCatException { + return get(recordSchema.getPosition(fieldName)); + } + + @Override + public void set(String fieldName, HCatSchema recordSchema, Object value) throws HCatException { + set(recordSchema.getPosition(fieldName), value); + } + + @Override + public void remove(int idx) throws HCatException { + s.remove(s.get(idx)); + r.remove(idx); + } + + @Override + public void copy(HCatRecord other) throws HCatException { + r.copy(other); + } + +} diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java index 1b4971f..b3ea7b0 100644 --- 
a/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java @@ -31,8 +31,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatContext; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; @@ -140,17 +138,9 @@ private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fie private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { switch (((PrimitiveTypeInfo) basePrimitiveTypeInfo).getPrimitiveCategory()) { case BOOLEAN: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) ? - Type.INT : Type.BOOLEAN; + return Type.BOOLEAN; case BYTE: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? - Type.INT : Type.TINYINT; + return Type.TINYINT; case DOUBLE: return Type.DOUBLE; case FLOAT: @@ -160,11 +150,7 @@ private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { case LONG: return Type.BIGINT; case SHORT: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? 
- Type.INT : Type.SMALLINT; + return Type.SMALLINT; case STRING: return Type.STRING; case BINARY: diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java index 3c3afa5..e1798c9 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java @@ -28,7 +28,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hcatalog.data.DataTransformer; import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.TransformedHCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.hcatalog.mapreduce.PartInfo; @@ -56,11 +58,16 @@ HCatSchema outputSchema = null; + protected DataTransformer transformer = new PigDataTransformer(); + @Override public Tuple getNext() throws IOException { try { - HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null); + HCatRecord hr = (reader.nextKeyValue() ? + new TransformedHCatRecord( + (HCatRecord) reader.getCurrentValue(),outputSchema,transformer) + : null); Tuple t = PigHCatUtil.transformToTuple(hr, outputSchema); // TODO : we were discussing an iter interface, and also a LazyTuple // change this when plans for that solidifies. 
diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java index a19e98e..e98e712 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.DataTransformer; import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; @@ -72,6 +73,8 @@ protected static final String PIG_SCHEMA = "hcat.pig.store.schema"; protected String sign; + protected DataTransformer transformer = new PigDataTransformer(); + public HCatBaseStorer(String partSpecs, String schema) throws Exception { partitionKeys = new ArrayList(); @@ -308,30 +311,22 @@ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatExce case BIGINT: case FLOAT: case DOUBLE: - return pigObj; case SMALLINT: - if (pigObj == null) { - return null; - } - if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) { - throw new BackendException("Value " + pigObj + " is outside the bounds of column " + - hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); - } - return ((Integer) pigObj).shortValue(); case TINYINT: - if (pigObj == null) { - return null; - } - if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) { - throw new BackendException("Value " + pigObj + " is outside the bounds of column " + - hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); - } - return ((Integer) pigObj).byteValue(); case BOOLEAN: - // would not pass schema validation anyway - throw new 
BackendException("Incompatible type " + type + " found in hcat table schema: " + hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE); + try { + return transformer.convertToWrite(pigObj, hcatFS); + } catch (Exception e){ + throw new BackendException("Data transformation error for type " + + type + " for value " + pigObj + + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), + PigHCatUtil.PIG_EXCEPTION_CODE); + } default: - throw new BackendException("Unexpected type " + type + " for value " + pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE); + throw new BackendException("Unexpected type " + type + + " for value " + pigObj + + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), + PigHCatUtil.PIG_EXCEPTION_CODE); } } catch (BackendException e) { // provide the path to the field in the error message diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java index a645ba1..bdd9e24 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java @@ -85,6 +85,8 @@ public String relativeToAbsolutePath(String location, Path curDir) throws IOExce public void setLocation(String location, Job job) throws IOException { HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get() .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); + transformer.initiailize(HCatContext.INSTANCE.getConf().get()); + UDFContext udfContext = UDFContext.getUDFContext(); Properties udfProps = udfContext.getUDFProperties(this.getClass(), @@ -189,6 +191,7 @@ public void setLocation(String location, Job job) throws IOException { public ResourceSchema getSchema(String location, Job job) throws IOException { 
HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get() .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); + transformer.initiailize(HCatContext.INSTANCE.getConf().get()); Table table = phutil.getTable(location, hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job), diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java index 4a439e7..4e11733 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java @@ -79,7 +79,8 @@ public OutputFormat getOutputFormat() throws IOException { @Override public void setStoreLocation(String location, Job job) throws IOException { HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get() - .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false); + .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); + transformer.initiailize(HCatContext.INSTANCE.getConf().get()); Configuration config = job.getConfiguration(); config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign); diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigDataTransformer.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigDataTransformer.java new file mode 100644 index 0000000..240fc57 --- /dev/null +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigDataTransformer.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.pig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.data.DataTransformer; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; +import org.apache.pig.backend.BackendException; + +/** + * Pig's implementation of DataTransformer + */ + +public class PigDataTransformer extends DataTransformer { + + private boolean promoteBooleansToInt = HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT; + private boolean promoteShortsAndBytesToInt = HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT; + + public PigDataTransformer(){ + } + + @Override + public void initiailize(Configuration conf) { + if (conf!=null){ + promoteBooleansToInt = conf.getBoolean( + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT); + promoteShortsAndBytesToInt = conf.getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT); + } + } + + + @Override + public Object convertToWrite(Object o, HCatFieldSchema hCatFieldSchema) throws Exception { + // Called during HCatStorer + Type type = hCatFieldSchema.getType(); + switch(hCatFieldSchema.getType()){ + case TINYINT : + if (promoteShortsAndBytesToInt) { + if 
((Integer) o < Byte.MIN_VALUE || (Integer) o > Byte.MAX_VALUE) { + throw new BackendException( + "Value " + o + " is outside the bounds of column " + + hCatFieldSchema.getName() + " with type " + + type, PigHCatUtil.PIG_EXCEPTION_CODE); + } + return new Byte(((Integer)o).byteValue()); + } else { + return o; + } + case SMALLINT : + if (promoteShortsAndBytesToInt) { + if ((Integer) o < Short.MIN_VALUE || (Integer) o > Short.MAX_VALUE) { + throw new BackendException( + "Value " + o + " is outside the bounds of column " + + hCatFieldSchema.getName() + " with type " + + type, PigHCatUtil.PIG_EXCEPTION_CODE); + } + return new Short(((Integer)o).shortValue()); + } else { + return o; + } + case BOOLEAN : + if (promoteBooleansToInt) { + return (((Integer)o).intValue() == 1); + } else { + return o; + } + } + return o; + } + + @Override + public Object convertToRead(Object o, HCatFieldSchema hCatFieldSchema) throws Exception { + // Called during HCatLoader + switch(hCatFieldSchema.getType()){ + case TINYINT : + if (promoteShortsAndBytesToInt) { + return new Integer((Byte)o); + } else { + return o; + } + case SMALLINT : + if (promoteShortsAndBytesToInt) { + return new Integer((Short)o); + } else { + return o; + } + case BOOLEAN : + if (promoteBooleansToInt) { + return ((Boolean)o) ? 
1 : 0; + } else { + return o; + } + } + return o; // base implementation is to do no transformation + } + +} diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java index f6d609b..06f8471 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java @@ -104,6 +104,10 @@ } } + static public boolean pigHasBooleanSupport(){ + return pigHasBooleanSupport; + } + static public Pair getDBTableNames(String location) throws IOException { // the location string will be of the form: // . - parse it and diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java new file mode 100644 index 0000000..8727bf1 --- /dev/null +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hcatalog.pig; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatContext; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; + +public class TestE2EScenarios extends TestCase { + + private static final String TEST_DATA_DIR = System.getProperty("user.dir") + + "/build/test/data/" + TestHCatLoader.class.getCanonicalName(); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + + private static final String TEXTFILE_LOCN = TEST_DATA_DIR + "/textfile"; + + private static Driver driver; + + protected String storageFormat() { + return "orc"; + } + + @Override + protected void setUp() throws Exception { + + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { 
+ FileUtil.fullyDelete(f); + } + new File(TEST_WAREHOUSE_DIR).mkdirs(); + + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + driver = new Driver(hiveConf); + SessionState.start(new CliSessionState(hiveConf)); + + } + + @Override + protected void tearDown() throws Exception { + dropTable("inpy"); + dropTable("rc5318"); + dropTable("orc5318"); + } + + private void dropTable(String tablename) throws IOException, CommandNeedRetryException { + driver.run("drop table " + tablename); + } + + private void createTable(String tablename, String schema, String partitionedBy, String storageFormat) throws IOException, CommandNeedRetryException { + String createTable; + createTable = "create table " + tablename + "(" + schema + ") "; + if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { + createTable = createTable + "partitioned by (" + partitionedBy + ") "; + } + if (storageFormat != null){ + createTable = createTable + "stored as " +storageFormat; + } + driverRun(createTable); + } + + private void driverRun(String cmd) throws IOException, CommandNeedRetryException { + int retCode = driver.run(cmd).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to run [" + + cmd + "], return code from hive driver : [" + retCode + "]"); + } + } + + private void pigDump(String tableName) throws IOException { + PigServer server = new PigServer(ExecType.LOCAL); + + System.err.println("==="); + System.err.println(tableName+":"); + server.registerQuery("X = load '" + tableName + + "' using org.apache.hcatalog.pig.HCatLoader();"); + Iterator XIter = server.openIterator("X"); + while (XIter.hasNext()) { + Tuple t = XIter.next(); + for (Object o : t.getAll()){ + System.err.print( + 
"\t(" + o.getClass().getName() + ":" + + o.toString() + ")" + ); + } + System.err.println(""); + } + System.err.println("==="); + } + + + private void copyTable(String in, String out) throws IOException, InterruptedException { + Job ijob = new Job(); + Job ojob = new Job(); + HCatInputFormat inpy = new HCatInputFormat(); + inpy.setInput(ijob , null, in); + HCatOutputFormat oupy = new HCatOutputFormat(); + oupy.setOutput(ojob, + OutputJobInfo.create(null, out, new HashMap() + )); + + // Test HCatContext + + System.err.println("HCatContext INSTANCE is present : " +HCatContext.INSTANCE.getConf().isPresent()); + if (HCatContext.INSTANCE.getConf().isPresent()){ + System.err.println("HCatContext tinyint->int promotion says " + + HCatContext.INSTANCE.getConf().get().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)); + } + + HCatSchema tableSchema = inpy.getTableSchema(ijob.getConfiguration()); + System.err.println("Copying from ["+in+"] to ["+out+"] with schema : "+ tableSchema.toString()); + oupy.setSchema(ojob, tableSchema); + oupy.checkOutputSpecs(ojob); + OutputCommitter oc = oupy.getOutputCommitter(createTaskAttemptContext(ojob.getConfiguration())); + oc.setupJob(ojob); + + for (InputSplit split : inpy.getSplits(ijob)){ + + TaskAttemptContext rtaskContext = createTaskAttemptContext(ijob.getConfiguration()); + TaskAttemptContext wtaskContext = createTaskAttemptContext(ojob.getConfiguration()); + + RecordReader rr = inpy.createRecordReader(split, rtaskContext); + rr.initialize(split, rtaskContext); + + OutputCommitter taskOc = oupy.getOutputCommitter(wtaskContext); + taskOc.setupTask(wtaskContext); + RecordWriter, HCatRecord> rw = oupy.getRecordWriter(wtaskContext); + + while(rr.nextKeyValue()){ + rw.write(rr.getCurrentKey(), rr.getCurrentValue()); + } + rw.close(wtaskContext); + taskOc.commitTask(wtaskContext); + rr.close(); + } + + oc.commitJob(ojob); + } + + private TaskAttemptContext 
createTaskAttemptContext(Configuration tconf) { + Configuration conf = (tconf == null) ? (new Configuration()) : tconf; + TaskAttemptID taskId = new TaskAttemptID(); + conf.setInt("mapred.task.partition", taskId.getId()); + conf.set("mapred.task.id", "attempt__0000_r_000000_" + taskId.getId()); + TaskAttemptContext rtaskContext = new TaskAttemptContext(conf , taskId ); + return rtaskContext; + } + + + public void testReadOrcAndRCFromPig() throws Exception { + String tableSchema = "ti tinyint, si smallint,i int, bi bigint, f float, d double, b boolean"; + + HcatTestUtils.createTestDataFile(TEXTFILE_LOCN, + new String[]{ + "-3\0019001\00186400\0014294967297\00134.532\0012184239842983489.1231231234\001true" + ,"0\0010\0010\0010\0010\0010\001false" + } + ); + + // write this out to a file, and import it into hive + createTable("inpy",tableSchema,null,"textfile"); + createTable("rc5318",tableSchema,null,"rcfile"); + createTable("orc5318",tableSchema,null,"orc"); + driverRun("LOAD DATA LOCAL INPATH '"+TEXTFILE_LOCN+"' OVERWRITE INTO TABLE inpy"); + + // write it out from hive to an rcfile table, and to an orc table +// driverRun("insert overwrite table rc5318 select * from inpy"); + copyTable("inpy","rc5318"); +// driverRun("insert overwrite table orc5318 select * from inpy"); + copyTable("inpy","orc5318"); + + pigDump("inpy"); + pigDump("rc5318"); + pigDump("orc5318"); + + } + +} diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java index e907c73..47339d9 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java @@ -65,7 +65,7 @@ private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass private static boolean setupHasRun = false; - + private static Map> 
basicInputData; protected String storageFormat() { @@ -413,7 +413,7 @@ public void testConvertBooleanToInt() throws Exception { File inputDataDir = new File(inputFileName).getParentFile(); inputDataDir.mkdir(); - String[] lines = new String[]{"llama\t1", "alpaca\t0"}; + String[] lines = new String[]{"llama\ttrue", "alpaca\tfalse"}; HcatTestUtils.createTestDataFile(inputFileName, lines); assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode()); @@ -433,13 +433,15 @@ public void testConvertBooleanToInt() throws Exception { assertEquals("a", schema.getField(0).alias); assertEquals(DataType.CHARARRAY, schema.getField(0).type); assertEquals("b", schema.getField(1).alias); - assertEquals(DataType.INTEGER, schema.getField(1).type); + if (PigHCatUtil.pigHasBooleanSupport()){ + assertEquals(DataType.BOOLEAN, schema.getField(1).type); + } else { + assertEquals(DataType.INTEGER, schema.getField(1).type); + } Iterator iterator = server.openIterator("data"); Tuple t = iterator.next(); assertEquals("llama", t.get(0)); - // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert - // passes because data was loaded as integers, not because it was converted. 
assertEquals(1, t.get(1)); t = iterator.next(); assertEquals("alpaca", t.get(0)); diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java index 111b606..1be55dd 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java @@ -60,14 +60,18 @@ private void dropTable(String tablename) throws IOException, CommandNeedRetryExc driver.run("drop table " + tablename); } + protected String storageFormat() { + return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')"; + } + private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException { String createTable; createTable = "create table " + tablename + "(" + schema + ") "; if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { createTable = createTable + "partitioned by (" + partitionedBy + ") "; } - createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + - "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') "; + createTable = createTable + "stored as " + storageFormat(); LOG.info("Creating table:\n {}", createTable); CommandProcessorResponse result = driver.run(createTable); int retCode = result.getResponseCode(); diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java new file mode 100644 index 0000000..7d91364 --- /dev/null +++ 
b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hcatalog.pig; + +public class TestOrcHCatLoaderComplexSchema extends TestHCatLoaderComplexSchema { + + @Override + protected String storageFormat() { + return "orc"; + } + +}