diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatContext.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatContext.java index f729750..84230fb 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatContext.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatContext.java @@ -63,17 +63,12 @@ public synchronized HCatContext setConf(Configuration newConf) { conf = newConf; return this; } - - if (conf != newConf) { - synchronized (conf) { - for (Map.Entry entry : conf) { - if ((entry.getKey().matches("hcat.*")) && (newConf.get(entry.getKey()) == null)) { - newConf.set(entry.getKey(), entry.getValue()); - } - } + for (Map.Entry entry : conf) { + if ((entry.getKey().matches("hcat.*")) && (newConf.get(entry.getKey()) == null)) { + newConf.set(entry.getKey(), entry.getValue()); } - conf = newConf; } + conf = newConf; return this; } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 18e3556..806f82a 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -120,7 +120,7 @@ public static Object deserialize(String str) throws IOException { } public static String encodeBytes(byte[] bytes) { - StringBuffer strBuf = new StringBuffer(); + StringBuilder strBuf = new StringBuilder(); for (int i = 0; i < bytes.length; i++) { strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a'))); @@ -283,11 +283,13 @@ public static HCatSchema getPartitionColumns(Table table) throws IOException { .getTypeInfoFromTypeString(tableField.getType()); if (!partitionType.equals(tableType)) { - throw new HCatException( - ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <" + String msg = + "Column <" + field.getName() + ">, expected <" + tableType.getTypeName() + ">, got <" - + partitionType.getTypeName() + ">"); + + partitionType.getTypeName() + ">"; + LOG.warn(msg); + throw new HCatException(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, msg); } } } @@ -652,4 +654,12 @@ public static boolean isHadoop23() { public static String makePathASafeFileName(String filePath) { return new File(filePath).getPath().replaceAll("\\\\", "/"); } + public static void assertNotNull(Object t, String msg, Logger logger) { + if(t == null) { + if(logger != null) { + logger.warn(msg); + } + throw new IllegalArgumentException(msg); + } + } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java index d002bf9..d917eab 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java @@ -19,15 +19,26 @@ package org.apache.hive.hcatalog.data; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; + +import java.sql.Date; +import java.sql.Timestamp; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.Map.Entry; - +/* + * The data written and read withing the same M/R job, thus should never be written by one + * version of Hive and read by another. 
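The HCatUtil.assertNotNull helper added above is later used by this patch itself (for example in the new HCatFieldSchema constructor) to fail fast with an optional warning log. A minimal sketch of the intended call pattern, not part of the patch; the class, field, and logger names below are illustrative:

    import org.apache.hive.hcatalog.common.HCatUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class AssertNotNullSketch {
      private static final Logger LOG = LoggerFactory.getLogger(AssertNotNullSketch.class);

      static void example(Object typeInfo, String fieldName) {
        // Logs a warning through the supplied logger (if any) and then throws
        // IllegalArgumentException when the value is null; otherwise it is a no-op.
        HCatUtil.assertNotNull(typeInfo, "typeInfo cannot be null; fieldName=" + fieldName, LOG);
      }
    }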
+ * @see org.apache.hive.hcatalog.data.ReaderWriter + */ public abstract class DataType { - + //todo: this should be moved to be an inner class of ReaderWrite as that is the only place it + // is used public static final byte NULL = 1; public static final byte BOOLEAN = 5; public static final byte BYTE = 6; @@ -38,6 +49,11 @@ public static final byte DOUBLE = 25; public static final byte STRING = 55; public static final byte BINARY = 60; + public static final byte CHAR = 61; + public static final byte VARCHAR = 62; + public static final byte DECIMAL = 63; + public static final byte DATE = 64; + public static final byte TIMESTAMP = 65; public static final byte MAP = 100; public static final byte STRUCT = 110; @@ -79,6 +95,16 @@ public static byte findType(Object o) { return MAP; } else if (o instanceof byte[]) { return BINARY; + } else if(o instanceof HiveChar) { + return CHAR; + } else if(o instanceof HiveVarchar) { + return VARCHAR; + } else if(o instanceof HiveDecimal) { + return DECIMAL; + } else if(o instanceof Date) { + return DATE; + } else if(o instanceof Timestamp) { + return TIMESTAMP; } else { return ERROR; } @@ -170,8 +196,17 @@ public static int compare(Object o1, Object o2, byte dt1, byte dt2) { return 0; } } - - default: + case CHAR: + return ((HiveChar)o1).compareTo((HiveChar)o2); + case VARCHAR: + return ((HiveVarchar)o1).compareTo((HiveVarchar)o2); + case DECIMAL: + return ((HiveDecimal)o1).compareTo((HiveDecimal)o2); + case DATE: + return ((Date)o1).compareTo((Date)o2); + case TIMESTAMP: + return ((Timestamp)o1).compareTo((Timestamp)o2); + default: throw new RuntimeException("Unkown type " + dt1 + " in compare"); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/HCatRecord.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/HCatRecord.java index 37de959..75f8be0 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/HCatRecord.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/HCatRecord.java @@ -19,11 +19,16 @@ package org.apache.hive.hcatalog.data; +import java.sql.Date; +import java.sql.Timestamp; import java.util.List; import java.util.Map; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.schema.HCatSchema; @@ -149,5 +154,43 @@ public void setList(String fieldName, HCatSchema recordSchema, List value) th public void setMap(String fieldName, HCatSchema recordSchema, Map value) throws HCatException { set(fieldName, recordSchema, value); } + public void setChar(String fieldName, HCatSchema recordSchema, HiveChar value) + throws HCatException { + set(fieldName, recordSchema, value); + } + public HiveChar getChar(String fieldName, HCatSchema recordSchema) throws HCatException { + return (HiveChar) get(fieldName, recordSchema, HiveChar.class); + } + public void setVarchar(String fieldName, HCatSchema recordSchema, HiveVarchar value) + throws HCatException { + set(fieldName, recordSchema, value); + } + public HiveVarchar getVarchar(String fieldName, HCatSchema recordSchema) throws HCatException { + return (HiveVarchar) get(fieldName, recordSchema, HiveVarchar.class); + } + public void setDecimal(String fieldName, HCatSchema recordSchema, HiveDecimal value) + throws 
HCatException { + set(fieldName, recordSchema, value); + } + public HiveDecimal getDecimal(String fieldName, HCatSchema recordSchema) throws HCatException { + return (HiveDecimal) get(fieldName, recordSchema, HiveDecimal.class); + } + /** + * Note that the proper way to construct a java.sql.Date for use with this object is + * Date.valueOf("1999-12-31"). + */ + public void setDate(String fieldName, HCatSchema recordSchema, Date value) throws HCatException { + set(fieldName, recordSchema, value); + } + public Date getDate(String fieldName, HCatSchema recordSchema) throws HCatException { + return (Date) get(fieldName, recordSchema, Date.class); + } + public void setTimestamp(String fieldName, HCatSchema recordSchema, Timestamp value) + throws HCatException { + set(fieldName, recordSchema, value); + } + public Timestamp getTimestamp(String fieldName, HCatSchema recordSchema) throws HCatException { + return (Timestamp) get(fieldName, recordSchema, Timestamp.class); + } } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/JsonSerDe.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/JsonSerDe.java index 7963183..9c87aa3 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/JsonSerDe.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/JsonSerDe.java @@ -20,6 +20,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -31,6 +33,9 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDe; @@ -46,13 +51,19 @@ import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -75,9 +86,6 @@ private static final Logger LOG = 
LoggerFactory.getLogger(JsonSerDe.class); private List columnNames; - private List columnTypes; - - private StructTypeInfo rowTypeInfo; private HCatSchema schema; private JsonFactory jsonFactory = null; @@ -87,6 +95,8 @@ @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { + List columnTypes; + StructTypeInfo rowTypeInfo; LOG.debug("Initializing JsonSerDe"); @@ -195,7 +205,7 @@ private void populateRecord(List r, JsonToken token, JsonParser p, HCatS // column mapping, and we're about to proceed. } HCatFieldSchema hcatFieldSchema = s.getFields().get(fpos); - Object currField = extractCurrentField(p, null, hcatFieldSchema, false); + Object currField = extractCurrentField(p, hcatFieldSchema, false); r.set(fpos, currField); } @@ -241,17 +251,12 @@ private void skipValue(JsonParser p) throws JsonParseException, IOException { /** * Utility method to extract current expected field from given JsonParser * - * To get the field, we need either a type or a hcatFieldSchema(necessary for complex types) - * It is possible that one of them can be null, and so, if so, the other is instantiated - * from the other - * * isTokenCurrent is a boolean variable also passed in, which determines * if the JsonParser is already at the token we expect to read next, or * needs advancing to the next before we read. */ - private Object extractCurrentField(JsonParser p, Type t, - HCatFieldSchema hcatFieldSchema, boolean isTokenCurrent) throws IOException, JsonParseException, - HCatException { + private Object extractCurrentField(JsonParser p, HCatFieldSchema hcatFieldSchema, + boolean isTokenCurrent) throws IOException { Object val = null; JsonToken valueToken; if (isTokenCurrent) { @@ -259,11 +264,7 @@ private Object extractCurrentField(JsonParser p, Type t, } else { valueToken = p.nextToken(); } - - if (hcatFieldSchema != null) { - t = hcatFieldSchema.getType(); - } - switch (t) { + switch (hcatFieldSchema.getType()) { case INT: val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue(); break; @@ -295,6 +296,23 @@ private Object extractCurrentField(JsonParser p, Type t, break; case BINARY: throw new IOException("JsonSerDe does not support BINARY type"); + case DATE: + val = (valueToken == JsonToken.VALUE_NULL) ? null : Date.valueOf(p.getText()); + break; + case TIMESTAMP: + val = (valueToken == JsonToken.VALUE_NULL) ? null : Timestamp.valueOf(p.getText()); + break; + case DECIMAL: + val = (valueToken == JsonToken.VALUE_NULL) ? null : HiveDecimal.create(p.getText()); + break; + case VARCHAR: + int vLen = ((BaseCharTypeInfo)hcatFieldSchema.getTypeInfo()).getLength(); + val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveVarchar(p.getText(), vLen); + break; + case CHAR: + int cLen = ((BaseCharTypeInfo)hcatFieldSchema.getTypeInfo()).getLength(); + val = (valueToken == JsonToken.VALUE_NULL) ? 
null : new HiveChar(p.getText(), cLen); + break; case ARRAY: if (valueToken == JsonToken.VALUE_NULL) { val = null; @@ -305,7 +323,7 @@ private Object extractCurrentField(JsonParser p, Type t, } List arr = new ArrayList(); while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) { - arr.add(extractCurrentField(p, null, hcatFieldSchema.getArrayElementSchema().get(0), true)); + arr.add(extractCurrentField(p, hcatFieldSchema.getArrayElementSchema().get(0), true)); } val = arr; break; @@ -318,15 +336,14 @@ private Object extractCurrentField(JsonParser p, Type t, throw new IOException("Start of Object expected"); } Map map = new LinkedHashMap(); - Type keyType = hcatFieldSchema.getMapKeyType(); HCatFieldSchema valueSchema = hcatFieldSchema.getMapValueSchema().get(0); while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) { - Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(), keyType); + Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(), hcatFieldSchema.getMapKeyTypeInfo()); Object v; if (valueSchema.getType() == HCatFieldSchema.Type.STRUCT) { - v = extractCurrentField(p, null, valueSchema, false); + v = extractCurrentField(p, valueSchema, false); } else { - v = extractCurrentField(p, null, valueSchema, true); + v = extractCurrentField(p, valueSchema, true); } map.put(k, v); @@ -350,12 +367,16 @@ private Object extractCurrentField(JsonParser p, Type t, } val = struct; break; + default: + LOG.error("Unknown type found: " + hcatFieldSchema.getType()); + return null; } return val; } - private Object getObjectOfCorrespondingPrimitiveType(String s, Type t) throws IOException { - switch (t) { + private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveTypeInfo mapKeyType) + throws IOException { + switch (Type.getPrimitiveHType(mapKeyType)) { case INT: return Integer.valueOf(s); case TINYINT: @@ -374,8 +395,18 @@ private Object getObjectOfCorrespondingPrimitiveType(String s, Type t) throws IO return s; case BINARY: throw new IOException("JsonSerDe does not support BINARY type"); + case DATE: + return Date.valueOf(s); + case TIMESTAMP: + return Timestamp.valueOf(s); + case DECIMAL: + return HiveDecimal.create(s); + case VARCHAR: + return new HiveVarchar(s, ((BaseCharTypeInfo)mapKeyType).getLength()); + case CHAR: + return new HiveChar(s, ((BaseCharTypeInfo)mapKeyType).getLength()); } - throw new IOException("Could not convert from string to map type " + t); + throw new IOException("Could not convert from string to map type " + mapKeyType.getTypeName()); } /** @@ -399,9 +430,7 @@ public Writable serialize(Object obj, ObjectInspector objInspector) if (i > 0) { sb.append(SerDeUtils.COMMA); } - sb.append(SerDeUtils.QUOTE); - sb.append(columnNames.get(i)); - sb.append(SerDeUtils.QUOTE); + appendWithQuotes(sb, columnNames.get(i)); sb.append(SerDeUtils.COLON); buildJSONString(sb, soi.getStructFieldData(obj, structFields.get(i)), structFields.get(i).getFieldObjectInspector()); @@ -415,7 +444,9 @@ public Writable serialize(Object obj, ObjectInspector objInspector) } return new Text(sb.toString()); } - + private static StringBuilder appendWithQuotes(StringBuilder sb, String value) { + return sb == null ? null : sb.append(SerDeUtils.QUOTE).append(value).append(SerDeUtils.QUOTE); + } // TODO : code section copied over from SerDeUtils because of non-standard json production there // should use quotes for all field names. We should fix this there, and then remove this copy. 
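To make the new JsonSerDe primitive handling concrete, here is a round-trip sketch modeled on the existing TestJsonSerDe setup; it is not part of the patch, the column names and sample JSON values are illustrative, and it assumes the initialize/serialize/deserialize signatures shown in this file:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.io.Text;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.JsonSerDe;

    class JsonSerDeHive13TypesSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(serdeConstants.LIST_COLUMNS, "bd,hc,hvc,dt,ts");
        props.put(serdeConstants.LIST_COLUMN_TYPES, "decimal(5,2),char(10),varchar(20),date,timestamp");

        JsonSerDe serde = new JsonSerDe();
        serde.initialize(new Configuration(), props);

        // DATE/TIMESTAMP/CHAR/VARCHAR values travel as quoted strings and DECIMAL as a bare
        // number, matching the extractCurrentField/buildJSONString cases added in this patch.
        Text json = new Text(
            "{\"bd\":123.45,\"hc\":\"hive_char\",\"hvc\":\"hive_varchar\","
            + "\"dt\":\"2014-01-07\",\"ts\":\"2014-01-07 12:34:56.1\"}");
        HCatRecord rec = (HCatRecord) serde.deserialize(json);
        System.out.println(serde.serialize(rec, serde.getObjectInspector()));
      }
    }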
// See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES @@ -461,25 +492,37 @@ private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector break; } case STRING: { - sb.append('"'); - sb.append(SerDeUtils.escapeString(((StringObjectInspector) poi) - .getPrimitiveJavaObject(o))); - sb.append('"'); - break; - } - case TIMESTAMP: { - sb.append('"'); - sb.append(((TimestampObjectInspector) poi) - .getPrimitiveWritableObject(o)); - sb.append('"'); + String s = + SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o)); + appendWithQuotes(sb, s); break; } case BINARY: { throw new IOException("JsonSerDe does not support BINARY type"); } + case DATE: + Date d = ((DateObjectInspector)poi).getPrimitiveJavaObject(o); + appendWithQuotes(sb, d.toString()); + break; + case TIMESTAMP: { + Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o); + appendWithQuotes(sb, t.toString()); + break; + } + case DECIMAL: + sb.append(((HiveDecimalObjectInspector)poi).getPrimitiveJavaObject(o)); + break; + case VARCHAR: + appendWithQuotes(sb, + ((HiveVarcharObjectInspector)poi).getPrimitiveJavaObject(o).toString()); + break; + case CHAR: + //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13) + // HiveChar.toString() returns getPaddedValue() + appendWithQuotes(sb, ((HiveCharObjectInspector)poi).getPrimitiveJavaObject(o).toString()); + break; default: - throw new RuntimeException("Unknown primitive type: " - + poi.getPrimitiveCategory()); + throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory()); } } break; @@ -524,13 +567,11 @@ private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector StringBuilder keyBuilder = new StringBuilder(); buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector); String keyString = keyBuilder.toString().trim(); - boolean doQuoting = (!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE); - if (doQuoting) { - sb.append(SerDeUtils.QUOTE); + if((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) { + appendWithQuotes(sb, keyString); } - sb.append(keyString); - if (doQuoting) { - sb.append(SerDeUtils.QUOTE); + else { + sb.append(keyString); } sb.append(SerDeUtils.COLON); buildJSONString(sb, e.getValue(), mapValueObjectInspector); @@ -550,9 +591,7 @@ private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector if (i > 0) { sb.append(SerDeUtils.COMMA); } - sb.append(SerDeUtils.QUOTE); - sb.append(structFields.get(i).getFieldName()); - sb.append(SerDeUtils.QUOTE); + appendWithQuotes(sb, structFields.get(i).getFieldName()); sb.append(SerDeUtils.COLON); buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)), structFields.get(i).getFieldObjectInspector()); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/ReaderWriter.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/ReaderWriter.java index 7719569..d03ca3d 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/ReaderWriter.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/ReaderWriter.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.sql.Date; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -29,10 +30,21 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.hive.common.type.HiveChar; +import 
org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.io.VIntWritable; import org.apache.hadoop.io.VLongWritable; - +/* + * when adding support for new types, we should try to use classes of Hive value system to keep + * things more readable (though functionally it should not make a difference). + */ public abstract class ReaderWriter { private static final String UTF8 = "UTF-8"; @@ -96,7 +108,26 @@ public static Object readDatum(DataInput in) throws IOException { list.add(readDatum(in)); } return list; - + case DataType.CHAR: + HiveCharWritable hcw = new HiveCharWritable(); + hcw.readFields(in); + return hcw.getHiveChar(); + case DataType.VARCHAR: + HiveVarcharWritable hvw = new HiveVarcharWritable(); + hvw.readFields(in); + return hvw.getHiveVarchar(); + case DataType.DECIMAL: + HiveDecimalWritable hdw = new HiveDecimalWritable(); + hdw.readFields(in); + return hdw.getHiveDecimal(); + case DataType.DATE: + DateWritable dw = new DateWritable(); + dw.readFields(in); + return dw.get(); + case DataType.TIMESTAMP: + TimestampWritable tw = new TimestampWritable(); + tw.readFields(in); + return tw.getTimestamp(); default: throw new IOException("Unexpected data type " + type + " found in stream."); @@ -106,9 +137,9 @@ public static Object readDatum(DataInput in) throws IOException { public static void writeDatum(DataOutput out, Object val) throws IOException { // write the data type byte type = DataType.findType(val); + out.write(type); switch (type) { case DataType.LIST: - out.writeByte(DataType.LIST); List list = (List) val; int sz = list.size(); out.writeInt(sz); @@ -118,7 +149,6 @@ public static void writeDatum(DataOutput out, Object val) throws IOException { return; case DataType.MAP: - out.writeByte(DataType.MAP); Map m = (Map) val; out.writeInt(m.size()); Iterator i = @@ -131,59 +161,64 @@ public static void writeDatum(DataOutput out, Object val) throws IOException { return; case DataType.INTEGER: - out.writeByte(DataType.INTEGER); new VIntWritable((Integer) val).write(out); return; case DataType.LONG: - out.writeByte(DataType.LONG); new VLongWritable((Long) val).write(out); return; case DataType.FLOAT: - out.writeByte(DataType.FLOAT); out.writeFloat((Float) val); return; case DataType.DOUBLE: - out.writeByte(DataType.DOUBLE); out.writeDouble((Double) val); return; case DataType.BOOLEAN: - out.writeByte(DataType.BOOLEAN); out.writeBoolean((Boolean) val); return; case DataType.BYTE: - out.writeByte(DataType.BYTE); out.writeByte((Byte) val); return; case DataType.SHORT: - out.writeByte(DataType.SHORT); out.writeShort((Short) val); return; case DataType.STRING: String s = (String) val; byte[] utfBytes = s.getBytes(ReaderWriter.UTF8); - out.writeByte(DataType.STRING); out.writeInt(utfBytes.length); out.write(utfBytes); return; case DataType.BINARY: byte[] ba = (byte[]) val; - out.writeByte(DataType.BINARY); out.writeInt(ba.length); out.write(ba); return; case DataType.NULL: - out.writeByte(DataType.NULL); + //for NULL we just write out the type + return; + case DataType.CHAR: + new HiveCharWritable((HiveChar)val).write(out); + return; + case DataType.VARCHAR: + new HiveVarcharWritable((HiveVarchar)val).write(out); + return; + 
case DataType.DECIMAL: + new HiveDecimalWritable((HiveDecimal)val).write(out); + return; + case DataType.DATE: + new DateWritable((Date)val).write(out); + return; + case DataType.TIMESTAMP: + new TimestampWritable((java.sql.Timestamp)val).write(out); return; - default: throw new IOException("Unexpected data type " + type + " found in stream."); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java index 92596e0..f271a61 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java @@ -21,23 +21,86 @@ import java.io.Serializable; import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +@InterfaceAudience.Public +@InterfaceStability.Evolving public class HCatFieldSchema implements Serializable { - +/*the implementation of HCatFieldSchema is a bit messy since with the addition of parametrized +types (e.g. char(7)) we need to represent something richer than an enum but for backwards +compatibility (and effort required to do full refactoring) this class has both 'type' and 'typeInfo'; +similarly for mapKeyType/mapKeyTypeInfo */ + public enum Type { - INT, - TINYINT, - SMALLINT, - BIGINT, - BOOLEAN, - FLOAT, - DOUBLE, - STRING, - ARRAY, - MAP, - STRUCT, - BINARY, + /*this captures mapping of Hive type names to HCat type names; in the long run + * we should just use Hive types directly but that is a larger refactoring effort + * For HCat->Pig mapping see PigHCatUtil.getPigType(Type) + * For Pig->HCat mapping see HCatBaseStorer#validateSchema(...)*/ + BOOLEAN(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN), + TINYINT(PrimitiveObjectInspector.PrimitiveCategory.BYTE), + SMALLINT(PrimitiveObjectInspector.PrimitiveCategory.SHORT), + INT(PrimitiveObjectInspector.PrimitiveCategory.INT), + BIGINT(PrimitiveObjectInspector.PrimitiveCategory.LONG), + FLOAT(PrimitiveObjectInspector.PrimitiveCategory.FLOAT), + DOUBLE(PrimitiveObjectInspector.PrimitiveCategory.DOUBLE), + DECIMAL(PrimitiveObjectInspector.PrimitiveCategory.DECIMAL), + STRING(PrimitiveObjectInspector.PrimitiveCategory.STRING), + CHAR(PrimitiveObjectInspector.PrimitiveCategory.CHAR), + VARCHAR(PrimitiveObjectInspector.PrimitiveCategory.VARCHAR), + BINARY(PrimitiveObjectInspector.PrimitiveCategory.BINARY), + DATE(PrimitiveObjectInspector.PrimitiveCategory.DATE), + TIMESTAMP(PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP), + + ARRAY(ObjectInspector.Category.LIST), + MAP(ObjectInspector.Category.MAP), + STRUCT(ObjectInspector.Category.STRUCT); + + + private final ObjectInspector.Category category; + private final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory; + private Type(ObjectInspector.Category cat) { + category = cat; + primitiveCategory = null; + assert category != ObjectInspector.Category.PRIMITIVE : + "This c'tor should be used for complex category types"; + } + private 
Type(PrimitiveObjectInspector.PrimitiveCategory primCat) { + category = ObjectInspector.Category.PRIMITIVE; + primitiveCategory = primCat; + } + public ObjectInspector.Category getCategory() { + return category; + } + /** + * May return {@code null} + */ + public PrimitiveObjectInspector.PrimitiveCategory getPrimitiveCategory() { + return primitiveCategory; + } + public static Type getPrimitiveHType(PrimitiveTypeInfo basePrimitiveTypeInfo) { + for(Type t : values()) { + if(t.getPrimitiveCategory() == basePrimitiveTypeInfo.getPrimitiveCategory()) { + return t; + } + } + throw new TypeNotPresentException(basePrimitiveTypeInfo.getTypeName(), null); + } + //aid in testing + public static int numPrimitiveTypes() { + int numPrimitives = 0; + for(Type t : values()) { + if(t.category == ObjectInspector.Category.PRIMITIVE) { + numPrimitives++; + } + } + return numPrimitives; + } } public enum Category { @@ -59,10 +122,8 @@ public static Category fromType(Type type) { } } - ; - public boolean isComplex() { - return (category == Category.PRIMITIVE) ? false : true; + return category != Category.PRIMITIVE; } /** @@ -72,6 +133,9 @@ public boolean isComplex() { String fieldName = null; String comment = null; + /** + * @deprecated use {@link #typeInfo} + */ Type type = null; Category category = null; @@ -82,9 +146,18 @@ public boolean isComplex() { HCatSchema subSchema = null; // populated if column is Map type + @Deprecated Type mapKeyType = null; private String typeString = null; + /** + * This is needed for parametrized types such as decimal(8,9), char(7), varchar(6) + */ + private PrimitiveTypeInfo typeInfo; + /** + * represents key type for a Map; currently Hive only supports primitive keys + */ + private PrimitiveTypeInfo mapKeyTypeInfo; @SuppressWarnings("unused") private HCatFieldSchema() { @@ -94,6 +167,7 @@ private HCatFieldSchema() { /** * Returns type of the field * @return type of the field + * @deprecated use {@link #getTypeInfo()} */ public Type getType() { return type; @@ -118,12 +192,18 @@ public String getName() { public String getComment() { return comment; } - + /** + * May return {@code null} + */ + public PrimitiveTypeInfo getTypeInfo() { + return typeInfo; + } /** * Constructor constructing a primitive datatype HCatFieldSchema * @param fieldName Name of the primitive field * @param type Type of the primitive field * @throws HCatException if call made on non-primitive types + * @deprecated as of Hive 0.13; use {@link #HCatFieldSchema(String, org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo, String)} */ public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatException { assertTypeInCategory(type, Category.PRIMITIVE, fieldName); @@ -132,6 +212,17 @@ public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatE this.category = Category.PRIMITIVE; this.comment = comment; } + public HCatFieldSchema(String fieldName, PrimitiveTypeInfo typeInfo, String comment) + throws HCatException { + this.fieldName = fieldName; + //HCatUtil.assertNotNull(fieldName, "fieldName cannot be null");//seems sometimes it can be + // null, for ARRAY types in particular (which may be a complex type) + this.category = Category.PRIMITIVE; + this.typeInfo = typeInfo; + HCatUtil.assertNotNull(typeInfo, "typeInfo cannot be null; fieldName=" + fieldName, null); + type = Type.getPrimitiveHType(typeInfo); + this.comment = comment; + } /** * Constructor for constructing a ARRAY type or STRUCT type HCatFieldSchema, passing type and subschema @@ -164,9 +255,12 @@ private 
void setName(String name) { * @param mapKeyType - key type of the Map * @param mapValueSchema - subschema of the value of the Map * @throws HCatException if call made on non-Map types + * @deprecated use {@link #createMapTypeFieldSchema(String, org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo, HCatSchema, String)} */ public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException { assertTypeInCategory(type, Category.MAP, fieldName); + //Hive only supports primitive map keys: + //https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes assertTypeInCategory(mapKeyType, Category.PRIMITIVE, fieldName); this.fieldName = fieldName; this.type = Type.MAP; @@ -176,6 +270,16 @@ public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema this.subSchema.get(0).setName(null); this.comment = comment; } + public static HCatFieldSchema createMapTypeFieldSchema(String fieldName, PrimitiveTypeInfo mapKeyType, + HCatSchema mapValueSchema, + String comment) throws HCatException { + HCatFieldSchema mapSchema = new HCatFieldSchema(fieldName, Type.MAP, + Type.getPrimitiveHType(mapKeyType), + mapValueSchema, comment); + mapSchema.mapKeyTypeInfo = mapKeyType; + return mapSchema; + } + public HCatSchema getStructSubSchema() throws HCatException { assertTypeInCategory(this.type, Category.STRUCT, this.fieldName); @@ -186,12 +290,17 @@ public HCatSchema getArrayElementSchema() throws HCatException { assertTypeInCategory(this.type, Category.ARRAY, this.fieldName); return subSchema; } - + /** + * @deprecated use {@link #getMapKeyTypeInfo()} + */ public Type getMapKeyType() throws HCatException { assertTypeInCategory(this.type, Category.MAP, this.fieldName); return mapKeyType; } - + public PrimitiveTypeInfo getMapKeyTypeInfo() throws HCatException { + assertTypeInCategory(this.type, Category.MAP, this.fieldName); + return mapKeyTypeInfo; + } public HCatSchema getMapValueSchema() throws HCatException { assertTypeInCategory(this.type, Category.MAP, this.fieldName); return subSchema; @@ -227,8 +336,8 @@ public String getTypeString() { } StringBuilder sb = new StringBuilder(); - if (Category.PRIMITIVE == category) { - sb.append(type); + if (!isComplex()) { + sb.append(typeInfo == null ? type : typeInfo.getTypeName()); } else if (Category.STRUCT == category) { sb.append("struct<"); sb.append(subSchema.getSchemaAsTypeString()); @@ -239,7 +348,7 @@ public String getTypeString() { sb.append(">"); } else if (Category.MAP == category) { sb.append("map<"); - sb.append(mapKeyType); + sb.append(mapKeyTypeInfo == null ? 
mapKeyType : mapKeyTypeInfo.getTypeName()); sb.append(","); sb.append(subSchema.getSchemaAsTypeString()); sb.append(">"); diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatSchemaUtils.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatSchemaUtils.java index 0fa7465..16c1604 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatSchemaUtils.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatSchemaUtils.java @@ -79,13 +79,14 @@ public HCatSchema build() throws HCatException { public static class MapBuilder extends HCatSchemaBuilder { - Type keyType = null; + PrimitiveTypeInfo keyType = null; HCatSchema valueSchema = null; @Override public HCatSchema build() throws HCatException { List fslist = new ArrayList(); - fslist.add(new HCatFieldSchema(null, Type.MAP, keyType, valueSchema, null)); + fslist.add(HCatFieldSchema.createMapTypeFieldSchema(null, keyType, valueSchema, null)); + return new HCatSchema(fslist); } @@ -94,7 +95,7 @@ public MapBuilder withValueSchema(HCatSchema valueSchema) { return this; } - public MapBuilder withKeyType(Type keyType) { + public MapBuilder withKeyType(PrimitiveTypeInfo keyType) { this.keyType = keyType; return this; } @@ -118,7 +119,7 @@ private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fie Category typeCategory = fieldTypeInfo.getCategory(); HCatFieldSchema hCatFieldSchema; if (Category.PRIMITIVE == typeCategory) { - hCatFieldSchema = new HCatFieldSchema(fieldName, getPrimitiveHType(fieldTypeInfo), null); + hCatFieldSchema = new HCatFieldSchema(fieldName, (PrimitiveTypeInfo)fieldTypeInfo, null); } else if (Category.STRUCT == typeCategory) { HCatSchema subSchema = constructHCatSchema((StructTypeInfo) fieldTypeInfo); hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.STRUCT, subSchema, null); @@ -126,40 +127,15 @@ private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fie HCatSchema subSchema = getHCatSchema(((ListTypeInfo) fieldTypeInfo).getListElementTypeInfo()); hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.ARRAY, subSchema, null); } else if (Category.MAP == typeCategory) { - HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo) fieldTypeInfo).getMapKeyTypeInfo()); HCatSchema subSchema = getHCatSchema(((MapTypeInfo) fieldTypeInfo).getMapValueTypeInfo()); - hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.MAP, mapKeyType, subSchema, null); + hCatFieldSchema = HCatFieldSchema.createMapTypeFieldSchema(fieldName, + (PrimitiveTypeInfo)((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo(), subSchema, null); } else { throw new TypeNotPresentException(fieldTypeInfo.getTypeName(), null); } return hCatFieldSchema; } - private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { - switch (((PrimitiveTypeInfo) basePrimitiveTypeInfo).getPrimitiveCategory()) { - case BOOLEAN: - return Type.BOOLEAN; - case BYTE: - return Type.TINYINT; - case DOUBLE: - return Type.DOUBLE; - case FLOAT: - return Type.FLOAT; - case INT: - return Type.INT; - case LONG: - return Type.BIGINT; - case SHORT: - return Type.SMALLINT; - case STRING: - return Type.STRING; - case BINARY: - return Type.BINARY; - default: - throw new TypeNotPresentException(((PrimitiveTypeInfo) basePrimitiveTypeInfo).getTypeName(), null); - } - } - public static HCatSchema getHCatSchema(Schema schema) throws HCatException { return getHCatSchema(schema.getFieldSchemas()); } @@ -174,8 +150,8 
@@ public static HCatSchema getHCatSchema(List fslist) throw private static HCatSchema constructHCatSchema(StructTypeInfo stypeInfo) throws HCatException { CollectionBuilder builder = getStructSchemaBuilder(); - for (String fieldName : ((StructTypeInfo) stypeInfo).getAllStructFieldNames()) { - builder.addField(getHCatFieldSchema(fieldName, ((StructTypeInfo) stypeInfo).getStructFieldTypeInfo(fieldName))); + for (String fieldName : stypeInfo.getAllStructFieldNames()) { + builder.addField(getHCatFieldSchema(fieldName, stypeInfo.getStructFieldTypeInfo(fieldName))); } return builder.build(); } @@ -184,7 +160,7 @@ public static HCatSchema getHCatSchema(TypeInfo typeInfo) throws HCatException { Category typeCategory = typeInfo.getCategory(); HCatSchema hCatSchema; if (Category.PRIMITIVE == typeCategory) { - hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null, getPrimitiveHType(typeInfo), null)).build(); + hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null, (PrimitiveTypeInfo)typeInfo, null)).build(); } else if (Category.STRUCT == typeCategory) { HCatSchema subSchema = constructHCatSchema((StructTypeInfo) typeInfo); hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null, Type.STRUCT, subSchema, null)).build(); @@ -193,10 +169,10 @@ public static HCatSchema getHCatSchema(TypeInfo typeInfo) throws HCatException { builder.addField(getHCatFieldSchema(null, ((ListTypeInfo) typeInfo).getListElementTypeInfo())); hCatSchema = new HCatSchema(Arrays.asList(new HCatFieldSchema("", Type.ARRAY, builder.build(), ""))); } else if (Category.MAP == typeCategory) { - HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo) typeInfo).getMapKeyTypeInfo()); HCatSchema subSchema = getHCatSchema(((MapTypeInfo) typeInfo).getMapValueTypeInfo()); MapBuilder builder = getMapSchemaBuilder(); - hCatSchema = builder.withKeyType(mapKeyType).withValueSchema(subSchema).build(); + hCatSchema = builder.withKeyType((PrimitiveTypeInfo)((MapTypeInfo) typeInfo).getMapKeyTypeInfo()) + .withValueSchema(subSchema).build(); } else { throw new TypeNotPresentException(typeInfo.getTypeName(), null); } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java index b63bdc2..44ef245 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java @@ -71,10 +71,6 @@ * @param databaseName the db name * @param tableName the table name * @param partitionValues The partition values to publish to, can be null or empty Map to - * work with hadoop security, the kerberos principal name of the server - else null - * The principal name should be of the form: - * /_HOST@ like "hcat/_HOST@myrealm.com" - * The special string _HOST will be replaced automatically with the correct host name * indicate write to a unpartitioned table. For partitioned tables, this map should * contain keys for all partition columns with corresponding values. 
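The HCatFieldSchema/HCatSchemaUtils changes above move from the enum-only constructors to PrimitiveTypeInfo-aware ones so that parameterized types keep their parameters. A small sketch, not part of the patch, assuming those new constructors; the field names are illustrative:

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;

    class ParameterizedTypeSchemaSketch {
      static void example() throws HCatException {
        // decimal(5,2) and char(10) keep precision/scale/length via PrimitiveTypeInfo;
        // the deprecated enum-based constructor could only record DECIMAL or CHAR.
        HCatFieldSchema price = new HCatFieldSchema("price",
            TypeInfoFactory.getDecimalTypeInfo(5, 2), "decimal(5,2)");
        HCatFieldSchema code = new HCatFieldSchema("code",
            TypeInfoFactory.getCharTypeInfo(10), "char(10)");

        System.out.println(price.getTypeString());            // expected: decimal(5,2)
        System.out.println(code.getTypeInfo().getTypeName()); // expected: char(10)
      }
    }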
*/ diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/HcatTestUtils.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/HcatTestUtils.java index 8c7c838..c42587e 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/HcatTestUtils.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/HcatTestUtils.java @@ -97,10 +97,4 @@ public static void createTestDataFile(String filename, String[] lines) throws IO } } - /** - * Used by various tests to make sure the path is safe for Windows - */ - public static String makePathASafeFileName(String filePath) { - return new File(filePath).getPath().replaceAll("\\\\", "/"); - } } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java index cde718f..ff56234 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.data; import java.io.IOException; +import java.sql.Date; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -87,14 +88,23 @@ public static void dropTable(Driver driver, String tablename) throws IOException public static boolean recordsEqual(HCatRecord first, HCatRecord second) { - return (compareRecords(first, second) == 0); + return recordsEqual(first, second, null); + } + public static boolean recordsEqual(HCatRecord first, HCatRecord second, + StringBuilder debugDetail) { + return (compareRecords(first, second, debugDetail) == 0); } public static int compareRecords(HCatRecord first, HCatRecord second) { - return compareRecordContents(first.getAll(), second.getAll()); + return compareRecords(first, second, null); + } + public static int compareRecords(HCatRecord first, HCatRecord second, + StringBuilder debugDetail) { + return compareRecordContents(first.getAll(), second.getAll(), debugDetail); } - public static int compareRecordContents(List first, List second) { + public static int compareRecordContents(List first, List second, + StringBuilder debugDetail) { int mySz = first.size(); int urSz = second.size(); if (mySz != urSz) { @@ -103,6 +113,22 @@ public static int compareRecordContents(List first, List second) for (int i = 0; i < first.size(); i++) { int c = DataType.compare(first.get(i), second.get(i)); if (c != 0) { + if(debugDetail != null) { + String msg = "first.get(" + i + "}='" + first.get(i) + "' second.get(" + + i + ")='" + second.get(i) + "' compared as " + c + "\n" + + "Types 1st/2nd=" + DataType.findType(first.get(i)) + "/" +DataType.findType( + second.get(i)) + '\n' + + "first='" + first.get(i) + "' second='" + second.get(i) + "'"; + if(first.get(i) instanceof Date) { + msg += "\n((Date)first.get(i)).getTime()=" + ((Date)first.get(i)).getTime(); + } + if(second.get(i) instanceof Date) { + msg += "\n((Date)second.get(i)).getTime()=" + ((Date)second.get(i)).getTime(); + } + + debugDetail.append(msg); + throw new RuntimeException(debugDetail.toString()); + } return c; } } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestDefaultHCatRecord.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestDefaultHCatRecord.java index fdc4efb..a5ce417 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestDefaultHCatRecord.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestDefaultHCatRecord.java @@ -28,20 +28,32 @@ import 
java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; import junit.framework.Assert; import junit.framework.TestCase; +import org.apache.pig.parser.AliasMasker; public class TestDefaultHCatRecord extends TestCase { + /** + * test that we properly serialize/deserialize HCatRecordS + * @throws IOException + */ public void testRYW() throws IOException { File f = new File("binary.dat"); @@ -65,7 +77,9 @@ public void testRYW() throws IOException { for (int i = 0; i < recs.length; i++) { HCatRecord rec = new DefaultHCatRecord(); rec.readFields(inpStream); - Assert.assertTrue(HCatDataCheckUtil.recordsEqual(recs[i], rec)); + StringBuilder msg = new StringBuilder("recs[" + i + "]='" + recs[i] + "' rec='" + rec + "'"); + boolean isEqual = HCatDataCheckUtil.recordsEqual(recs[i], rec, msg); + Assert.assertTrue(msg.toString(), isEqual); } Assert.assertEquals(fInStream.available(), 0); @@ -134,6 +148,21 @@ public void testGetSetByType2() throws HCatException { Assert.assertTrue(HCatDataCheckUtil.recordsEqual(newRec, inpRec)); } + /** + * Test type specific get/set methods on HCatRecord types added in Hive 13 + * @throws HCatException + */ + public void testGetSetByType3() throws HCatException { + HCatRecord inpRec = getHCat13TypesRecord(); + HCatRecord newRec = new DefaultHCatRecord(inpRec.size()); + HCatSchema hsch = HCatSchemaUtils.getHCatSchema( + "a:decimal(5,2),b:char(10),c:varchar(20),d:date,e:timestamp"); + newRec.setDecimal("a", hsch, inpRec.getDecimal("a", hsch)); + newRec.setChar("b", hsch, inpRec.getChar("b", hsch)); + newRec.setVarchar("c", hsch, inpRec.getVarchar("c", hsch)); + newRec.setDate("d", hsch, inpRec.getDate("d", hsch)); + newRec.setTimestamp("e", hsch, inpRec.getTimestamp("e", hsch)); + } private HCatRecord getGetSet2InpRec() { List rlist = new ArrayList(); @@ -238,9 +267,32 @@ private HCatRecord getGetSet2InpRec() { rec_6.add(getList()); HCatRecord tup_6 = new DefaultHCatRecord(rec_6); - - return new HCatRecord[]{tup_1, tup_2, tup_3, tup_4, tup_5, tup_6}; - + return new HCatRecord[]{tup_1, tup_2, tup_3, tup_4, tup_5, tup_6, getHCat13TypesRecord(), + getHCat13TypesComplexRecord()}; + } + private static HCatRecord getHCat13TypesRecord() { + List rec_hcat13types = new ArrayList(5); + rec_hcat13types.add(HiveDecimal.create(new BigDecimal("123.45")));//prec 5, scale 2 + rec_hcat13types.add(new HiveChar("hive_char", 10)); + rec_hcat13types.add(new HiveVarchar("hive_varchar", 20)); + rec_hcat13types.add(Date.valueOf("2014-01-06")); + rec_hcat13types.add(new Timestamp(System.currentTimeMillis())); + return new DefaultHCatRecord(rec_hcat13types); + } + private static HCatRecord getHCat13TypesComplexRecord() { + List rec_hcat13ComplexTypes = new ArrayList(); + Map m = new HashMap(); + m.put(HiveDecimal.create(new BigDecimal("1234.12")), "1234.12"); + m.put(HiveDecimal.create(new BigDecimal("1234.13")), "1234.13"); + rec_hcat13ComplexTypes.add(m); + + Map> m2 = new HashMap>(); + List list = new ArrayList(); + 
list.add(Date.valueOf("2014-01-05")); + list.add(new HashMap(m)); + m2.put(new Timestamp(System.currentTimeMillis()), list); + rec_hcat13ComplexTypes.add(m2); + return new DefaultHCatRecord(rec_hcat13ComplexTypes); } private Object getList() { diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestJsonSerDe.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestJsonSerDe.java index 0505351..c1d170a 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestJsonSerDe.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestJsonSerDe.java @@ -18,6 +18,9 @@ */ package org.apache.hive.hcatalog.data; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,6 +30,9 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.Text; @@ -85,6 +91,11 @@ c1_1.add(i2); c1.add(c1_1); rlist.add(c1); + rlist.add(HiveDecimal.create(new BigDecimal("123.45")));//prec 5, scale 2 + rlist.add(new HiveChar("hive_char", 10)); + rlist.add(new HiveVarchar("hive_varchar", 20)); + rlist.add(Date.valueOf("2014-01-07")); + rlist.add(new Timestamp(System.currentTimeMillis())); List nlist = new ArrayList(13); nlist.add(null); // tinyint @@ -100,20 +111,26 @@ nlist.add(null); // map nlist.add(null); // bool nlist.add(null); // complex + nlist.add(null); //decimal(5,2) + nlist.add(null); //char(10) + nlist.add(null); //varchar(20) + nlist.add(null); //date + nlist.add(null); //timestamp String typeString = "tinyint,smallint,int,bigint,double,float,string,string," + "struct,array,map,boolean," - + "array,ii2:map>>>>"; + + "array,ii2:map>>>>," + + "decimal(5,2),char(10),varchar(20),date,timestamp"; Properties props = new Properties(); - props.put(serdeConstants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1"); + props.put(serdeConstants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1,bd,hc,hvc,dt,ts"); props.put(serdeConstants.LIST_COLUMN_TYPES, typeString); // props.put(Constants.SERIALIZATION_NULL_FORMAT, "\\N"); // props.put(Constants.SERIALIZATION_FORMAT, "1"); - data.add(new Pair(props, new DefaultHCatRecord(rlist))); - data.add(new Pair(props, new DefaultHCatRecord(nlist))); + data.add(new Pair(props, new DefaultHCatRecord(rlist))); + data.add(new Pair(props, new DefaultHCatRecord(nlist))); return data; } @@ -137,14 +154,17 @@ public void testRW() throws Exception { LOG.info("ONE:{}", s); Object o1 = hrsd.deserialize(s); - assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o1)); + StringBuilder msg = new StringBuilder(); + boolean isEqual = HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o1); + assertTrue(msg.toString(), isEqual); Writable s2 = jsde.serialize(o1, hrsd.getObjectInspector()); LOG.info("TWO:{}", s2); Object o2 = jsde.deserialize(s2); LOG.info("deserialized TWO : {} ", o2); - - assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2)); + msg.setLength(0); + isEqual = HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2, msg); + assertTrue(msg.toString(), isEqual); } } @@ -153,7 +173,7 @@ public void testRobustRead() throws Exception { /** * This test has been added to account for HCATALOG-436 * We write out columns with "internal 
column names" such - * as "_col0", but try to read with retular column names. + * as "_col0", but try to read with regular column names. */ Configuration conf = new Configuration(); @@ -190,7 +210,9 @@ public void testRobustRead() throws Exception { Object o2 = rjsd.deserialize(s); LOG.info("deserialized TWO : {} ", o2); - assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2)); + StringBuilder msg = new StringBuilder(); + boolean isEqual = HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2, msg); + assertTrue(msg.toString(), isEqual); } } diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchema.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchema.java index 3b5053c..b82fd98 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchema.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/schema/TestHCatSchema.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.data.schema; import junit.framework.TestCase; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hive.hcatalog.common.HCatException; import java.util.ArrayList; @@ -62,6 +63,14 @@ public void testHashCodeEquals() throws HCatException { HCatFieldSchema memberID2 = new HCatFieldSchema("memberID", HCatFieldSchema.Type.INT, "as a number"); assertTrue("Expected objects to be equal", memberID1.equals(memberID2)); assertTrue("Expected hash codes to be equal", memberID1.hashCode() == memberID2.hashCode()); + memberID1 = new HCatFieldSchema("memberID", TypeInfoFactory.getDecimalTypeInfo(5,2), "decimal(5,2)"); + memberID2 = new HCatFieldSchema("memberID", TypeInfoFactory.getDecimalTypeInfo(5,3), "decimal(5)"); + assertFalse("Expected objects to be unequal", memberID1.equals(memberID2)); + assertFalse("Expected hash codes to be unequal", memberID1.hashCode() == memberID2.hashCode()); + memberID1 = new HCatFieldSchema("memberID", TypeInfoFactory.getVarcharTypeInfo(5), "varchar(5)"); + memberID2 = new HCatFieldSchema("memberID", TypeInfoFactory.getVarcharTypeInfo(5), "varchar(5)"); + assertTrue("Expected objects to be equal", memberID1.equals(memberID2)); + assertTrue("Expected hash codes to be equal", memberID1.hashCode() == memberID2.hashCode()); } public void testCannotInstantiateSchemaWithRepeatedFieldNames() throws HCatException { diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java index 0fb845f..f0ed92c 100644 --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java @@ -26,7 +26,9 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -35,6 +37,7 @@ import java.io.File; import java.io.IOException; +import java.util.Properties; /** * Simplify writing HCatalog tests that require a HiveMetaStore. 
@@ -81,7 +84,26 @@ protected void setUpHiveConf() { } protected void logAndRegister(PigServer server, String query) throws IOException { + logAndRegister(server, query, 1); + } + protected void logAndRegister(PigServer server, String query, int lineNumber) throws IOException { + assert lineNumber > 0 : "(lineNumber > 0) is false"; LOG.info("Registering pig query: " + query); - server.registerQuery(query); + server.registerQuery(query, lineNumber); + } + + /** + * creates PigServer in LOCAL mode. + * http://pig.apache.org/docs/r0.12.0/perf.html#error-handling + * @param stopOnFailure equivalent of "-stop_on_failure" command line arg, setting to 'true' makes + * debugging easier + */ + public static PigServer createPigServer(boolean stopOnFailure) throws ExecException { + if(stopOnFailure) { + Properties p = new Properties(); + p.put("stop.on.failure", Boolean.TRUE.toString()); + return new PigServer(ExecType.LOCAL, p); + } + return new PigServer(ExecType.LOCAL); } } diff --git hcatalog/hcatalog-pig-adapter/pom.xml hcatalog/hcatalog-pig-adapter/pom.xml index dbfb2dc..1cd1982 100644 --- hcatalog/hcatalog-pig-adapter/pom.xml +++ hcatalog/hcatalog-pig-adapter/pom.xml @@ -70,6 +70,13 @@ pig ${pig.version} + + + joda-time + joda-time + 2.2 + @@ -91,6 +98,13 @@ ${pig.version} h2 + + + joda-time + joda-time + 2.2 + diff --git hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java index 06dbe72..ae60030 100644 --- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java +++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java @@ -20,14 +20,27 @@ package org.apache.hive.hcatalog.pig; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; @@ -52,6 +65,7 @@ import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +89,22 @@ private RecordWriter, HCatRecord> writer; protected HCatSchema computedSchema; protected static final String PIG_SCHEMA = "hcat.pig.store.schema"; + /** + * Controls what happens when incoming Pig value is out-of-range for target Hive column + */ + static final String ON_OOR_VALUE_OPT = "onOutOfRangeValue"; + /** + * prop name in Configuration/context + */ + static final String ON_OORA_VALUE_PROP = "hcat.pig.store.onoutofrangevalue"; + /** + * valid values for ON_OOR_VALUE_OPT + */ + public static enum OOR_VALUE_OPT_VALUES 
{Null, Throw} protected String sign; + //it's key that this is a per HCatStorer instance object + private final DataLossLogger dataLossLogger = new DataLossLogger(); + private final OOR_VALUE_OPT_VALUES onOutOfRange; public HCatBaseStorer(String partSpecs, String schema) throws Exception { @@ -95,12 +124,15 @@ public HCatBaseStorer(String partSpecs, String schema) throws Exception { } } - if (schema != null) { + if (schema != null && !schema.trim().isEmpty()) { pigSchema = Utils.getSchemaFromString(schema); } - + Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}); + onOutOfRange = OOR_VALUE_OPT_VALUES.valueOf(udfProps.getProperty(ON_OORA_VALUE_PROP, getDefaultValue().name())); + } + static OOR_VALUE_OPT_VALUES getDefaultValue() { + return OOR_VALUE_OPT_VALUES.Null; } - @Override public void checkSchema(ResourceSchema resourceSchema) throws IOException { @@ -123,17 +155,26 @@ public void checkSchema(ResourceSchema resourceSchema) throws IOException { * schema of the table in metastore. */ protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException { + if(LOG.isDebugEnabled()) { + LOG.debug("convertPigSchemaToHCatSchema(pigSchema,tblSchema)=(" + pigSchema + "," + tableSchema + ")"); + } List fieldSchemas = new ArrayList(pigSchema.size()); for (FieldSchema fSchema : pigSchema.getFields()) { try { HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema); - - fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema)); + //if writing to a partitioned table, then pigSchema will have more columns than tableSchema + //partition columns are not part of tableSchema... e.g. TestHCatStorer#testPartColsInData() +// HCatUtil.assertNotNull(hcatFieldSchema, "Nothing matching '" + fSchema.alias + "' found " + +// "in target table schema", LOG); + fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema, pigSchema, tableSchema)); } catch (HCatException he) { throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he); } } - return new HCatSchema(fieldSchemas); + + HCatSchema s = new HCatSchema(fieldSchemas); + LOG.debug("convertPigSchemaToHCatSchema(computed)=(" + s + ")"); + return s; } public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException { @@ -147,42 +188,60 @@ public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldS } return false; } - - - private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException { + /** + * Here we are processing HCat table schema as derived from metastore, + * thus it should have information about all fields/sub-fields, but not for partition columns + */ + private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema, + Schema pigSchema, HCatSchema tableSchema) + throws FrontendException, HCatException { + if(hcatFieldSchema == null) { + if(LOG.isDebugEnabled()) { + LOG.debug("hcatFieldSchema is null for fSchema '" + fSchema.alias + "'"); + //throw new IllegalArgumentException("hcatFiledSchema is null; fSchema=" + fSchema + " " + + // "(pigSchema, tableSchema)=(" + pigSchema + "," + tableSchema + ")"); + } + } byte type = fSchema.type; switch (type) { case DataType.CHARARRAY: case DataType.BIGCHARARRAY: - return new HCatFieldSchema(fSchema.alias, Type.STRING, null); - + if(hcatFieldSchema != null && 
hcatFieldSchema.getTypeInfo() != null) { + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + } + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, null); case DataType.INTEGER: if (hcatFieldSchema != null) { if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) { throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE); } - return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null); - } else { - return new HCatFieldSchema(fSchema.alias, Type.INT, null); + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); } - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.intTypeInfo, null); case DataType.LONG: - return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null); - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.longTypeInfo, null); case DataType.FLOAT: - return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null); - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.floatTypeInfo, null); case DataType.DOUBLE: - return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null); - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.doubleTypeInfo, null); case DataType.BYTEARRAY: - return new HCatFieldSchema(fSchema.alias, Type.BINARY, null); - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.binaryTypeInfo, null); case DataType.BOOLEAN: - return new HCatFieldSchema(fSchema.alias, Type.BOOLEAN, null); - + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.booleanTypeInfo, null); + case DataType.DATETIME: + //Pig DATETIME can map to DATE or TIMESTAMP (see HCatBaseStorer#validateSchema()) which + //is controlled by Hive target table information + if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) { + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + } + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.timestampTypeInfo, null); + case DataType.BIGDECIMAL: + if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) { + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null); + } + return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.decimalTypeInfo, null); case DataType.BAG: Schema bagSchema = fSchema.schema; List arrFields = new ArrayList(1); @@ -193,21 +252,18 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema } else { field = bagSchema.getField(0); } - arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0))); + arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema + .getArrayElementSchema().get(0), pigSchema, tableSchema)); return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), ""); - case DataType.TUPLE: - List fieldNames = new ArrayList(); List hcatFSs = new ArrayList(); HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema(); List fields = fSchema.schema.getFields(); for (int i = 0; i < fields.size(); i++) { FieldSchema fieldSchema = fields.get(i); - fieldNames.add(fieldSchema.alias); - hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i))); + hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? 
null : structSubSchema.get(i), pigSchema, tableSchema)); } return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), ""); - case DataType.MAP: { // Pig's schema contain no type information about map's keys and // values. So, if its a new column assume if its existing @@ -217,15 +273,18 @@ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema List valFSList = new ArrayList(1); if (hcatFieldSchema != null) { - return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), ""); + return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, hcatFieldSchema.getMapKeyTypeInfo(), + hcatFieldSchema.getMapValueSchema(), ""); } // Column not found in target table. Its a new column. Its schema is map - valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, ""); + valFS = new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, ""); valFSList.add(valFS); - return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), ""); + return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, + TypeInfoFactory.stringTypeInfo, new HCatSchema(valFSList), ""); } - + case DataType.BIGINTEGER: + //fall through; doesn't map to Hive/Hcat type; here for completeness default: throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE); } @@ -253,24 +312,22 @@ public void putNext(Tuple tuple) throws IOException { } } + /** + * Convert from Pig value object to Hive value object + * This method assumes that {@link #validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)} + * which checks the types in Pig schema are compatible with target Hive table, has been called. + */ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException { try { - + if(pigObj == null) return null; // The real work-horse. Spend time and energy in this method if there is // need to keep HCatStorer lean and go fast. Type type = hcatFS.getType(); switch (type) { - case BINARY: - if (pigObj == null) { - return null; - } return ((DataByteArray) pigObj).get(); case STRUCT: - if (pigObj == null) { - return null; - } HCatSchema structSubSchema = hcatFS.getStructSubSchema(); // Unwrap the tuple. List all = ((Tuple) pigObj).getAll(); @@ -281,9 +338,6 @@ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatExce return converted; case ARRAY: - if (pigObj == null) { - return null; - } // Unwrap the bag. 
DataBag pigBag = (DataBag) pigObj; HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0); @@ -298,9 +352,6 @@ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatExce } return bagContents; case MAP: - if (pigObj == null) { - return null; - } Map pigMap = (Map) pigObj; Map typeMap = new HashMap(); for (Entry entry : pigMap.entrySet()) { @@ -318,29 +369,18 @@ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatExce case DOUBLE: return pigObj; case SMALLINT: - if (pigObj == null) { - return null; - } if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) { - throw new BackendException("Value " + pigObj + " is outside the bounds of column " + - hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); + handleOutOfRangeValue(pigObj, hcatFS); + return null; } return ((Integer) pigObj).shortValue(); case TINYINT: - if (pigObj == null) { - return null; - } if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) { - throw new BackendException("Value " + pigObj + " is outside the bounds of column " + - hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); + handleOutOfRangeValue(pigObj, hcatFS); + return null; } return ((Integer) pigObj).byteValue(); case BOOLEAN: - if (pigObj == null) { - LOG.debug( "HCatBaseStorer.getJavaObj(BOOLEAN): obj null, bailing early" ); - return null; - } - if( pigObj instanceof String ) { if( ((String)pigObj).trim().compareTo("0") == 0 ) { return Boolean.FALSE; @@ -348,24 +388,86 @@ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatExce if( ((String)pigObj).trim().compareTo("1") == 0 ) { return Boolean.TRUE; } - - throw new BackendException( - "Unexpected type " + type + " for value " + pigObj - + (pigObj == null ? "" : " of class " - + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE); + throw new BackendException("Unexpected type " + type + " for value " + pigObj + + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE); } - return Boolean.parseBoolean( pigObj.toString() ); + case DECIMAL: + BigDecimal bd = (BigDecimal)pigObj; + DecimalTypeInfo dti = (DecimalTypeInfo)hcatFS.getTypeInfo(); + if(bd.precision() > dti.precision() || bd.scale() > dti.scale()) { + handleOutOfRangeValue(pigObj, hcatFS); + return null; + } + return HiveDecimal.create(bd); + case CHAR: + String charVal = (String)pigObj; + CharTypeInfo cti = (CharTypeInfo)hcatFS.getTypeInfo(); + if(charVal.length() > cti.getLength()) { + handleOutOfRangeValue(pigObj, hcatFS); + return null; + } + return new HiveChar(charVal, cti.getLength()); + case VARCHAR: + String varcharVal = (String)pigObj; + VarcharTypeInfo vti = (VarcharTypeInfo)hcatFS.getTypeInfo(); + if(varcharVal.length() > vti.getLength()) { + handleOutOfRangeValue(pigObj, hcatFS); + return null; + } + return new HiveVarchar(varcharVal, vti.getLength()); + case TIMESTAMP: + DateTime dt = (DateTime)pigObj; + return new Timestamp(dt.getMillis());//getMillis() returns UTC time regardless of TZ + case DATE: + /** + * We ignore any TZ setting on Pig value since java.sql.Date doesn't have it (in any + * meaningful way). So the assumption is that if Pig value has 0 time component (midnight) + * we assume it reasonably 'fits' into a Hive DATE. If time part is not 0, it's considered + * out of range for target type. 
+ */ + DateTime dateTime = ((DateTime)pigObj); + if(dateTime.getMillisOfDay() != 0) { + handleOutOfRangeValue(pigObj, hcatFS, "Time component must be 0 (midnight) in local timezone; Local TZ val='" + pigObj + "'"); + return null; + } + /*java.sql.Date is a poorly defined API. Some (all?) SerDes call toString() on it + [e.g. LazySimpleSerDe, uses LazyUtils.writePrimitiveUTF8()], which automatically adjusts + for local timezone. Date.valueOf() also uses local timezone (as does Date(int,int,int). + Also see PigHCatUtil#extractPigObject() for corresponding read op. This way a DATETIME from Pig, + when stored into Hive and read back comes back with the same value.*/ + return new Date(dateTime.getYear() - 1900, dateTime.getMonthOfYear() - 1, dateTime.getDayOfMonth()); default: - throw new BackendException("Unexpected type " + type + " for value " + pigObj - + (pigObj == null ? "" : " of class " - + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE); + throw new BackendException("Unexpected HCat type " + type + " for value " + pigObj + + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE); } } catch (BackendException e) { // provide the path to the field in the error message throw new BackendException( - (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(), - e.getCause() == null ? e : e.getCause()); + (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(), e); + } + } + + private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS) throws BackendException { + handleOutOfRangeValue(pigObj, hcatFS, null); + } + /** + * depending on user config, throws an exception or logs a msg if the incoming Pig value is + * out-of-range for target type. + * @param additionalMsg may be {@code null} + */ + private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS, String additionalMsg) throws BackendException { + String msg = "Pig value '" + pigObj + "' is outside the bounds of column " + hcatFS.getName() + + " with type " + (hcatFS.getTypeInfo() == null ? hcatFS.getType() : hcatFS.getTypeInfo().getTypeName()) + + (additionalMsg == null ? "" : "[" + additionalMsg + "]"); + switch (onOutOfRange) { + case Throw: + throw new BackendException(msg, PigHCatUtil.PIG_EXCEPTION_CODE); + case Null: + dataLossLogger.logDataLossMsg(hcatFS, pigObj, msg); + break; + default: + throw new BackendException("Unexpected " + ON_OOR_VALUE_OPT + " value: '" + onOutOfRange + "'"); } } @@ -387,10 +489,10 @@ protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throw // Iterate through all the elements in Pig Schema and do validations as // dictated by semantics, consult HCatSchema of table when need be. - + int columnPos = 0;//helps with debug messages for (FieldSchema pigField : pigSchema.getFields()) { HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema); - validateSchema(pigField, hcatField); + validateSchema(pigField, hcatField, pigSchema, tblSchema, columnPos++); } try { @@ -400,8 +502,14 @@ protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throw } } - - private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField) + /** + * This method encodes which Pig type can map (be stored in) to which HCat type. 
+ * @throws HCatException + * @throws FrontendException + */ + private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField, + Schema topLevelPigSchema, HCatSchema topLevelHCatSchema, + int columnPos) throws HCatException, FrontendException { validateAlias(pigField.alias); byte type = pigField.type; @@ -420,14 +528,16 @@ private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField) case DataType.BAG: HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema(); for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) { - validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema)); + validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema), + topLevelPigSchema, topLevelHCatSchema, columnPos); } break; case DataType.TUPLE: HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema(); for (FieldSchema innerField : pigField.schema.getFields()) { - validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema)); + validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema), + topLevelPigSchema, topLevelHCatSchema, columnPos); } break; @@ -435,6 +545,66 @@ private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField) throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE); } } + else if(hcatField != null) { + //there is no point trying to validate further if we have no type info about target field + switch (type) { + case DataType.BIGDECIMAL: + throwTypeMismatchException(type, Lists.newArrayList(Type.DECIMAL), hcatField, columnPos); + break; + case DataType.DATETIME: + throwTypeMismatchException(type, Lists.newArrayList(Type.TIMESTAMP, Type.DATE), hcatField, columnPos); + break; + case DataType.BYTEARRAY: + throwTypeMismatchException(type, Lists.newArrayList(Type.BINARY), hcatField, columnPos); + break; + case DataType.BIGINTEGER: + throwTypeMismatchException(type, Collections.emptyList(), hcatField, columnPos); + break; + case DataType.BOOLEAN: + throwTypeMismatchException(type, Lists.newArrayList(Type.BOOLEAN), hcatField, columnPos); + break; + case DataType.CHARARRAY: + throwTypeMismatchException(type, Lists.newArrayList(Type.STRING, Type.CHAR, Type.VARCHAR), + hcatField, columnPos); + break; + case DataType.DOUBLE: + throwTypeMismatchException(type, Lists.newArrayList(Type.DOUBLE), hcatField, columnPos); + break; + case DataType.FLOAT: + throwTypeMismatchException(type, Lists.newArrayList(Type.FLOAT), hcatField, columnPos); + break; + case DataType.INTEGER: + throwTypeMismatchException(type, Lists.newArrayList(Type.INT, Type.BIGINT, + Type.TINYINT, Type.SMALLINT), hcatField, columnPos); + break; + case DataType.LONG: + throwTypeMismatchException(type, Lists.newArrayList(Type.BIGINT), hcatField, columnPos); + break; + default: + throw new FrontendException("'" + type + + "' Pig datatype in column " + columnPos + "(0-based) is not supported by HCat", + PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + else { + if(false) { + //see HIVE-6194 + throw new FrontendException("(pigSch,hcatSchema)=(" + pigField + "," + + "" + hcatField + ") (topPig, topHcat)=(" + topLevelPigSchema + "," + + "" + topLevelHCatSchema + ")"); + } + } + } + private static void throwTypeMismatchException(byte pigDataType, + List hcatRequiredType, HCatFieldSchema hcatActualField, + int columnPos) throws FrontendException { + if(!hcatRequiredType.contains(hcatActualField.getType())) { + throw new FrontendException( 
+ "Pig '" + DataType.findTypeName(pigDataType) + "' type in column " + + columnPos + "(0-based) cannot map to HCat '" + + hcatActualField.getType() + "'type. Target filed must be of HCat type {" + + StringUtils.join(hcatRequiredType, " or ") + "}"); + } } private void validateAlias(String alias) throws FrontendException { @@ -467,4 +637,23 @@ public void cleanupOnFailure(String location, Job job) throws IOException { @Override public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException { } + + /** + * todo: when job is complete, should print the msgCount table to log + */ + private static final class DataLossLogger { + private static final Map msgCount = new HashMap(); + private static String getColumnTypeKey(HCatFieldSchema fieldSchema) { + return fieldSchema.getName() + "_" + (fieldSchema.getTypeInfo() == null ? + fieldSchema.getType() : fieldSchema.getTypeInfo()); + } + private void logDataLossMsg(HCatFieldSchema fieldSchema, Object pigOjb, String msg) { + String key = getColumnTypeKey(fieldSchema); + if(!msgCount.containsKey(key)) { + msgCount.put(key, 0); + LOG.warn(msg + " " + "Will write NULL instead. Only 1 such message per type/column is emitted."); + } + msgCount.put(key, msgCount.get(key) + 1); + } + } } diff --git hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java index 5dcce46..1ebb2a2 100644 --- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java +++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java @@ -47,6 +47,8 @@ import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.impl.util.UDFContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Pig {@link org.apache.pig.LoadFunc} to read data from HCat @@ -54,6 +56,7 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class HCatLoader extends HCatBaseLoader { + private static final Logger LOG = LoggerFactory.getLogger(HCatLoader.class); private static final String PARTITION_FILTER = "partition.filter"; // for future use @@ -171,6 +174,9 @@ public void setLocation(String location, Job job) throws IOException { } } } + if(LOG.isDebugEnabled()) { + LOG.debug("outputSchema=" + outputSchema); + } } diff --git hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java index acc62e8..df85954 100644 --- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java +++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java @@ -26,6 +26,12 @@ import java.util.Map.Entry; import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; @@ -45,6 +51,8 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; /** * HCatStorer. @@ -53,6 +61,7 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class HCatStorer extends HCatBaseStorer { + private static final Logger LOG = LoggerFactory.getLogger(HCatStorer.class); // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize final public static String INNER_SIGNATURE = "hcatstorer.inner.signature"; @@ -60,18 +69,50 @@ // A hash map which stores job credentials. The key is a signature passed by Pig, which is //unique to the store func and out file name (table, in our case). private static Map jobCredentials = new HashMap(); - - - public HCatStorer(String partSpecs, String schema) throws Exception { - super(partSpecs, schema); + private final static Options validOptions = new Options(); + static { + try { + populateValidOptions(); + } + catch(Throwable t) { + LOG.error("Failed to build option list: ", t); + throw new RuntimeException(t); + } + } + private final static CommandLineParser parser = new GnuParser(); + + /** + * @param optString may empty str (not null), in which case it's no-op + */ + public HCatStorer(String partSpecs, String pigSchema, String optString) throws Exception { + super(partSpecs, pigSchema); + String[] optsArr = optString.split(" "); + CommandLine configuredOptions; + try { + configuredOptions = parser.parse(validOptions, optsArr); + } catch (ParseException e) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp( "[-" + ON_OOR_VALUE_OPT + "]", validOptions ); + throw e; + } + Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}); + //'Throw' is the default for backwards compatibility + //downstream code expects it to be set to a valid value + udfProps.put(ON_OORA_VALUE_PROP, configuredOptions.getOptionValue(ON_OOR_VALUE_OPT, getDefaultValue().name())); + if(LOG.isDebugEnabled()) { + LOG.debug("setting " + configuredOptions.getOptionValue(ON_OOR_VALUE_OPT)); + } + isValidOOROption((String)udfProps.get(ON_OORA_VALUE_PROP)); + } + public HCatStorer(String partSpecs, String pigSchema) throws Exception { + this(partSpecs, pigSchema, ""); } - public HCatStorer(String partSpecs) throws Exception { - this(partSpecs, null); + this(partSpecs, null, ""); } public HCatStorer() throws Exception { - this(null, null); + this(null, null, ""); } @Override @@ -79,6 +120,33 @@ public OutputFormat getOutputFormat() throws IOException { return new HCatOutputFormat(); } + /** + * makes a list of all options that HCatStorer understands + */ + private static void populateValidOptions() { + validOptions.addOption(ON_OOR_VALUE_OPT, true, + "Controls how store operation handles Pig values which are out of range for the target column" + + "in Hive table. 
Default is to throw an exception."); + } + /** + * check that onOutOfRangeValue handling is configured properly + * @throws FrontendException + */ + private static void isValidOOROption(String optVal) throws FrontendException { + boolean found = false; + for(OOR_VALUE_OPT_VALUES v : OOR_VALUE_OPT_VALUES.values()) { + if(v.name().equalsIgnoreCase(optVal)) { + found = true; + break; + } + } + if(!found) { + throw new FrontendException("Unexpected value for '" + ON_OOR_VALUE_OPT + "' found: " + optVal); + } + } + /** + * @param location databaseName.tableName + */ @Override public void setStoreLocation(String location, Job job) throws IOException { HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get() diff --git hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java index ed8d7c3..c061146 100644 --- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java +++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java @@ -20,8 +20,11 @@ import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,6 +32,9 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -56,6 +62,8 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -206,7 +214,7 @@ public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOE rfSchemaList.add(rfSchema); } ResourceSchema rSchema = new ResourceSchema(); - rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0])); + rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[rfSchemaList.size()])); return rSchema; } @@ -266,7 +274,7 @@ protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOEx } else if (arrayElementFieldSchema.getType() == Type.ARRAY) { ResourceSchema s = new ResourceSchema(); List lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema)); - s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); + s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()])); bagSubFieldSchemas[0].setSchema(s); } else { ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1]; @@ -276,8 +284,7 @@ protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOEx .setSchema(null); // the element type is not a tuple - so no subschema bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas)); } - ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas); - return s; + return new ResourceSchema().setFields(bagSubFieldSchemas); } @@ -288,7 +295,7 @@ private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOEx for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) 
{ lrfs.add(getResourceSchemaFromFieldSchema(subField)); } - s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); + s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()])); return s; } @@ -300,9 +307,16 @@ private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOEx static public byte getPigType(HCatFieldSchema hfs) throws IOException { return getPigType(hfs.getType()); } - + /** + * Defines a mapping of HCatalog type to Pig type; not every mapping is exact, + * see {@link #extractPigObject(Object, org.apache.hive.hcatalog.data.schema.HCatFieldSchema)} + * See http://pig.apache.org/docs/r0.12.0/basic.html#data-types + * See {@link org.apache.hive.hcatalog.pig.HCatBaseStorer#validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)} + * for Pig->Hive type mapping. + */ static public byte getPigType(Type type) throws IOException { - if (type == Type.STRING) { + if (type == Type.STRING || type == Type.CHAR || type == Type.VARCHAR) { + //CHARARRAY is unbounded so Hive->Pig is lossless return DataType.CHARARRAY; } @@ -341,6 +355,14 @@ static public byte getPigType(Type type) throws IOException { if (type == Type.BOOLEAN && pigHasBooleanSupport) { return DataType.BOOLEAN; } + if(type == Type.DECIMAL) { + //Hive is more restrictive, so Hive->Pig works + return DataType.BIGDECIMAL; + } + if(type == Type.DATE || type == Type.TIMESTAMP) { + //Hive Date is representable as Pig DATETIME + return DataType.DATETIME; + } throw new PigException("HCatalog column type '" + type.toString() + "' is not supported in Pig as a column type", PIG_EXCEPTION_CODE); @@ -353,22 +375,54 @@ public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Except return transformToTuple(hr.getAll(), hs); } - @SuppressWarnings("unchecked") + /** + * Converts object from Hive's value system to Pig's value system + * see HCatBaseStorer#getJavaObj() for Pig->Hive conversion + * @param o object from Hive value system + * @return object in Pig value system + */ public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception { + if(o == null) { + return null; + } Object result; Type itemType = hfs.getType(); switch (itemType) { case BINARY: - result = (o == null) ? null : new DataByteArray((byte[]) o); + result = new DataByteArray((byte[]) o); break; case STRUCT: - result = transformToTuple((List) o, hfs); + result = transformToTuple((List) o, hfs); break; case ARRAY: - result = transformToBag((List) o, hfs); + result = transformToBag((List) o, hfs); break; case MAP: - result = transformToPigMap((Map) o, hfs); + result = transformToPigMap((Map) o, hfs); + break; + case DECIMAL: + result = ((HiveDecimal)o).bigDecimalValue(); + break; + case CHAR: + result = ((HiveChar)o).getValue(); + break; + case VARCHAR: + result = ((HiveVarchar)o).getValue(); + break; + case DATE: + /*java.sql.Date is weird. It automatically adjusts it's millis value to be in the local TZ + * e.g. d = new java.sql.Date(System.currentMillis()).toString() so if you do this just after + * midnight in Palo Alto, you'll get yesterday's date printed out.*/ + Date d = (Date)o; + result = new DateTime(d.getYear() + 1900, d.getMonth() + 1, d.getDate(), 0, 0);//uses local TZ + break; + case TIMESTAMP: + /*DATA TRUNCATION!!! 
+ Timestamp may have nanos; we'll strip those away and create a Joda DateTime + object in local TZ; This is arbitrary, since Hive value doesn't have any TZ notion, but + we need to set something for TZ. + Timestamp is consistently in GMT (unless you call toString() on it) so we use millis*/ + result = new DateTime(((Timestamp)o).getTime());//uses local TZ break; default: result = o; @@ -377,7 +431,7 @@ public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exce return result; } - private static Tuple transformToTuple(List objList, HCatFieldSchema hfs) throws Exception { + private static Tuple transformToTuple(List objList, HCatFieldSchema hfs) throws Exception { try { return transformToTuple(objList, hfs.getStructSubSchema()); } catch (Exception e) { @@ -389,7 +443,7 @@ private static Tuple transformToTuple(List objList, HCatFieldS } } - private static Tuple transformToTuple(List objList, HCatSchema hs) throws Exception { + private static Tuple transformToTuple(List objList, HCatSchema hs) throws Exception { if (objList == null) { return null; } @@ -401,21 +455,20 @@ private static Tuple transformToTuple(List objList, HCatSchema return t; } - private static Map transformToPigMap(Map map, HCatFieldSchema hfs) throws Exception { + private static Map transformToPigMap(Map map, HCatFieldSchema hfs) throws Exception { if (map == null) { return null; } Map result = new HashMap(); - for (Entry entry : map.entrySet()) { + for (Entry entry : map.entrySet()) { // since map key for Pig has to be Strings result.put(entry.getKey().toString(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0))); } return result; } - @SuppressWarnings("unchecked") - private static DataBag transformToBag(List list, HCatFieldSchema hfs) throws Exception { + private static DataBag transformToBag(List list, HCatFieldSchema hfs) throws Exception { if (list == null) { return null; } @@ -425,7 +478,7 @@ private static DataBag transformToBag(List list, HCatFieldSche for (Object o : list) { Tuple tuple; if (elementSubFieldSchema.getType() == Type.STRUCT) { - tuple = transformToTuple((List) o, elementSubFieldSchema); + tuple = transformToTuple((List) o, elementSubFieldSchema); } else { // bags always contain tuples tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema)); diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java index 209c6c5..6344bbf 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java @@ -18,14 +18,11 @@ */ package org.apache.hive.hcatalog.pig; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.sql.Date; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -40,12 +37,14 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapreduce.Job; import 
org.apache.hive.hcatalog.HcatTestUtils; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.Pair; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.ResourceStatistics; @@ -53,11 +52,17 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; public class TestHCatLoader { + private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoader.class); private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") + File.separator + TestHCatLoader.class.getCanonicalName() + "-" + System.currentTimeMillis()); private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; @@ -78,26 +83,46 @@ protected String storageFormat() { } private void dropTable(String tablename) throws IOException, CommandNeedRetryException { + dropTable(tablename, driver); + } + static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException { driver.run("drop table " + tablename); } private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException { + createTable(tablename, schema, partitionedBy, driver, storageFormat()); + } + static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat) + throws IOException, CommandNeedRetryException { String createTable; createTable = "create table " + tablename + "(" + schema + ") "; if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { createTable = createTable + "partitioned by (" + partitionedBy + ") "; } - createTable = createTable + "stored as " +storageFormat(); - int retCode = driver.run(createTable).getResponseCode(); - if (retCode != 0) { - throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]"); - } + createTable = createTable + "stored as " +storageFormat; + executeStatementOnDriver(createTable, driver); } private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException { createTable(tablename, schema, null); } - + /** + * Execute Hive CLI statement + * @param cmd arbitrary statement to execute + */ + static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException { + LOG.debug("Executing: " + cmd); + CommandProcessorResponse cpr = driver.run(cmd); + if(cpr.getResponseCode() != 0) { + throw new IOException("Failed to execute \"" + cmd + "\". 
Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage()); + } + } + private static void checkProjection(FieldSchema fs, String expectedName, byte expectedPigType) { + assertEquals(fs.alias, expectedName); + assertEquals("Expected " + DataType.findTypeName(expectedPigType) + "; got " + + DataType.findTypeName(fs.type), expectedPigType, fs.type); + } + @Before public void setup() throws Exception { @@ -127,6 +152,7 @@ public void setup() throws Exception { createTable(PARTITIONED_TABLE, "a int, b string", "bkt string"); createTable(SPECIFIC_SIZE_TABLE, "a int, b string"); + AllTypesTable.setupAllTypesTable(driver); int LOOP_SIZE = 3; String[] input = new String[LOOP_SIZE * LOOP_SIZE]; @@ -148,23 +174,23 @@ public void setup() throws Exception { //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}", } ); - PigServer server = new PigServer(ExecType.LOCAL); server.setBatchOn(); - server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);"); - - server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();"); - server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();"); - server.registerQuery("B = foreach A generate a,b;"); - server.registerQuery("B2 = filter B by a < 2;"); - server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');"); - - server.registerQuery("C = foreach A generate a,b;"); - server.registerQuery("C2 = filter C by a >= 2;"); - server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');"); - - server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});"); - server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();"); + int i = 0; + server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i); + + server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i); + server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i); + server.registerQuery("B = foreach A generate a,b;", ++i); + server.registerQuery("B2 = filter B by a < 2;", ++i); + server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i); + + server.registerQuery("C = foreach A generate a,b;", ++i); + server.registerQuery("C2 = filter C by a >= 2;", ++i); + server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i); + + server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});", ++i); + server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i); server.executeBatch(); } @@ -176,6 +202,7 @@ public 
void tearDown() throws Exception { dropTable(COMPLEX_TABLE); dropTable(PARTITIONED_TABLE); dropTable(SPECIFIC_SIZE_TABLE); + dropTable(AllTypesTable.ALL_PRIMITIVE_TYPES_TABLE); } finally { FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); } @@ -197,6 +224,20 @@ public void testSchemaLoadBasic() throws IOException { assertTrue(Xfields.get(1).type == DataType.CHARARRAY); } + /** + * Test that we properly translate data types in Hive/HCat table schema into Pig schema + */ + @Test + public void testSchemaLoadPrimitiveTypes() throws IOException { + AllTypesTable.testSchemaLoadPrimitiveTypes(); + } + /** + * Test that value from Hive table are read properly in Pig + */ + @Test + public void testReadDataPrimitiveTypes() throws Exception { + AllTypesTable.testReadDataPrimitiveTypes(); + } @Test public void testReadDataBasic() throws IOException { @@ -450,4 +491,114 @@ public void testConvertBooleanToInt() throws Exception { assertEquals(0, t.get(1)); assertFalse(iterator.hasNext()); } + + /** + * basic tests that cover each scalar type + * https://issues.apache.org/jira/browse/HIVE-5814 + */ + private static final class AllTypesTable { + private static final String ALL_TYPES_FILE_NAME = TEST_DATA_DIR + "/alltypes.input.data"; + private static final String ALL_PRIMITIVE_TYPES_TABLE = "junit_unparted_alltypes"; + private static final String ALL_TYPES_SCHEMA = "( c_boolean boolean, " + //0 + "c_tinyint tinyint, " + //1 + "c_smallint smallint, " + //2 + "c_int int, " + //3 + "c_bigint bigint, " + //4 + "c_float float, " + //5 + "c_double double, " + //6 + "c_decimal decimal(5,2), " +//7 + "c_string string, " + //8 + "c_char char(10), " + //9 + "c_varchar varchar(20), " + //10 + "c_binary binary, " + //11 + "c_date date, " + //12 + "c_timestamp timestamp)"; //13 + /** + * raw data for #ALL_PRIMITIVE_TYPES_TABLE + * All the values are within range of target data type (column) + */ + private static final Object[][] primitiveRows = new Object[][] { + {Boolean.TRUE,Byte.MAX_VALUE,Short.MAX_VALUE, Integer.MAX_VALUE,Long.MAX_VALUE,Float.MAX_VALUE,Double.MAX_VALUE,555.22,"Kyiv","char(10)xx","varchar(20)","blah".getBytes(),Date.valueOf("2014-01-13"),Timestamp.valueOf("2014-01-13 19:26:25.0123")}, + {Boolean.FALSE,Byte.MIN_VALUE,Short.MIN_VALUE, Integer.MIN_VALUE,Long.MIN_VALUE,Float.MIN_VALUE,Double.MIN_VALUE,-555.22,"Saint Petersburg","char(xx)00","varchar(yy)","doh".getBytes(),Date.valueOf("2014-01-14"), Timestamp.valueOf("2014-01-14 19:26:25.0123")} + }; + /** + * Test that we properly translate data types in Hive/HCat table schema into Pig schema + */ + private static void testSchemaLoadPrimitiveTypes() throws IOException { + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();"); + Schema dumpedXSchema = server.dumpSchema("X"); + List Xfields = dumpedXSchema.getFields(); + assertEquals("Expected " + HCatFieldSchema.Type.numPrimitiveTypes() + " fields, found " + + Xfields.size(), HCatFieldSchema.Type.numPrimitiveTypes(), Xfields.size()); + checkProjection(Xfields.get(0), "c_boolean", DataType.BOOLEAN); + checkProjection(Xfields.get(1), "c_tinyint", DataType.INTEGER); + checkProjection(Xfields.get(2), "c_smallint", DataType.INTEGER); + checkProjection(Xfields.get(3), "c_int", DataType.INTEGER); + checkProjection(Xfields.get(4), "c_bigint", DataType.LONG); + checkProjection(Xfields.get(5), "c_float", DataType.FLOAT); + checkProjection(Xfields.get(6), "c_double", DataType.DOUBLE); + 
checkProjection(Xfields.get(7), "c_decimal", DataType.BIGDECIMAL); + checkProjection(Xfields.get(8), "c_string", DataType.CHARARRAY); + checkProjection(Xfields.get(9), "c_char", DataType.CHARARRAY); + checkProjection(Xfields.get(10), "c_varchar", DataType.CHARARRAY); + checkProjection(Xfields.get(11), "c_binary", DataType.BYTEARRAY); + checkProjection(Xfields.get(12), "c_date", DataType.DATETIME); + checkProjection(Xfields.get(13), "c_timestamp", DataType.DATETIME); + } + /** + * Test that value from Hive table are read properly in Pig + */ + private static void testReadDataPrimitiveTypes() throws Exception { + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();"); + Iterator XIter = server.openIterator("X"); + int numTuplesRead = 0; + while (XIter.hasNext()) { + Tuple t = XIter.next(); + assertEquals(HCatFieldSchema.Type.numPrimitiveTypes(), t.size()); + int colPos = 0; + for(Object referenceData : primitiveRows[numTuplesRead]) { + if(referenceData == null) { + assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data is null; actual " + + t.get(colPos), t.get(colPos) == null); + } + else if(referenceData instanceof java.util.Date) { + assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + ((java.util.Date)referenceData).getTime() + " actual=" + + ((DateTime)t.get(colPos)).getMillis() + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")", + ((java.util.Date)referenceData).getTime()== ((DateTime)t.get(colPos)).getMillis()); + //note that here we ignore nanos part of Hive Timestamp since nanos are dropped when reading Hive from Pig by design + } + else { + assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + referenceData + " actual=" + + t.get(colPos) + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")", + referenceData.toString().equals(t.get(colPos).toString())); + //doing String comps here as value objects in Hive in Pig are different so equals() doesn't work + } + colPos++; + } + numTuplesRead++; + } + assertTrue("Expected " + primitiveRows.length + "; found " + numTuplesRead, numTuplesRead == primitiveRows.length); + } + private static void setupAllTypesTable(Driver driver) throws Exception { + String[] primitiveData = new String[primitiveRows.length]; + for(int i = 0; i < primitiveRows.length; i++) { + Object[] rowData = primitiveRows[i]; + StringBuilder row = new StringBuilder(); + for(Object cell : rowData) { + row.append(row.length() == 0 ? "" : "\t").append(cell == null ? 
null : cell); + } + primitiveData[i] = row.toString(); + } + HcatTestUtils.createTestDataFile(ALL_TYPES_FILE_NAME, primitiveData); + String cmd = "create table " + ALL_PRIMITIVE_TYPES_TABLE + ALL_TYPES_SCHEMA + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'" + + " STORED AS TEXTFILE"; + executeStatementOnDriver(cmd, driver); + cmd = "load data local inpath '" + HCatUtil.makePathASafeFileName(ALL_TYPES_FILE_NAME) + + "' into table " + ALL_PRIMITIVE_TYPES_TABLE; + executeStatementOnDriver(cmd, driver); + } + } } diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java index b9568f8..2c5a778 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java @@ -134,7 +134,7 @@ private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expec server.registerQuery("data = load '" + data + "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);"); server.registerQuery( - "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer();"); + "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer('','','-onOutOfRangeValue Throw');"); List jobs = server.executeBatch(); Assert.assertEquals(expectedStatus, jobs.get(0).getStatus()); } diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java index 2674708..8bf9565 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java @@ -18,13 +18,19 @@ */ package org.apache.hive.hcatalog.pig; +import java.io.BufferedReader; import java.io.File; +import java.io.FileReader; import java.io.IOException; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.Properties; import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hive.hcatalog.HcatTestUtils; import org.apache.hive.hcatalog.mapreduce.HCatBaseTest; import org.apache.pig.EvalFunc; @@ -35,13 +41,333 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.LogUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestHCatStorer extends HCatBaseTest { + private static final Logger LOG = LoggerFactory.getLogger(TestHCatStorer.class); private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data"; + //Start: tests that check values from Pig that are out of range for target column + @Test + public void testWriteTinyint() throws Exception { + pigValueRangeTest("junitTypeTest1", "tinyint", "int", null, Integer.toString(1), Integer.toString(1)); + pigValueRangeTestOverflow("junitTypeTest1", "tinyint", "int", null, Integer.toString(300)); + pigValueRangeTestOverflow("junitTypeTest2", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + Integer.toString(300)); + 
pigValueRangeTestOverflow("junitTypeTest3", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + Integer.toString(300)); + } + @Test + public void testWriteSmallint() throws Exception { + pigValueRangeTest("junitTypeTest1", "smallint", "int", null, Integer.toString(Short.MIN_VALUE), + Integer.toString(Short.MIN_VALUE)); + pigValueRangeTestOverflow("junitTypeTest2", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + Integer.toString(Short.MAX_VALUE + 1)); + pigValueRangeTestOverflow("junitTypeTest3", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + Integer.toString(Short.MAX_VALUE + 1)); + } + @Test + public void testWriteChar() throws Exception { + pigValueRangeTest("junitTypeTest1", "char(5)", "chararray", null, "xxx", "xxx "); + pigValueRangeTestOverflow("junitTypeTest1", "char(5)", "chararray", null, "too_long"); + pigValueRangeTestOverflow("junitTypeTest2", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + "too_long"); + pigValueRangeTestOverflow("junitTypeTest3", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + "too_long2"); + } + @Test + public void testWriteVarchar() throws Exception { + pigValueRangeTest("junitTypeTest1", "varchar(5)", "chararray", null, "xxx", "xxx"); + pigValueRangeTestOverflow("junitTypeTest1", "varchar(5)", "chararray", null, "too_long"); + pigValueRangeTestOverflow("junitTypeTest2", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + "too_long"); + pigValueRangeTestOverflow("junitTypeTest3", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + "too_long2"); + } + @Test + public void testWriteDecimalXY() throws Exception { + pigValueRangeTest("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(1.2).toString(), + BigDecimal.valueOf(1.2).toString()); + pigValueRangeTestOverflow("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(12345.12).toString()); + pigValueRangeTestOverflow("junitTypeTest2", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + BigDecimal.valueOf(500.123).toString()); + pigValueRangeTestOverflow("junitTypeTest3", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + BigDecimal.valueOf(500.123).toString()); + } + @Test + public void testWriteDecimalX() throws Exception { + //interestingly decimal(2) means decimal(2,0) + pigValueRangeTest("junitTypeTest1", "decimal(2)", "bigdecimal", null, BigDecimal.valueOf(12).toString(), + BigDecimal.valueOf(12).toString()); + pigValueRangeTestOverflow("junitTypeTest2", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + BigDecimal.valueOf(50.123).toString()); + pigValueRangeTestOverflow("junitTypeTest3", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + BigDecimal.valueOf(50.123).toString()); + } + @Test + public void testWriteDecimal() throws Exception { + //decimal means decimal(10,0) + pigValueRangeTest("junitTypeTest1", "decimal", "bigdecimal", null, BigDecimal.valueOf(1234567890).toString(), + BigDecimal.valueOf(1234567890).toString()); + pigValueRangeTestOverflow("junitTypeTest2", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + BigDecimal.valueOf(12345678900L).toString()); + pigValueRangeTestOverflow("junitTypeTest3", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + BigDecimal.valueOf(12345678900L).toString()); + } + /** + * because we want to ignore TZ which is included in toString() + * include time to make sure it's 0 + 
*/ + private static final String FORMAT_4_DATE = "yyyy-MM-dd HH:mm:ss"; + @Test + public void testWriteDate() throws Exception { + DateTime d = new DateTime(1991,10,11,0,0); + pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(), + d.toString(FORMAT_4_DATE), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + d.plusHours(2).toString(), FORMAT_4_DATE);//time != 0 + pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.plusMinutes(1).toString(), FORMAT_4_DATE);//time != 0 + d = new DateTime(1991,10,11,0,0,DateTimeZone.forOffsetHours(-11)); + pigValueRangeTest("junitTypeTest4", "date", "datetime", null, d.toString(), + d.toString(FORMAT_4_DATE), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + d.plusHours(2).toString(), FORMAT_4_DATE);//date out of range due to time != 0 + pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.plusMinutes(1).toString(), FORMAT_4_DATE);//date out of range due to time!=0 + } + @Test + public void testWriteDate3() throws Exception { + DateTime d = new DateTime(1991,10,11,23,10,DateTimeZone.forOffsetHours(-11)); + FrontendException fe = null; + //expect to fail since the time component is not 0 + pigValueRangeTestOverflow("junitTypeTest4", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.toString(), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + d.plusHours(2).toString(), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.plusMinutes(1).toString(), FORMAT_4_DATE); + } + @Test + public void testWriteDate2() throws Exception { + DateTime d = new DateTime(1991,11,12,0,0, DateTimeZone.forID("US/Eastern")); + pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(), + d.toString(FORMAT_4_DATE), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + d.plusHours(2).toString(), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null, + d.plusMillis(20).toString(), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.plusMillis(12).toString(), FORMAT_4_DATE); + pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw, + d.plusMinutes(1).toString(), FORMAT_4_DATE); + } + /** + * Note that the value that comes back from Hive will have local TZ on it. Using local is + * arbitrary but DateTime needs TZ (or will assume default) and Hive does not have TZ. + * So if you start with Pig value in TZ=x and write to Hive, when you read it back the TZ may + * be different. The millis value should match, of course. 
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWriteTimestamp() throws Exception {
+    DateTime d = new DateTime(1991,10,11,14,23,30, 10);//uses default TZ
+    pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(),
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.plusHours(2);
+    pigValueRangeTest("junitTypeTest2", "timestamp", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.toString(), d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.toDateTime(DateTimeZone.UTC);
+    pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(),
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+
+    d = new DateTime(1991,10,11,23,24,25, 26);
+    pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(),
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.toDateTime(DateTimeZone.UTC);
+    pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(),
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+  }
+  //End: tests that check values from Pig that are out of range for the target column
+
+
+  private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+    HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String format) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, format);
+  }
+  private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+    HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, null);
+  }
+  private void pigValueRangeTest(String tblName, String hiveType, String pigType,
+    HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue,
+    String expectedValue) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, expectedValue, null);
+  }
+  /**
+   * Tests how Pig values of various data types that are out of range for the target Hive
+   * column are handled. Currently the options are to raise an error or to write NULL.
+   * 1. create a data file with 1 column, 1 row
+   * 2. load it into Pig
+   * 3. use Pig to store it into a Hive table
+   * 4. read it back from the Hive table using Pig
+   * 5. check that the value read is what is expected
+   * @param tblName Hive table name to create
+   * @param hiveType datatype to use for the single column in the table
+   * @param pigType corresponding Pig type when loading the file into Pig
+   * @param goal how out-of-range values from Pig are handled by HCat, may be {@code null}
+   * @param inputValue written to the file which is read by Pig, thus must be something Pig can read
+   *                   (e.g. DateTime.toString(), rather than java.sql.Date)
+   * @param expectedValue what Pig should see when reading the Hive table
+   * @param format date format to use for comparison of values since default DateTime.toString()
+   *               includes TZ which is meaningless for the Hive DATE type
+   */
+  private void pigValueRangeTest(String tblName, String hiveType, String pigType,
+    HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String expectedValue, String format)
+    throws Exception {
+    TestHCatLoader.dropTable(tblName, driver);
+    final String field = "f1";
+    TestHCatLoader.createTable(tblName, field + " " + hiveType, null, driver, "RCFILE");
+    HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, new String[] {inputValue});
+    LOG.debug("File=" + INPUT_FILE_NAME);
+    dumpFile(INPUT_FILE_NAME);
+    PigServer server = createPigServer(true);
+    int queryNumber = 1;
+    logAndRegister(server,
+      "A = load '" + INPUT_FILE_NAME + "' as (" + field + ":" + pigType + ");", queryNumber++);
+    Iterator<Tuple> firstLoad = server.openIterator("A");
+    if(goal == null) {
+      logAndRegister(server,
+        "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+    }
+    else {
+      FrontendException fe = null;
+      try {
+        logAndRegister(server,
+          "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "('','','-" +
+            HCatStorer.ON_OOR_VALUE_OPT + " " + goal + "');",
+          queryNumber++);
+      }
+      catch(FrontendException e) {
+        fe = e;
+      }
+      switch (goal) {
+        case Null:
+          //do nothing, fall through and verify the data
+          break;
+        case Throw:
+          Assert.assertTrue("Expected a FrontendException", fe != null);
+          Assert.assertEquals("Expected a different FrontendException.", "Unable to store alias A", fe.getMessage());
+          return;//this test is done
+        default:
+          Assert.fail("Unexpected goal: " + goal);
+      }
+    }
+    logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();", queryNumber);
+    CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+    LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage() +
+      " for table " + tblName);
+    List l = new ArrayList();
+    driver.getResults(l);
+    LOG.debug("Dumping rows via SQL from " + tblName);
+    for(Object t : l) {
+      LOG.debug(t == null ? null : t.toString() + " t.class=" + t.getClass());
+    }
+    Iterator<Tuple> itr = server.openIterator("B");
+    int numRowsRead = 0;
+    while(itr.hasNext()) {
+      Tuple t = itr.next();
+      if("date".equals(hiveType)) {
+        DateTime dateTime = (DateTime)t.get(0);
+        Assert.assertTrue(format != null);
+        Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue, dateTime == null ? null : dateTime.toString(format));
+      }
+      else {
+        Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue, t.isNull(0) ? null : t.get(0).toString());
+      }
+      //see comment at "Dumping rows via SQL..." for why this doesn't work
+      //Assert.assertEquals("Comparing Pig to Hive", t.get(0), l.get(0));
+      numRowsRead++;
+    }
+    Assert.assertEquals("Expected " + 1 + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME + "; table " +
+      tblName, 1, numRowsRead);
+    /* Misc notes:
+       Unfortunately Timestamp.toString() adjusts the value for the local TZ and 't' is a String,
+       thus the timestamp in 't' doesn't match rawData */
+  }
+  /**
+   * Create a data file with datatypes added in 0.13. Read it with Pig and use
+   * Pig + HCatStorer to write to a Hive table. Then read it using Pig and Hive
+   * and make sure the results match.
+   */
+  @Test
+  public void testDateCharTypes() throws Exception {
+    final String tblName = "junit_date_char";
+    TestHCatLoader.dropTable(tblName, driver);
+    TestHCatLoader.createTable(tblName,
+      "id int, char5 char(5), varchar10 varchar(10), dec52 decimal(5,2)", null, driver, "RCFILE");
+    int NUM_ROWS = 5;
+    String[] rows = new String[NUM_ROWS];
+    for(int i = 0; i < NUM_ROWS; i++) {
+      //since the file is read by Pig, we need to make sure the values are in a format that Pig understands,
+      //otherwise it will turn the value to NULL on read
+      rows[i] = i + "\txxxxx\tyyy\t" + 5.2;
+    }
+    HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, rows);
+    LOG.debug("File=" + INPUT_FILE_NAME);
+//    dumpFile(INPUT_FILE_NAME);
+    PigServer server = createPigServer(true);
+    int queryNumber = 1;
+    logAndRegister(server,
+      "A = load '" + INPUT_FILE_NAME + "' as (id:int, char5:chararray, varchar10:chararray, dec52:bigdecimal);",
+      queryNumber++);
+    logAndRegister(server,
+      "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+    logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();",
+      queryNumber);
+    CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+    LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage());
+    List l = new ArrayList();
+    driver.getResults(l);
+    LOG.debug("Dumping rows via SQL from " + tblName);
+    /* Unfortunately Timestamp.toString() adjusts the value for the local TZ and 't' is a String,
+     * thus the timestamp in 't' doesn't match rawData */
+    for(Object t : l) {
+      LOG.debug(t == null ? null : t.toString());
+    }
+    Iterator<Tuple> itr = server.openIterator("B");
+    int numRowsRead = 0;
+    while (itr.hasNext()) {
+      Tuple t = itr.next();
+      StringBuilder rowFromPig = new StringBuilder();
+      for(int i = 0; i < t.size(); i++) {
+        rowFromPig.append(t.get(i)).append("\t");
+      }
+      rowFromPig.setLength(rowFromPig.length() - 1);
+      Assert.assertEquals("Comparing Pig to Raw data", rows[numRowsRead], rowFromPig.toString());
+      //see comment at "Dumping rows via SQL..." for why this doesn't work (for all types)
+      //Assert.assertEquals("Comparing Pig to Hive", rowFromPig.toString(), l.get(numRowsRead));
+      numRowsRead++;
+    }
+    Assert.assertEquals("Expected " + NUM_ROWS + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME, NUM_ROWS, numRowsRead);
+  }
+  private static void dumpFile(String fileName) throws Exception {
+    File file = new File(fileName);
+    BufferedReader reader = new BufferedReader(new FileReader(file));
+    String line = null;
+    LOG.debug("Dumping raw file: " + fileName);
+    while((line = reader.readLine()) != null) {
+      LOG.debug(line);
+    }
+    reader.close();
+  }
   @Test
   public void testPartColsInData() throws IOException, CommandNeedRetryException {
@@ -74,7 +400,7 @@ public void testPartColsInData() throws IOException, CommandNeedRetryException {
     }
     Assert.assertFalse(itr.hasNext());
 
-    Assert.assertEquals(11, i);
+    Assert.assertEquals(LOOP_SIZE, i);
   }
 
   @Test
@@ -597,7 +923,7 @@ public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOEx
     Assert.assertEquals(0, results.size());
     driver.run("drop table employee");
   }
-
+
   @Test
   public void testPartitionPublish()
     throws IOException, CommandNeedRetryException {
@@ -619,9 +945,9 @@ public void testPartitionPublish()
     server.registerQuery("A = load '" + INPUT_FILE_NAME
       + "' as (a:int, c:chararray);");
     server.registerQuery("B = filter A by " + FailEvalFunc.class.getName()
-      + "($0);");
+        + "($0);");
     server.registerQuery("store B into 'ptn_fail' using "
-      + HCatStorer.class.getName() + "('b=math');");
+        + HCatStorer.class.getName() + "('b=math');");
     server.executeBatch();
 
     String query = "show partitions ptn_fail";
@@ -639,7 +965,7 @@ public void testPartitionPublish()
     // Make sure the partitions directory is not in hdfs.
     Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
     Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math"))
-      .exists());
+        .exists());
   }
 
   static public class FailEvalFunc extends EvalFunc<Boolean> {
diff --git itests/hcatalog-unit/pom.xml itests/hcatalog-unit/pom.xml
index 9f6156d..9d83768 100644
--- itests/hcatalog-unit/pom.xml
+++ itests/hcatalog-unit/pom.xml
@@ -192,6 +192,14 @@
       <version>${pig.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.2</version>
+      <scope>test</scope>
+    </dependency>
@@ -332,6 +340,14 @@
       <artifactId>h2</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.2</version>
+      <scope>test</scope>
+    </dependency>
diff --git pom.xml pom.xml
index 41f5337..f3e5f57 100644
--- pom.xml
+++ pom.xml
@@ -127,7 +127,7 @@
        requires netty < 3.6.0 we force hadoops version -->
     <netty.version>3.4.0.Final</netty.version>
-    <pig.version>0.10.1</pig.version>
+    <pig.version>0.12.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
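
Note for reviewers: the out-of-range handling exercised above is driven entirely by the option string that the tests pass as HCatStorer's third constructor argument. A minimal sketch of that usage, reusing the test-scope helpers createPigServer and INPUT_FILE_NAME from this file (the exact option name comes from HCatStorer.ON_OOR_VALUE_OPT, which is defined in HCatBaseStorer and not shown in this excerpt):

    // Store a Pig datetime into a Hive column, writing NULL for values that are
    // out of range for the target type instead of failing the store.
    PigServer server = createPigServer(true);
    server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (f1:datetime);");
    server.registerQuery("store A into 'junitTypeTest2' using " + HCatStorer.class.getName()
      + "('','','-" + HCatStorer.ON_OOR_VALUE_OPT + " " + HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null + "');");
    server.executeBatch();

With OOR_VALUE_OPT_VALUES.Throw instead of Null, the store is expected to fail with "Unable to store alias A", which is what the Throw branches of pigValueRangeTest assert.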