diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index de47412..9152d94 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -62,9 +62,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.LlapRowRecordReader;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.TypeDesc;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -160,6 +162,23 @@ private void createTestTable(String tableName) throws Exception {
     stmt.close();
   }
 
+  private void createTableWithComplexTypes(String tableName) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName
+        + " (c1 array<int>, c2 map<int,string>, c3 struct<f1:int,f2:string,f3:array<int>>,"
+        + " c4 array<struct<f1:int,f2:string,f3:array<int>>>)");
+
+    // load data
+    stmt.execute("insert into " + tableName
+        + " select array(1, 2, 3)"
+        + ", map(1, 'one', 2, 'two')"
+        + ", named_struct('f1', 1, 'f2', 'two', 'f3', array(1,2,3))"
+        + ", array(named_struct('f1', 11, 'f2', 'two', 'f3', array(2,3,4)))");
+    stmt.close();
+  }
+
   @Test(timeout = 60000)
   public void testLlapInputFormatEndToEnd() throws Exception {
     createTestTable("testtab1");
@@ -212,6 +231,91 @@ public void testEscapedStrings() throws Exception {
     assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
   }
 
+  @Test(timeout = 60000)
+  public void testComplexTypes() throws Exception {
+    createTableWithComplexTypes("complex1");
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex1";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(1, rowCount);
+    Object[] rowValues = rowCollector.rows.get(0);
+
+    // assertEquals("[1, 2, 3]", rowValues[0]);
+    FieldDesc c1Desc = rowCollector.schema.getColumns().get(0);
+    List<?> c1Value = (List<?>) rowValues[0];
+    assertEquals("complex1.c1", c1Desc.getName());
+    assertEquals(TypeDesc.Type.LIST, c1Desc.getTypeDesc().getType());
+    assertEquals(TypeDesc.Type.INT, c1Desc.getTypeDesc().getListElementTypeDesc().getType());
+    assertEquals(3, c1Value.size());
+    assertEquals(Integer.valueOf(1), c1Value.get(0));
+    assertEquals(Integer.valueOf(2), c1Value.get(1));
+    assertEquals(Integer.valueOf(3), c1Value.get(2));
+
+    // assertEquals("{1=one, 2=two}", rowValues[1]);
+    FieldDesc c2Desc = rowCollector.schema.getColumns().get(1);
+    Map<?, ?> c2Value = (Map<?, ?>) rowValues[1];
+    assertEquals("complex1.c2", c2Desc.getName());
+    assertEquals(TypeDesc.Type.MAP, c2Desc.getTypeDesc().getType());
+    assertEquals(TypeDesc.Type.INT, c2Desc.getTypeDesc().getMapKeyTypeDesc().getType());
+    assertEquals(TypeDesc.Type.STRING, c2Desc.getTypeDesc().getMapValueTypeDesc().getType());
+    assertEquals(2, c2Value.size());
+    assertEquals("one", c2Value.get(Integer.valueOf(1)));
+    assertEquals("two", c2Value.get(Integer.valueOf(2)));
+
+    // assertEquals("[1, two, [1, 2, 3]]", rowValues[2]);
+    FieldDesc c3Desc = rowCollector.schema.getColumns().get(2);
+    Row c3Value = (Row) rowValues[2];
+    assertEquals("complex1.c3", c3Desc.getName());
+    assertEquals(TypeDesc.Type.STRUCT, c3Desc.getTypeDesc().getType());
+    verifyStructFieldSchema(c3Desc.getTypeDesc().getStructSchema());
+
+    assertEquals(Integer.valueOf(1), c3Value.getInt("f1"));
+    assertEquals(Integer.valueOf(1), c3Value.getInt(0));
+    assertEquals("two", c3Value.getString("f2"));
+    assertEquals("two", c3Value.getString(1));
+    List<?> f3Value = c3Value.getList("f3");
+    assertEquals(3, f3Value.size());
+    assertEquals(Integer.valueOf(1), f3Value.get(0));
+    assertEquals(Integer.valueOf(2), f3Value.get(1));
+    assertEquals(Integer.valueOf(3), f3Value.get(2));
+
+    // assertEquals("[[11, two, [2, 3, 4]]]", rowValues[3]);
+    FieldDesc c4Desc = rowCollector.schema.getColumns().get(3);
+    List<?> c4Value = (List<?>) rowValues[3];
+    assertEquals("complex1.c4", c4Desc.getName());
+    assertEquals(TypeDesc.Type.LIST, c4Desc.getTypeDesc().getType());
+    assertEquals(TypeDesc.Type.STRUCT, c4Desc.getTypeDesc().getListElementTypeDesc().getType());
+    verifyStructFieldSchema(c4Desc.getTypeDesc().getListElementTypeDesc().getStructSchema());
+
+    assertEquals(1, c4Value.size());
+    Row c4Element = ((Row) c4Value.get(0));
+    assertEquals(Integer.valueOf(11), c4Element.getInt("f1"));
+    assertEquals(Integer.valueOf(11), c4Element.getInt(0));
+    assertEquals("two", c4Element.getString("f2"));
+    assertEquals("two", c4Element.getString(1));
+    f3Value = c4Element.getList("f3");
+    assertEquals(3, f3Value.size());
+    assertEquals(Integer.valueOf(2), f3Value.get(0));
+    assertEquals(Integer.valueOf(3), f3Value.get(1));
+    assertEquals(Integer.valueOf(4), f3Value.get(2));
+  }
+
+  private void verifyStructFieldSchema(Schema structSchema) {
+    FieldDesc f1Desc = structSchema.getColumns().get(0);
+    FieldDesc f2Desc = structSchema.getColumns().get(1);
+    FieldDesc f3Desc = structSchema.getColumns().get(2);
+
+    assertEquals("f1", f1Desc.getName());
+    assertEquals(TypeDesc.Type.INT, f1Desc.getTypeDesc().getType());
+
+    assertEquals("f2", f2Desc.getName());
+    assertEquals(TypeDesc.Type.STRING, f2Desc.getTypeDesc().getType());
+
+    assertEquals("f3", f3Desc.getName());
+    assertEquals(TypeDesc.Type.LIST, f3Desc.getTypeDesc().getType());
+    assertEquals(TypeDesc.Type.INT, f3Desc.getTypeDesc().getListElementTypeDesc().getType());
+  }
+
   private interface RowProcessor {
     void process(Row row);
   }
@@ -235,6 +339,26 @@ public void process(Row row) {
     }
   }
 
+  // Save the actual values from each row as opposed to the String representation.
+  private static class RowCollector2 implements RowProcessor {
+    ArrayList<Object[]> rows = new ArrayList<Object[]>();
+    Schema schema = null;
+    int numColumns = 0;
+
+    public void process(Row row) {
+      if (schema == null) {
+        schema = row.getSchema();
+        numColumns = schema.getColumns().size();
+      }
+
+      Object[] arr = new Object[numColumns];
+      for (int idx = 0; idx < numColumns; ++idx) {
+        arr[idx] = row.getValue(idx);
+      }
+      rows.add(arr);
+    }
+  }
+
   private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
     String url = miniHS2.getJdbcURL();
     String user = System.getProperty("user.name");
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index ee92f3e..a337cd5 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -21,7 +21,11 @@
 import com.google.common.base.Preconditions;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +36,9 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.Schema;
@@ -42,6 +49,9 @@
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -111,27 +121,7 @@ public boolean next(NullWritable key, Row value) throws IOException {
       try {
         StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
         rowObj = serde.deserialize(textData);
-        List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
-        for (int idx = 0; idx < colFields.size(); ++idx) {
-          StructField field = colFields.get(idx);
-          Object colValue = rowOI.getStructFieldData(rowObj, field);
-          Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
-              "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
-
-          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
-          // char/varchar special cased here since the row record handles them using Text
-          switch (poi.getPrimitiveCategory()) {
-            case CHAR:
-              value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
-              break;
-            case VARCHAR:
-              value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
-              break;
-            default:
-              value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
-              break;
-          }
-        }
+        setRowFromStruct(value, rowObj, rowOI);
       } catch (SerDeException err) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Error deserializing row from text: " + textData);
@@ -147,6 +137,87 @@ public Schema getSchema() {
     return schema;
   }
 
+  static Object convertPrimitive(Object val, PrimitiveObjectInspector poi) {
+    switch (poi.getPrimitiveCategory()) {
+      // Save char/varchar as string
+      case CHAR:
+        return ((HiveChar) poi.getPrimitiveJavaObject(val)).getPaddedValue();
+      case VARCHAR:
+        return ((HiveVarchar) poi.getPrimitiveJavaObject(val)).toString();
+      case DECIMAL:
+        return ((HiveDecimal) poi.getPrimitiveJavaObject(val)).bigDecimalValue();
+      default:
+        return poi.getPrimitiveJavaObject(val);
+    }
+  }
+
+  static Object convertValue(Object val, ObjectInspector oi, TypeDesc typeDesc) {
+    if (val == null) {
+      return null;
+    }
+
+    Object convertedVal = null;
+    ObjectInspector.Category oiCategory = oi.getCategory();
+    switch (oiCategory) {
+      case PRIMITIVE:
+        convertedVal = convertPrimitive(val, (PrimitiveObjectInspector) oi);
+        break;
+      case LIST:
+        ListObjectInspector loi = (ListObjectInspector) oi;
+        int listSize = loi.getListLength(val);
+        List<Object> convertedList = new ArrayList<Object>(listSize);
+        ObjectInspector listElementOI = loi.getListElementObjectInspector();
+        for (int idx = 0; idx < listSize; ++idx) {
+          convertedList.add(
+              convertValue(loi.getListElement(val, idx),
+                  listElementOI,
+                  typeDesc.getListElementTypeDesc()));
+        }
+        convertedVal = convertedList;
+        break;
+      case MAP:
+        MapObjectInspector moi = (MapObjectInspector) oi;
+        int mapSize = moi.getMapSize(val);
+        Map<Object, Object> convertedMap = new LinkedHashMap<Object, Object>(mapSize);
+        ObjectInspector mapKeyOI = moi.getMapKeyObjectInspector();
+        ObjectInspector mapValOI = moi.getMapValueObjectInspector();
+        Map<?, ?> mapCol = moi.getMap(val);
+        for (Object mapKey : mapCol.keySet()) {
+          Object convertedMapKey = convertValue(mapKey, mapKeyOI, typeDesc.getMapKeyTypeDesc());
+          Object convertedMapVal = convertValue(mapCol.get(mapKey), mapValOI, typeDesc.getMapValueTypeDesc());
+          convertedMap.put(convertedMapKey, convertedMapVal);
+        }
+        convertedVal = convertedMap;
+        break;
+      case STRUCT:
+        StructObjectInspector soi = (StructObjectInspector) oi;
+        Schema structSchema = typeDesc.getStructSchema();
+        Row convertedRow = new Row(structSchema);
+        setRowFromStruct(convertedRow, val, soi);
+        convertedVal = convertedRow;
+        break;
+      default:
+        throw new IllegalArgumentException("Cannot convert type " + oiCategory);
+    }
+
+    return convertedVal;
+  }
+
+  static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
+    Schema structSchema = row.getSchema();
+    // Add struct field data to the Row
+    List<FieldDesc> fieldDescs = structSchema.getColumns();
+    for (int idx = 0; idx < fieldDescs.size(); ++idx) {
+      FieldDesc fieldDesc = fieldDescs.get(idx);
+      StructField structField = soi.getStructFieldRef(fieldDesc.getName());
+      Object convertedStructValue = convertValue(
+          soi.getStructFieldData(structVal, structField),
+          structField.getFieldObjectInspector(),
+          fieldDesc.getTypeDesc());
+      row.setValue(idx, convertedStructValue);
+    }
+  }
+
   protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
     Properties props = new Properties();
     StringBuffer columnsBuffer = new StringBuffer();
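Note (illustrative, not part of the patch): with setRowFromStruct/convertValue above, LlapRowRecordReader now hands back plain Java objects instead of Writables; primitives arrive as Integer/Long/String/BigDecimal and so on, array columns as java.util.List, map columns as java.util.Map, and struct columns as a nested Row. A minimal caller-side sketch that walks such values using only APIs visible in this patch (the class and method names here are made up for the example):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.llap.FieldDesc;
    import org.apache.hadoop.hive.llap.Row;

    public class RowValuePrinter {
      // Recursively renders a converted column value: nested Rows, Lists and Maps
      // are walked, anything else is a primitive wrapper and is printed directly.
      public static String render(Object value) {
        if (value == null) {
          return "null";
        }
        if (value instanceof Row) {
          Row struct = (Row) value;
          List<FieldDesc> fields = struct.getSchema().getColumns();
          StringBuilder sb = new StringBuilder("{");
          for (int i = 0; i < fields.size(); ++i) {
            if (i > 0) {
              sb.append(", ");
            }
            sb.append(fields.get(i).getName()).append("=").append(render(struct.getValue(i)));
          }
          return sb.append("}").toString();
        }
        if (value instanceof List) {
          StringBuilder sb = new StringBuilder("[");
          List<?> list = (List<?>) value;
          for (int i = 0; i < list.size(); ++i) {
            if (i > 0) {
              sb.append(", ");
            }
            sb.append(render(list.get(i)));
          }
          return sb.append("]").toString();
        }
        if (value instanceof Map) {
          StringBuilder sb = new StringBuilder("{");
          boolean first = true;
          for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
            if (!first) {
              sb.append(", ");
            }
            first = false;
            sb.append(render(entry.getKey())).append("=").append(render(entry.getValue()));
          }
          return sb.append("}").toString();
        }
        return value.toString();
      }
    }
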
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
index a84fadc..c98fa12 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
@@ -17,150 +17,182 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-
 public class Row {
   private final Schema schema;
-  private final Writable[] colValues;
-  private final boolean[] nullIndicators;
+  private final Object[] colValues;
   private Map<String, Integer> nameToIndexMapping;
 
   public Row(Schema schema) {
     this.schema = schema;
-    this.colValues = new Writable[schema.getColumns().size()];
-    this.nullIndicators = new boolean[schema.getColumns().size()];
+    this.colValues = new Object[schema.getColumns().size()];
     this.nameToIndexMapping = new HashMap<String, Integer>(schema.getColumns().size());
 
     List<FieldDesc> colDescs = schema.getColumns();
     for (int idx = 0; idx < colDescs.size(); ++idx) {
       FieldDesc colDesc = colDescs.get(idx);
       nameToIndexMapping.put(colDesc.getName(), idx);
-      colValues[idx] = createWritableForType(colDesc.getTypeDesc());
     }
   }
 
-  public Writable getValue(int colIndex) {
-    if (nullIndicators[colIndex]) {
-      return null;
-    }
+  public Object getValue(int colIndex) {
     return colValues[colIndex];
   }
 
-  public Writable getValue(String colName) {
+  public Object getValue(String colName) {
     Integer idx = nameToIndexMapping.get(colName);
     Preconditions.checkArgument(idx != null);
     return getValue(idx);
   }
 
+  public Boolean getBoolean(int idx) {
+    return (Boolean) getValue(idx);
+  }
+
+  public Boolean getBoolean(String colName) {
+    return (Boolean) getValue(colName);
+  }
+
+  public Byte getByte(int idx) {
+    return (Byte) getValue(idx);
+  }
+
+  public Byte getByte(String colName) {
+    return (Byte) getValue(colName);
+  }
+
+  public Short getShort(int idx) {
+    return (Short) getValue(idx);
+  }
+
+  public Short getShort(String colName) {
+    return (Short) getValue(colName);
+  }
+
+  public Integer getInt(int idx) {
+    return (Integer) getValue(idx);
+  }
+
+  public Integer getInt(String colName) {
+    return (Integer) getValue(colName);
+  }
+
+  public Long getLong(int idx) {
+    return (Long) getValue(idx);
+  }
+
+  public Long getLong(String colName) {
+    return (Long) getValue(colName);
+  }
+
+  public Float getFloat(int idx) {
+    return (Float) getValue(idx);
+  }
+
+  public Float getFloat(String colName) {
+    return (Float) getValue(colName);
+  }
+
+  public Double getDouble(int idx) {
+    return (Double) getValue(idx);
+  }
+
+  public Double getDouble(String colName) {
+    return (Double) getValue(colName);
+  }
+
+  public String getString(int idx) {
+    return (String) getValue(idx);
+  }
+
+  public String getString(String colName) {
+    return (String) getValue(colName);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getValue(idx);
+  }
+
+  public Date getDate(String colName) {
+    return (Date) getValue(colName);
+  }
+
+  public Timestamp getTimestamp(int idx) {
+    return (Timestamp) getValue(idx);
+  }
+
+  public Timestamp getTimestamp(String colName) {
+    return (Timestamp) getValue(colName);
+  }
+
+  public byte[] getBytes(int idx) {
+    return (byte[]) getValue(idx);
+  }
+
+  public byte[] getBytes(String colName) {
+    return (byte[]) getValue(colName);
+  }
+
+  public BigDecimal getDecimal(int idx) {
+    return (BigDecimal) getValue(idx);
+  }
+
+  public BigDecimal getDecimal(String colName) {
+    return (BigDecimal) getValue(colName);
+  }
+
+  public List<?> getList(int idx) {
+    return (List<?>) getValue(idx);
+  }
+
+  public List<?> getList(String colName) {
+    return (List<?>) getValue(colName);
+  }
+
+  public Map<?, ?> getMap(int idx) {
+    return (Map<?, ?>) getValue(idx);
+  }
+
+  public Map<?, ?> getMap(String colName) {
+    return (Map<?, ?>) getValue(colName);
+  }
+
+  public Row getStruct(int idx) {
+    return (Row) getValue(idx);
+  }
+
+  public Row getStruct(String colName) {
+    return (Row) getValue(colName);
+  }
+
   public Schema getSchema() {
     return schema;
   }
 
-  void setValue(int colIdx, Writable value) {
-    Preconditions.checkArgument(colIdx <= schema.getColumns().size());
-
-    if (value == null) {
-      nullIndicators[colIdx] = true;
-    } else {
-      nullIndicators[colIdx] = false;
-      FieldDesc colDesc = schema.getColumns().get(colIdx);
-      switch (colDesc.getTypeDesc().getType()) {
-        case BOOLEAN:
-          ((BooleanWritable) colValues[colIdx]).set(((BooleanWritable) value).get());
-          break;
-        case TINYINT:
-          ((ByteWritable) colValues[colIdx]).set(((ByteWritable) value).get());
-          break;
-        case SMALLINT:
-          ((ShortWritable) colValues[colIdx]).set(((ShortWritable) value).get());
-          break;
-        case INT:
-          ((IntWritable) colValues[colIdx]).set(((IntWritable) value).get());
-          break;
-        case BIGINT:
-          ((LongWritable) colValues[colIdx]).set(((LongWritable) value).get());
-          break;
-        case FLOAT:
-          ((FloatWritable) colValues[colIdx]).set(((FloatWritable) value).get());
-          break;
-        case DOUBLE:
-          ((DoubleWritable) colValues[colIdx]).set(((DoubleWritable) value).get());
-          break;
-        case STRING:
-          // Just handle char/varchar as Text
-        case CHAR:
-        case VARCHAR:
-          ((Text) colValues[colIdx]).set((Text) value);
-          break;
-        case DATE:
-          ((DateWritable) colValues[colIdx]).set((DateWritable) value);
-          break;
-        case TIMESTAMP:
-          ((TimestampWritable) colValues[colIdx]).set((TimestampWritable) value);
-          break;
-        case BINARY:
-          ((BytesWritable) colValues[colIdx]).set(((BytesWritable) value));
-          break;
-        case DECIMAL:
-          ((HiveDecimalWritable) colValues[colIdx]).set((HiveDecimalWritable) value);
-          break;
-      }
-    }
+  void setValue(int colIdx, Object obj) {
+    colValues[colIdx] = obj;
   }
 
-  private Writable createWritableForType(TypeDesc typeDesc) {
-    switch (typeDesc.getType()) {
-      case BOOLEAN:
-        return new BooleanWritable();
-      case TINYINT:
-        return new ByteWritable();
-      case SMALLINT:
-        return new ShortWritable();
-      case INT:
-        return new IntWritable();
-      case BIGINT:
-        return new LongWritable();
-      case FLOAT:
-        return new FloatWritable();
-      case DOUBLE:
-        return new DoubleWritable();
-      case STRING:
-        // Just handle char/varchar as Text
-      case CHAR:
-      case VARCHAR:
-        return new Text();
-      case DATE:
-        return new DateWritable();
-      case TIMESTAMP:
-        return new TimestampWritable();
-      case BINARY:
-        return new BytesWritable();
-      case DECIMAL:
-        return new HiveDecimalWritable();
-      default:
-        throw new RuntimeException("Cannot create writable for " + typeDesc.getType());
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    for (int idx = 0; idx < schema.getColumns().size(); ++idx) {
+      if (idx > 0) {
+        sb.append(", ");
+      }
+      Object val = getValue(idx);
+      sb.append(val == null ? "null" : val.toString());
     }
+    sb.append("]");
+    return sb.toString();
   }
 }
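For reference, a small usage sketch of the new Object-based accessors on Row, shaped after the complex1 table used in the test above (the column and field names come from that test; the helper class itself is not part of the patch):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.llap.Row;

    public class ComplexRowUsage {
      static void inspect(Row row) {
        // c1 array<int>: lists are returned as java.util.List of Java objects
        List<?> c1 = row.getList("c1");
        Integer firstElement = (Integer) c1.get(0);

        // c2 map<int,string>
        Map<?, ?> c2 = row.getMap("c2");
        String one = (String) c2.get(Integer.valueOf(1));

        // c3 struct<f1:int,f2:string,f3:array<int>>: structs are returned as nested
        // Rows whose fields can be read by name or by position
        Row c3 = row.getStruct("c3");
        Integer f1 = c3.getInt("f1");
        String f2 = c3.getString(1);
        List<?> f3 = c3.getList("f3");

        System.out.println(firstElement + " " + one + " " + f1 + " " + f2 + " " + f3);
      }
    }
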
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
index dda5928..82ac6e7 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
@@ -21,6 +21,18 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.Writable;
 
 public class TypeDesc implements Writable {
@@ -39,12 +51,20 @@
     TIMESTAMP,
     BINARY,
     DECIMAL,
+    LIST,
+    MAP,
+    STRUCT
   }
 
   private TypeDesc.Type type;
   private int precision;
   private int scale;
+  private TypeDesc listElementTypeDesc;
+  private TypeDesc mapKeyTypeDesc;
+  private TypeDesc mapValueTypeDesc;
+  private Schema structSchema;
+
   // For types with no type qualifiers
   public TypeDesc(TypeDesc.Type type) {
     this(type, 0, 0);
@@ -62,6 +82,25 @@
   public TypeDesc(TypeDesc.Type type, int precision) {
     this(type, precision, 0);
   }
 
+  // For list types
+  public TypeDesc(TypeDesc listElementTypeDesc) {
+    this(TypeDesc.Type.LIST);
+    this.listElementTypeDesc = listElementTypeDesc;
+  }
+
+  // For map types
+  public TypeDesc(TypeDesc mapKeyTypeDesc, TypeDesc mapValueTypeDesc) {
+    this(TypeDesc.Type.MAP);
+    this.mapKeyTypeDesc = mapKeyTypeDesc;
+    this.mapValueTypeDesc = mapValueTypeDesc;
+  }
+
+  // For struct types
+  public TypeDesc(Schema structSchema) {
+    this(TypeDesc.Type.STRUCT);
+    this.structSchema = structSchema;
+  }
+
   // Should be used for serialization only
   public TypeDesc() {
     this(TypeDesc.Type.INT, 0, 0);
@@ -79,6 +118,31 @@
   public int getScale() {
     return scale;
   }
 
+  public TypeDesc getListElementTypeDesc() {
+    if (getType() != Type.LIST) {
+      throw new IllegalStateException(getType() + " is not a list type");
+    }
+    return listElementTypeDesc;
+  }
+
+  public TypeDesc getMapKeyTypeDesc() {
+    if (getType() != Type.MAP) {
+      throw new IllegalStateException(getType() + " is not a map type");
+    }
+    return mapKeyTypeDesc;
+  }
+
+  public TypeDesc getMapValueTypeDesc() {
+    if (getType() != Type.MAP) {
+      throw new IllegalStateException(getType() + " is not a map type");
+    }
+    return mapValueTypeDesc;
+  }
+
+  public Schema getStructSchema() {
+    return structSchema;
+  }
+
   @Override
   public String toString() {
     switch (type) {
@@ -87,6 +151,25 @@ public String toString() {
       case CHAR:
       case VARCHAR:
         return type.name().toLowerCase() + "(" + precision + ")";
+      case LIST:
+        return "array" + "<" + listElementTypeDesc.toString() + ">";
+      case MAP:
+        return "map" + "<" + mapKeyTypeDesc.toString() + "," + mapValueTypeDesc.toString() + ">";
+      case STRUCT:
+        StringBuilder sb = new StringBuilder();
+        sb.append("struct" + "<");
+        boolean firstColumn = true;
+        for (FieldDesc structField : structSchema.getColumns()) {
+          if (!firstColumn) {
+            sb.append(",");
+          }
+          firstColumn = false;
+          sb.append(structField.getName());
+          sb.append(":");
+          sb.append(structField.getTypeDesc());
+        }
+        sb.append(">");
+        return sb.toString();
       default:
         return type.name().toLowerCase();
     }
@@ -95,14 +178,147 @@ public String toString() {
 
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeUTF(type.name());
-    out.writeInt(precision);
-    out.writeInt(scale);
+
+    switch (type) {
+      case LIST:
+        listElementTypeDesc.write(out);
+        break;
+      case MAP:
+        mapKeyTypeDesc.write(out);
+        mapValueTypeDesc.write(out);
+        break;
+      case STRUCT:
+        structSchema.write(out);
+        break;
+      default:
+        // Primitive type
+        out.writeInt(precision);
+        out.writeInt(scale);
+        break;
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    precision = 0;
+    scale = 0;
+
     type = TypeDesc.Type.valueOf(in.readUTF());
-    precision = in.readInt();
-    scale = in.readInt();
+    switch (type) {
+      case LIST:
+        listElementTypeDesc = new TypeDesc();
+        listElementTypeDesc.readFields(in);
+        break;
+      case MAP:
+        mapKeyTypeDesc = new TypeDesc();
+        mapKeyTypeDesc.readFields(in);
+        mapValueTypeDesc = new TypeDesc();
+        mapValueTypeDesc.readFields(in);
+        break;
+      case STRUCT:
+        structSchema = new Schema();
+        structSchema.readFields(in);
+        break;
+      default:
+        // Primitive type
+        precision = in.readInt();
+        scale = in.readInt();
+        break;
+    }
+  }
+
+  private static TypeDesc convertPrimitiveType(PrimitiveTypeInfo primitiveTypeInfo) {
+    TypeDesc typeDesc;
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case BOOLEAN:
+        typeDesc = new TypeDesc(Type.BOOLEAN);
+        break;
+      case BYTE:
+        typeDesc = new TypeDesc(Type.TINYINT);
+        break;
+      case SHORT:
+        typeDesc = new TypeDesc(Type.SMALLINT);
+        break;
+      case INT:
+        typeDesc = new TypeDesc(Type.INT);
+        break;
+      case LONG:
+        typeDesc = new TypeDesc(Type.BIGINT);
+        break;
+      case FLOAT:
+        typeDesc = new TypeDesc(Type.FLOAT);
+        break;
+      case DOUBLE:
+        typeDesc = new TypeDesc(Type.DOUBLE);
+        break;
+      case STRING:
+        typeDesc = new TypeDesc(Type.STRING);
+        break;
+      case CHAR:
+        CharTypeInfo charTypeInfo = (CharTypeInfo) primitiveTypeInfo;
+        typeDesc = new TypeDesc(Type.CHAR, charTypeInfo.getLength());
+        break;
+      case VARCHAR:
+        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) primitiveTypeInfo;
+        typeDesc = new TypeDesc(Type.VARCHAR,
+            varcharTypeInfo.getLength());
+        break;
+      case DATE:
+        typeDesc = new TypeDesc(Type.DATE);
+        break;
+      case TIMESTAMP:
+        typeDesc = new TypeDesc(Type.TIMESTAMP);
+        break;
+      case BINARY:
+        typeDesc = new TypeDesc(Type.BINARY);
+        break;
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
+        typeDesc = new TypeDesc(Type.DECIMAL,
+            decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported type " + primitiveTypeInfo.getTypeName());
+    }
+
+    return typeDesc;
+  }
+
+  private static TypeDesc convertType(TypeInfo typeInfo) {
+    TypeDesc typeDesc;
+
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        typeDesc = convertPrimitiveType((PrimitiveTypeInfo) typeInfo);
+        break;
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(convertType(listTypeInfo.getListElementTypeInfo()));
+        break;
+      case MAP:
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(
+            convertType(mapTypeInfo.getMapKeyTypeInfo()),
+            convertType(mapTypeInfo.getMapValueTypeInfo()));
+        break;
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        ArrayList<FieldDesc> fieldDescs = new ArrayList<FieldDesc>();
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        for (int idx = 0; idx < fieldNames.size(); ++idx) {
+          fieldDescs.add(new FieldDesc(fieldNames.get(idx), convertType(fieldTypeInfos.get(idx))));
+        }
+        typeDesc = new TypeDesc(new Schema(fieldDescs));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported type " + typeInfo.getTypeName());
+    }
+
+    return typeDesc;
+  }
+
+  public static TypeDesc fromTypeString(String typeString) {
+    return convertType(TypeInfoUtils.getTypeInfoFromTypeString(typeString));
   }
 }
\ No newline at end of file
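A short sketch of what the new TypeDesc.fromTypeString() yields for a nested Hive type string (the type string and class below are only an example, not part of the patch):

    import org.apache.hadoop.hive.llap.FieldDesc;
    import org.apache.hadoop.hive.llap.Schema;
    import org.apache.hadoop.hive.llap.TypeDesc;

    public class TypeDescUsage {
      public static void main(String[] args) {
        TypeDesc typeDesc = TypeDesc.fromTypeString("array<struct<f1:int,f2:string>>");

        // typeDesc.getType()                          -> TypeDesc.Type.LIST
        // typeDesc.getListElementTypeDesc().getType() -> TypeDesc.Type.STRUCT
        Schema structSchema = typeDesc.getListElementTypeDesc().getStructSchema();
        for (FieldDesc field : structSchema.getColumns()) {
          // prints "f1: int" and "f2: string", going by TypeDesc.toString() above
          System.out.println(field.getName() + ": " + field.getTypeDesc());
        }
      }
    }
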
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 9ddbd7e..b6209a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -82,12 +82,6 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -526,75 +520,13 @@ private String getSha(Path localFile, Configuration conf) throws IOException,
     }
   }
 
-  private TypeDesc convertTypeString(String typeString) throws HiveException {
-    TypeDesc typeDesc;
-    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
-    Preconditions.checkState(
-        typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
-        "Unsupported non-primitive type " + typeString);
-
-    switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
-      case BOOLEAN:
-        typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
-        break;
-      case BYTE:
-        typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
-        break;
-      case SHORT:
-        typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
-        break;
-      case INT:
-        typeDesc = new TypeDesc(TypeDesc.Type.INT);
-        break;
-      case LONG:
-        typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
-        break;
-      case FLOAT:
-        typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
-        break;
-      case DOUBLE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
-        break;
-      case STRING:
-        typeDesc = new TypeDesc(TypeDesc.Type.STRING);
-        break;
-      case CHAR:
-        CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
-        break;
-      case VARCHAR:
-        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR,
-            varcharTypeInfo.getLength());
-        break;
-      case DATE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DATE);
-        break;
-      case TIMESTAMP:
-        typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
-        break;
-      case BINARY:
-        typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
-        break;
-      case DECIMAL:
-        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL,
-            decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
-        break;
-      default:
-        throw new HiveException("Unsupported type " + typeString);
-    }
-
-    return typeDesc;
-  }
-
   private Schema convertSchema(Object obj) throws HiveException {
     org.apache.hadoop.hive.metastore.api.Schema schema = (org.apache.hadoop.hive.metastore.api.Schema) obj;
     List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
     for (FieldSchema fs : schema.getFieldSchemas()) {
       String colName = fs.getName();
       String typeString = fs.getType();
-      TypeDesc typeDesc = convertTypeString(typeString);
+      TypeDesc typeDesc = TypeDesc.fromTypeString(typeString);
       colDescs.add(new FieldDesc(colName, typeDesc));
     }
     Schema Schema = new Schema(colDescs);