Index: serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableStringAscSerDe.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableStringAscSerDe.java	(revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableStringAscSerDe.java	(revision 0)
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.binarysortable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BinarySortableStringAscSerDe writes data so that the serialized bytes can be
+ * compared byte by byte in the same order as the original values. It requires
+ * the row to consist of a single string column sorted in ascending order.
+ *
+ * The data format: NULL is a byte array of length 0; the empty string is a byte
+ * array containing a single 0 byte; a Text whose first byte is 0 is written as
+ * a 0 byte followed by the bytes of the Text; any other Text is written as its
+ * bytes unchanged. This preserves the sort order, avoids an extra memory copy
+ * unless the first byte is 0, and keeps the output relatively short.
+ *
+ */
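For reference (not part of the patch): a standalone sketch of the encoding described in the javadoc above, using plain JDK types in place of Hadoop's Text and BytesWritable. The class and method names below are illustrative only; compareBytes stands in for the raw unsigned byte comparison that the shuffle performs on keys.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringAscEncodingSketch {

  /** NULL -> empty array; "" -> {0}; text starting with 0 -> 0 + bytes; else raw bytes. */
  static byte[] encode(String s) {
    if (s == null) {
      return new byte[0];
    }
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    if (utf8.length == 0 || utf8[0] == 0) {
      byte[] out = new byte[utf8.length + 1];
      out[0] = 0;
      System.arraycopy(utf8, 0, out, 1, utf8.length);
      return out;
    }
    return utf8;
  }

  /** Unsigned, byte-by-byte comparison of the serialized keys. */
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    // NULL sorts first, then the empty string, then normal strings.
    String[] sorted = {null, "", "abc", "abd", "b"};
    for (int i = 0; i + 1 < sorted.length; i++) {
      byte[] lo = encode(sorted[i]);
      byte[] hi = encode(sorted[i + 1]);
      System.out.println(Arrays.toString(lo) + " < " + Arrays.toString(hi)
          + " : " + (compareBytes(lo, hi) < 0));
    }
  }
}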
+public class BinarySortableStringAscSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory.getLog(BinarySortableStringAscSerDe.class
+      .getName());
+
+  List<String> columnNames;
+  List<TypeInfo> columnTypes;
+
+  TypeInfo rowTypeInfo;
+  StructObjectInspector rowObjectInspector;
+
+  boolean[] columnSortOrderIsDesc;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+
+    byte[] nullBuffer = new byte[0];
+    nullBytesWritable.set(nullBuffer, 0, 0);
+    outBuffer[0] = (byte) 0;
+
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    columnNames = new ArrayList<String>(1);
+    columnNames.add(columnNameProperty);
+
+    columnTypes = TypeInfoUtils
+        .getTypeInfosFromTypeString(columnTypeProperty);
+
+    // This SerDe only supports a single string column.
+    assert (columnNames.size() == 1);
+    assert (columnNames.size() == columnTypes.size());
+
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector = (StructObjectInspector) TypeInfoUtils
+        .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+    row = new ArrayList<Object>(1);
+    row.add(null);
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowObjectInspector;
+  }
+
+  ArrayList<Object> row;
+  Text reuseText = new Text();
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    BytesWritable data = (BytesWritable) blob;
+
+    byte[] bytes = data.getBytes();
+    int length = data.getLength();
+
+    // Looks like MapReduce framework added a 0xFF to the end.
+
+    if (length == 0) {
+      row.set(0, null);
+    } else {
+      if (bytes[0] == 0) {
+        // Skip the leading 0 byte added for empty strings and strings starting with 0.
+        reuseText.set(bytes, 1, length - 1);
+      } else {
+        reuseText.set(bytes, 0, length);
+      }
+      row.set(0, reuseText);
+    }
+    return row;
+  }
+
+  byte[] outBuffer = new byte[128];
+  BytesWritable serializeBytesWritable = new BytesWritable();
+  BytesWritable nullBytesWritable = new BytesWritable();
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+    assert (fields.size() == 1);
+
+    Object data = soi.getStructFieldData(obj, fields.get(0));
+
+    if (data == null) {
+      return nullBytesWritable;
+    } else {
+      ObjectInspector oi = fields.get(0).getFieldObjectInspector();
+      assert (oi instanceof StringObjectInspector);
+      StringObjectInspector toi = (StringObjectInspector) oi;
+      Text key = toi.getPrimitiveWritableObject(data);
+
+      int keyLength = key.getLength();
+      byte[] keyBytes = key.getBytes();
+      if (keyLength == 0 || keyBytes[0] == (byte) 0) {
+        // Empty strings and strings starting with 0 get a leading 0 byte,
+        // which requires a copy into the local buffer.
+        if (outBuffer.length < keyLength + 1) {
+          int newBufferSize = outBuffer.length * 2;
+          while (newBufferSize < keyLength + 1) {
+            newBufferSize *= 2;
+          }
+          outBuffer = new byte[newBufferSize];
+        }
+        outBuffer[0] = 0;
+        System.arraycopy(keyBytes, 0, outBuffer, 1, keyLength);
+        serializeBytesWritable.set(outBuffer, 0, keyLength + 1);
+      } else {
+        serializeBytesWritable.set(keyBytes, 0, keyLength);
+      }
+    }
+
+    return serializeBytesWritable;
+  }
+}
Index: serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableLongAscSerDe.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableLongAscSerDe.java	(revision 0)
+++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableLongAscSerDe.java	(revision 0)
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.binarysortable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BinarySortableLongAscSerDe writes data so that the serialized bytes can be
+ * compared byte by byte in the same order as the original values. It requires
+ * the row to consist of a single long (bigint) column sorted in ascending order.
+ *
+ * The data format: NULL is a byte array of length 0; any other value is the
+ * 8 bytes of the long in big-endian order with the sign bit flipped, so that
+ * unsigned byte comparison matches signed numeric order.
+ *
+ */
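For reference (not part of the patch): a standalone sketch of the order-preserving long encoding described above and its inverse, using plain JDK types. The class and method names are illustrative only; the decode loop mirrors the deserialize logic in the implementation below.

import java.util.Arrays;

public class LongAscEncodingSketch {

  /** Serialize: 8 big-endian bytes, with the sign bit of the top byte flipped. */
  static byte[] encode(long v) {
    byte[] out = new byte[8];
    out[0] = (byte) ((v >> 56) ^ 0x80);
    for (int i = 1; i < 8; i++) {
      out[i] = (byte) (v >> (8 * (7 - i)));
    }
    return out;
  }

  /** Deserialize: flip the sign bit back and reassemble the remaining bytes. */
  static long decode(byte[] b) {
    long v = b[0] ^ 0x80;
    for (int i = 1; i < 8; i++) {
      v = (v << 8) + (b[i] & 0xff);
    }
    return v;
  }

  /** Unsigned, byte-by-byte comparison of two 8-byte keys. */
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < 8; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    long[] sorted = {Long.MIN_VALUE, -8000000000L, -1L, 0L, 42L, 7000000000L, Long.MAX_VALUE};
    for (int i = 0; i + 1 < sorted.length; i++) {
      byte[] lo = encode(sorted[i]);
      byte[] hi = encode(sorted[i + 1]);
      System.out.println(sorted[i] + " -> " + Arrays.toString(lo)
          + ", round-trips: " + (decode(lo) == sorted[i])
          + ", byte order < next: " + (compareUnsigned(lo, hi) < 0));
    }
  }
}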
+public class BinarySortableLongAscSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory.getLog(BinarySortableLongAscSerDe.class
+      .getName());
+
+  List<String> columnNames;
+  List<TypeInfo> columnTypes;
+
+  TypeInfo rowTypeInfo;
+  StructObjectInspector rowObjectInspector;
+
+  boolean[] columnSortOrderIsDesc;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+
+    byte[] nullBuffer = new byte[0];
+    nullBytesWritable.set(nullBuffer, 0, 0);
+    outBuffer[0] = (byte) 0;
+
+    // Get column names and types
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    columnNames = new ArrayList<String>(1);
+    columnNames.add(columnNameProperty);
+
+    columnTypes = TypeInfoUtils
+        .getTypeInfosFromTypeString(columnTypeProperty);
+
+    // This SerDe only supports a single long column.
+    assert (columnNames.size() == 1);
+    assert (columnNames.size() == columnTypes.size());
+
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector = (StructObjectInspector) TypeInfoUtils
+        .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+    row = new ArrayList<Object>(1);
+    row.add(null);
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowObjectInspector;
+  }
+
+  ArrayList<Object> row;
+  LongWritable reuseLong = new LongWritable();
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    BytesWritable data = (BytesWritable) blob;
+
+    byte[] bytes = data.getBytes();
+    int length = data.getLength();
+
+    if (length == 0) {
+      row.set(0, null);
+    } else {
+      // Flip the sign bit of the first byte back and reassemble the long.
+      long v = bytes[0] ^ 0x80;
+      for (int i = 1; i < 8; i++) {
+        v = (v << 8) + (bytes[i] & 0xff);
+      }
+      reuseLong.set(v);
+      row.set(0, reuseLong);
+    }
+    return row;
+  }
+
+  byte[] outBuffer = new byte[8];
+  BytesWritable serializeBytesWritable = new BytesWritable();
+  BytesWritable nullBytesWritable = new BytesWritable();
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+    assert (fields.size() == 1);
+
+    Object data = soi.getStructFieldData(obj, fields.get(0));
+
+    if (data == null) {
+      return nullBytesWritable;
+    } else {
+      ObjectInspector oi = fields.get(0).getFieldObjectInspector();
+      assert (oi instanceof LongObjectInspector);
+      LongObjectInspector loi = (LongObjectInspector) oi;
+      long v = loi.get(data);
+
+      // Big-endian bytes with the sign bit of the top byte flipped.
+      outBuffer[0] = (byte) ((v >> 56) ^ 0x80);
+      outBuffer[1] = (byte) (v >> 48);
+      outBuffer[2] = (byte) (v >> 40);
+      outBuffer[3] = (byte) (v >> 32);
+      outBuffer[4] = (byte) (v >> 24);
+      outBuffer[5] = (byte) (v >> 16);
+      outBuffer[6] = (byte) (v >> 8);
+      outBuffer[7] = (byte) v;
+
+      serializeBytesWritable.set(outBuffer, 0, 8);
+    }
+
+    return serializeBytesWritable;
+  }
+}
Index: ql/src/test/results/clientpositive/join_bigint.q.out
===================================================================
--- ql/src/test/results/clientpositive/join_bigint.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/join_bigint.q.out	(revision 0)
@@ -0,0 +1,374 @@
+PREHOOK: query: drop table join_long_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table join_long_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE join_long_1(key1 bigint, key2 bigint, value string)
+PREHOOK: type:
CREATETABLE +POSTHOOK: query: CREATE TABLE join_long_1(key1 bigint, key2 bigint, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@join_long_1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/in7.txt' INTO TABLE join_long_1 +PREHOOK: type: LOAD +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/in7.txt' INTO TABLE join_long_1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@join_long_1 +PREHOOK: query: SELECT key1, count(key1), count(1), avg(key1), sum(key2) from join_long_1 group by key1 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-44_080_6687839933448529793/-mr-10000 +POSTHOOK: query: SELECT key1, count(key1), count(1), avg(key1), sum(key2) from join_long_1 group by key1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-44_080_6687839933448529793/-mr-10000 +NULL 0 3 NULL 100 +-8000000000 2 2 -8.0E9 8000000040 +-7000000000 2 2 -7.0E9 8000000040 +10 1 1 10.0 10 +30 1 1 30.0 30 +40 2 2 40.0 80 +50 3 3 50.0 150 +60 2 2 60.0 80 +80 2 2 80.0 80 +7000000000 2 2 7.0E9 80 +PREHOOK: query: SELECT key1, key2, count(1) from join_long_1 group by key1, key2 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-47_084_5970249462607701546/-mr-10000 +POSTHOOK: query: SELECT key1, key2, count(1) from join_long_1 group by key1, key2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-47_084_5970249462607701546/-mr-10000 +NULL NULL 1 +NULL 50 2 +-8000000000 40 1 +-8000000000 8000000000 1 +-7000000000 40 1 +-7000000000 8000000000 1 +10 10 1 +30 30 1 +40 40 2 +50 50 3 +60 40 2 +80 40 2 +7000000000 40 2 +PREHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key1 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-49_704_2423888710872826499/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-49_704_2423888710872826499/-mr-10000 +-8000000000 40 88 -8000000000 40 88 +-8000000000 40 88 -8000000000 8000000000 88 +-8000000000 8000000000 88 -8000000000 40 88 +-8000000000 8000000000 88 -8000000000 8000000000 88 +-7000000000 40 88 -7000000000 40 88 +-7000000000 40 88 -7000000000 8000000000 88 +-7000000000 8000000000 88 -7000000000 40 88 +-7000000000 8000000000 88 -7000000000 8000000000 88 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 66 40 40 88 +40 40 66 40 40 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 88 50 50 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +PREHOOK: query: SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = b.key1 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-52_404_2538499380194485415/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = 
b.key1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-52_404_2538499380194485415/-mr-10000 +NULL NULL 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +NULL NULL NULL NULL NULL 66 +NULL NULL NULL NULL 50 66 +NULL NULL NULL NULL 50 66 +-8000000000 40 88 -8000000000 40 88 +-8000000000 40 88 -8000000000 8000000000 88 +-8000000000 8000000000 88 -8000000000 40 88 +-8000000000 8000000000 88 -8000000000 8000000000 88 +-7000000000 40 88 -7000000000 40 88 +-7000000000 40 88 -7000000000 8000000000 88 +-7000000000 8000000000 88 -7000000000 40 88 +-7000000000 8000000000 88 -7000000000 8000000000 88 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 66 40 40 88 +40 40 66 40 40 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 88 50 50 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +PREHOOK: query: SELECT * FROM join_long_1 a left outer join join_long_1 b on a.key1 = b.key1 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-55_959_736506743560899851/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a left outer join join_long_1 b on a.key1 = b.key1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-55_959_736506743560899851/-mr-10000 +NULL NULL 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +-8000000000 40 88 -8000000000 40 88 +-8000000000 40 88 -8000000000 8000000000 88 +-8000000000 8000000000 88 -8000000000 40 88 +-8000000000 8000000000 88 -8000000000 8000000000 88 +-7000000000 40 88 -7000000000 40 88 +-7000000000 40 88 -7000000000 8000000000 88 +-7000000000 8000000000 88 -7000000000 40 88 +-7000000000 8000000000 88 -7000000000 8000000000 88 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 66 40 40 88 +40 40 66 40 40 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 88 50 50 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +PREHOOK: query: SELECT * FROM join_long_1 a right outer join join_long_1 b on a.key1 = b.key1 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-58_608_5841980130753701529/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a right outer join join_long_1 b on a.key1 = b.key1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-19-58_608_5841980130753701529/-mr-10000 +NULL NULL NULL NULL NULL 66 +NULL NULL NULL NULL 50 66 +NULL NULL NULL NULL 50 66 +-8000000000 40 88 -8000000000 40 88 +-8000000000 40 88 -8000000000 8000000000 88 +-8000000000 8000000000 88 -8000000000 40 88 +-8000000000 8000000000 88 -8000000000 8000000000 88 +-7000000000 
40 88 -7000000000 40 88 +-7000000000 40 88 -7000000000 8000000000 88 +-7000000000 8000000000 88 -7000000000 40 88 +-7000000000 8000000000 88 -7000000000 8000000000 88 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 66 40 40 88 +40 40 66 40 40 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 88 50 50 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +60 40 66 60 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +80 40 66 80 40 66 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +7000000000 40 88 7000000000 40 88 +PREHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-01_182_260018714794157578/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-01_182_260018714794157578/-mr-10000 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 66 40 40 88 +40 40 66 40 40 66 +40 40 66 60 40 66 +40 40 66 -7000000000 40 88 +40 40 66 -8000000000 40 88 +40 40 66 7000000000 40 88 +40 40 66 7000000000 40 88 +40 40 66 80 40 66 +40 40 66 80 40 66 +40 40 66 60 40 66 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 88 60 40 66 +40 40 88 -7000000000 40 88 +40 40 88 -8000000000 40 88 +40 40 88 7000000000 40 88 +40 40 88 7000000000 40 88 +40 40 88 80 40 66 +40 40 88 80 40 66 +40 40 88 60 40 66 +50 50 66 NULL 50 66 +50 50 66 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 88 NULL 50 66 +50 50 88 50 50 66 +50 50 88 NULL 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +PREHOOK: query: SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = b.key2 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-10_361_8104781671953740368/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = b.key2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-10_361_8104781671953740368/-mr-10000 +NULL NULL 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +NULL 50 66 NULL NULL NULL +NULL NULL NULL NULL NULL 66 +-8000000000 40 88 NULL NULL NULL +-8000000000 8000000000 88 NULL NULL NULL +-7000000000 8000000000 88 NULL NULL NULL +-7000000000 40 88 NULL NULL NULL +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 66 40 40 88 +40 40 66 40 40 66 +40 40 66 60 40 66 +40 40 66 -7000000000 40 88 +40 40 66 -8000000000 40 88 +40 40 66 7000000000 40 88 +40 40 66 7000000000 40 88 +40 40 66 80 40 66 +40 40 66 80 40 66 +40 40 66 60 40 66 +40 40 88 40 40 88 +40 40 88 40 40 66 +40 40 88 60 40 66 +40 40 88 -7000000000 40 88 +40 40 88 -8000000000 40 88 +40 40 88 7000000000 40 88 +40 40 88 7000000000 40 88 +40 40 88 80 40 66 +40 40 88 80 40 66 +40 40 88 60 40 66 +50 50 66 NULL 50 66 +50 50 66 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 88 NULL 50 66 +50 50 88 50 50 66 +50 50 88 NULL 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 66 +50 50 66 NULL 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +60 40 66 
NULL NULL NULL +60 40 66 NULL NULL NULL +80 40 66 NULL NULL NULL +80 40 66 NULL NULL NULL +7000000000 40 88 NULL NULL NULL +7000000000 40 88 NULL NULL NULL +NULL NULL NULL -8000000000 8000000000 88 +NULL NULL NULL -7000000000 8000000000 88 +PREHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2 and b.key1 = b.key2 +PREHOOK: type: QUERY +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-13_000_3073460877272633086/-mr-10000 +POSTHOOK: query: SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2 and b.key1 = b.key2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: file:/tmp/sdong/hive_2010-11-19_12-20-13_000_3073460877272633086/-mr-10000 +10 10 66 10 10 66 +30 30 88 30 30 88 +40 40 66 40 40 88 +40 40 66 40 40 66 +40 40 88 40 40 88 +40 40 88 40 40 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 66 50 50 66 +50 50 66 50 50 88 +50 50 66 50 50 66 +50 50 88 50 50 66 +50 50 88 50 50 88 +50 50 88 50 50 66 +PREHOOK: query: drop table join_long_1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@join_long_1 +PREHOOK: Output: default@join_long_1 +POSTHOOK: query: drop table join_long_1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@join_long_1 +POSTHOOK: Output: default@join_long_1 Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 10406) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy) @@ -29,9 +29,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; @@ -47,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDescFactory; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; -import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.TextInputFormat; @@ -204,10 +204,10 @@ + "mapplan2.out", Utilities.defaultTd, false)); Operator op2 = OperatorFactory.get(new ScriptDesc("/bin/cat", - PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key,value"), - TextRecordWriter.class, PlanUtils.getDefaultTableDesc("" + TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key,value"), + TextRecordWriter.class, TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key,value"), TextRecordReader.class, - TextRecordReader.class, PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key")), op3); + TextRecordReader.class, TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key")), op3); Operator op1 = OperatorFactory.get(getTestFilterDesc("key"), op2); @@ -224,7 +224,7 @@ outputColumns.add("_col" + i); } // map-side work - 
Operator op1 = OperatorFactory.get(PlanUtils + Operator op1 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")), Utilities.makeList(getStringColumn("value")), outputColumns, true, -1, 1, -1)); @@ -251,7 +251,7 @@ outputColumns.add("_col" + i); } // map-side work - Operator op1 = OperatorFactory.get(PlanUtils + Operator op1 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")), Utilities .makeList(getStringColumn("key"), getStringColumn("value")), @@ -285,7 +285,7 @@ outputColumns.add("_col" + i); } // map-side work - Operator op1 = OperatorFactory.get(PlanUtils + Operator op1 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")), Utilities.makeList(getStringColumn("value")), outputColumns, true, Byte.valueOf((byte) 0), 1, -1)); @@ -294,7 +294,7 @@ mr.setKeyDesc(op1.getConf().getKeySerializeInfo()); mr.getTagToValueDesc().add(op1.getConf().getValueSerializeInfo()); - Operator op2 = OperatorFactory.get(PlanUtils + Operator op2 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")), Utilities.makeList(getStringColumn("key")), outputColumns, true, Byte.valueOf((byte) 1), Integer.MAX_VALUE, -1)); @@ -326,16 +326,16 @@ for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); } - Operator op1 = OperatorFactory.get(PlanUtils + Operator op1 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")), Utilities.makeList(getStringColumn("tkey"), getStringColumn("tvalue")), outputColumns, false, -1, 1, -1)); Operator op0 = OperatorFactory.get(new ScriptDesc("/bin/cat", - PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key,value"), - TextRecordWriter.class, PlanUtils.getDefaultTableDesc("" + TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key,value"), + TextRecordWriter.class, TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "tkey,tvalue"), TextRecordReader.class, - TextRecordReader.class, PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key")), op1); + TextRecordReader.class, TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key")), op1); Operator op4 = OperatorFactory.get(new SelectDesc(Utilities .makeList(getStringColumn("key"), getStringColumn("value")), @@ -369,7 +369,7 @@ for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); } - Operator op0 = OperatorFactory.get(PlanUtils + Operator op0 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("0")), Utilities .makeList(getStringColumn("0"), getStringColumn("1")), outputColumns, false, -1, 1, -1)); @@ -401,17 +401,17 @@ for (int i = 0; i < 2; i++) { outputColumns.add("_col" + i); } - Operator op1 = OperatorFactory.get(PlanUtils + Operator op1 = OperatorFactory.get(ReduceSinkDescFactory .getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")), Utilities.makeList(getStringColumn("tkey"), getStringColumn("tvalue")), outputColumns, false, -1, 1, -1)); Operator op0 = OperatorFactory.get(new ScriptDesc( - "\'/bin/cat\'", PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, - "tkey,tvalue"), TextRecordWriter.class, PlanUtils + "\'/bin/cat\'", TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, + "tkey,tvalue"), TextRecordWriter.class, TableDescFactory .getDefaultTableDesc("" + Utilities.tabCode, "tkey,tvalue"), TextRecordReader.class, - TextRecordReader.class, 
PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key")), op1); + TextRecordReader.class, TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key")), op1); Operator op4 = OperatorFactory.get(new SelectDesc(Utilities .makeList(getStringColumn("key"), getStringColumn("value")), Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (revision 10406) +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (working copy) @@ -35,10 +35,10 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -210,14 +210,14 @@ op.setConf(selectCtx); // scriptOperator to echo the output of the select - TableDesc scriptOutput = PlanUtils.getDefaultTableDesc("" + TableDesc scriptOutput = TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "a,b"); - TableDesc scriptInput = PlanUtils.getDefaultTableDesc("" + TableDesc scriptInput = TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "a,b"); ScriptDesc sd = new ScriptDesc("cat", scriptOutput, TextRecordWriter.class, scriptInput, TextRecordReader.class, TextRecordReader.class, - PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key")); + TableDescFactory.getDefaultTableDesc("" + Utilities.tabCode, "key")); Operator sop = OperatorFactory.getAndMakeChild(sd, op); // Collect operator to observe the output of the script Index: ql/src/test/queries/clientpositive/join_bigint.q =================================================================== --- ql/src/test/queries/clientpositive/join_bigint.q (revision 0) +++ ql/src/test/queries/clientpositive/join_bigint.q (revision 0) @@ -0,0 +1,20 @@ +drop table join_long_1; + +CREATE TABLE join_long_1(key1 bigint, key2 bigint, value string); +LOAD DATA LOCAL INPATH '../data/files/in7.txt' INTO TABLE join_long_1; + +SELECT key1, count(key1), count(1), avg(key1), sum(key2) from join_long_1 group by key1; +SELECT key1, key2, count(1) from join_long_1 group by key1, key2; + + +SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key1; +SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = b.key1; +SELECT * FROM join_long_1 a left outer join join_long_1 b on a.key1 = b.key1; +SELECT * FROM join_long_1 a right outer join join_long_1 b on a.key1 = b.key1; + +SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2; +SELECT * FROM join_long_1 a full outer join join_long_1 b on a.key1 = b.key2; + +SELECT * FROM join_long_1 a join join_long_1 b on a.key1 = b.key2 and b.key1 = b.key2; + +drop table join_long_1; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (working copy) @@ -28,7 +28,6 @@ import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.Task; @@ -51,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; /** * Processor for the rule - TableScan followed by Union. @@ -159,7 +159,7 @@ plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp()); } - TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils + TableDesc tt_desc = TableDescFactory.getIntermediateFileTableDesc(PlanUtils .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); // generate the temporary file Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy) @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.ql.plan.TableScanDesc; /** @@ -511,7 +512,7 @@ reduce.getSchema().setSignature(sig); reduceConf.setOutputValueColumnNames(newOutputColNames); reduceConf.setValueCols(newValueEval); - TableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils + TableDesc newValueTable = TableDescFactory.getReduceValueTableDesc(PlanUtils .getFieldSchemasFromColumnList(reduceConf.getValueCols(), newOutputColNames, 0, "")); reduceConf.setValueSerializeInfo(newValueTable); @@ -656,7 +657,7 @@ keyOrder.append("+"); } - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils + TableDesc valueTableDesc = TableDescFactory.getMapJoinValueTableDesc(PlanUtils .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); valueTableDescs.add(valueTableDesc); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy) @@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; -import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; import org.apache.hadoop.hive.ql.plan.ConditionalWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -66,11 +65,12 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDescFactory; import org.apache.hadoop.hive.ql.plan.StatsWork; import 
org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** @@ -208,7 +208,7 @@ outputColumns.add(SemanticAnalyzer.getColumnInternalName(i)); } - ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc( + ReduceSinkDesc rsDesc = ReduceSinkDescFactory.getReduceSinkDesc( new ArrayList(), valueCols, outputColumns, false, -1, -1, -1); OperatorFactory.getAndMakeChild(rsDesc, inputRS, tsMerge); @@ -246,7 +246,7 @@ // NOTE: we should gather stats in MR1 (rather than the merge MR job) // since it is unknown if the merge MR will be triggered at execution time. - + MoveWork dummyMv = new MoveWork(null, null, null, new LoadFileDesc(fsConf.getDirName(), finalName, true, null, null), false); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -67,6 +67,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; @@ -824,7 +825,7 @@ String taskTmpDir = baseCtx.getMRTmpFileURI(); Operator parent = op.getParentOperators().get(posn); - TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils + TableDesc tt_desc = TableDescFactory.getIntermediateFileTableDesc(PlanUtils .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); // Create a file sink operator for this file name Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; /** * Operator factory for MapJoin processing. 
@@ -215,7 +216,7 @@ Task mjTask = TaskFactory.get(mjPlan, parseCtx .getConf()); - TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils + TableDesc tt_desc = TableDescFactory.getIntermediateFileTableDesc(PlanUtils .getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol")); // generate the temporary file Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; /** * Node processor factory for skew join resolver. @@ -120,7 +121,7 @@ }else{ //get parent schema RowSchema rowSchema = parent.getSchema(); - tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils + tbl = TableDescFactory.getIntermediateFileTableDesc(PlanUtils .getFieldSchemasFromRowSchema(rowSchema, "")); } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy) @@ -76,6 +76,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -407,7 +408,7 @@ keyOrder.append("+"); } - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils + TableDesc keyTableDesc = TableDescFactory.getMapJoinKeyTableDesc(PlanUtils .getFieldSchemasFromColumnList(keyCols, "mapjoinkey")); List valueTableDescs = new ArrayList(); @@ -435,9 +436,9 @@ keyOrder.append("+"); } - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils + TableDesc valueTableDesc = TableDescFactory.getMapJoinValueTableDesc(PlanUtils .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); - TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils + TableDesc valueFilteredTableDesc = TableDescFactory.getMapJoinValueTableDesc(PlanUtils .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); valueTableDescs.add(valueTableDesc); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy) @@ -76,7 +76,7 @@ transient Serializer keySerializer; transient boolean keyIsText; transient Serializer valueSerializer; - transient int tag; + transient int encodeTag; transient byte[] tagByte = new byte[1]; transient protected int numDistributionKeys; transient protected int numDistinctExprs; @@ -107,9 +107,9 @@ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); } - tag = conf.getTag(); - tagByte[0] = (byte) tag; - LOG.info("Using tag = " + tag); + encodeTag = conf.getTag(); + tagByte[0] = (byte) 
encodeTag; + LOG.info("Using tag = " + encodeTag); TableDesc keyTableDesc = conf.getKeySerializeInfo(); keySerializer = (Serializer) keyTableDesc.getDeserializerClass() @@ -263,7 +263,7 @@ if (keyIsText) { Text key = (Text) keySerializer.serialize(cachedKeys[i], keyObjectInspector); - if (tag == -1) { + if (this.encodeTag == -1) { keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); @@ -275,7 +275,7 @@ // Must be BytesWritable BytesWritable key = (BytesWritable) keySerializer.serialize( cachedKeys[i], keyObjectInspector); - if (tag == -1) { + if (this.encodeTag == -1) { keyWritable.set(key.getBytes(), 0, key.getLength()); } else { int keyLength = key.getLength(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -94,8 +94,8 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -505,7 +505,7 @@ // PlanUtils.getDefaultTableDesc(String separatorCode, String columns) // or getBinarySortableTableDesc(List fieldSchemas) when // we know the column names. - defaultTd = PlanUtils.getDefaultTableDesc("" + Utilities.ctrlaCode); + defaultTd = TableDescFactory.getDefaultTableDesc("" + Utilities.ctrlaCode); } public static final int newLineCode = 10; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDescFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDescFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDescFactory.java (revision 0) @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.plan.ReduceKeyTableDescFactory.SortableSerdeType; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +public class ReduceSinkDescFactory { + + + /** + * Create the reduce sink descriptor. 
+ * + * @param keyCols + * The columns to be stored in the key + * @param valueCols + * The columns to be stored in the value + * @param outputColumnNames + * The output columns names + * @param tag + * The tag for this reducesink + * @param partitionCols + * The columns for partitioning. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The reduceSinkDesc object. + */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, ArrayList valueCols, + List outputColumnNames, boolean includeKeyCols, int tag, + ArrayList partitionCols, String order, int numReducers) { + return getReduceSinkDesc(keyCols, keyCols.size(), valueCols, + new ArrayList>(), + includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) : + new ArrayList(), + includeKeyCols ? outputColumnNames.subList(keyCols.size(), + outputColumnNames.size()) : outputColumnNames, + includeKeyCols, tag, partitionCols, order, numReducers); + } + + /** + * Create the reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys + * number of distribution key numbers. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregate parameters + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param partitionCols + * The columns for partitioning. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The reduceSinkDesc object. + */ + public static ReduceSinkDesc getReduceSinkDesc( + final ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, + List outputValueColumnNames, + boolean includeKeyCols, int tag, + ArrayList partitionCols, String order, int numReducers) { + TableDesc keyTable = null; + TableDesc valueTable = null; + ArrayList outputKeyCols = new ArrayList(); + ArrayList outputValCols = new ArrayList(); + + SortableSerdeType serdeType; + if (keyCols.size() == 1 && keyCols.get(0).getTypeInfo().equals(TypeInfoFactory.stringTypeInfo) + && distinctColIndices.size() == 0 && (order.isEmpty() || order.charAt(0) == '+')) { + serdeType = SortableSerdeType.SINGLE_ASC_STRING; + } else if (keyCols.size() == 1 && keyCols.get(0).getTypeInfo().equals(TypeInfoFactory.longTypeInfo) + && distinctColIndices.size() == 0 && (order.isEmpty() || order.charAt(0) == '+')){ + serdeType = SortableSerdeType.SINGLE_ASC_LONG; + } else { + serdeType = SortableSerdeType.GENERAL; + } + if (includeKeyCols) { + keyTable = ReduceKeyTableDescFactory.getReduceKeyTableDesc(PlanUtils + .getFieldSchemasFromColumnListWithLength( + keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), + order, serdeType); + outputKeyCols.addAll(outputKeyColumnNames); + } else { + keyTable = ReduceKeyTableDescFactory.getReduceKeyTableDesc(PlanUtils.getFieldSchemasFromColumnList( + keyCols, "reducesinkkey"), order, serdeType); + for (int i = 0; i < keyCols.size(); i++) { + outputKeyCols.add("reducesinkkey" + i); + } + } + valueTable = TableDescFactory.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueCols, outputValueColumnNames, 0, "")); + outputValCols.addAll(outputValueColumnNames); + return new ReduceSinkDesc(keyCols, numKeys, valueCols, 
outputKeyCols, + distinctColIndices, outputValCols, + tag, partitionCols, numReducers, keyTable, + valueTable); + } + + /** + * Create the reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param valueCols + * The columns to be stored in the value + * @param outputColumnNames + * The output columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The reduceSinkDesc object. + */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, ArrayList valueCols, + List outputColumnNames, boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + return getReduceSinkDesc(keyCols, keyCols.size(), valueCols, + new ArrayList>(), + includeKey ? outputColumnNames.subList(0, keyCols.size()) : + new ArrayList(), + includeKey ? + outputColumnNames.subList(keyCols.size(), outputColumnNames.size()) + : outputColumnNames, + includeKey, tag, numPartitionFields, numReducers); + } + + /** + * Create the reduce sink descriptor. + * + * @param keyCols + * The columns to be stored in the key + * @param numKeys number of distribution keys. Equals to group-by-key + * numbers usually. + * @param valueCols + * The columns to be stored in the value + * @param distinctColIndices + * column indices for distinct aggregates + * @param outputKeyColumnNames + * The output key columns names + * @param outputValueColumnNames + * The output value columns names + * @param tag + * The tag for this reducesink + * @param numPartitionFields + * The first numPartitionFields of keyCols will be partition columns. + * If numPartitionFields=-1, then partition randomly. + * @param numReducers + * The number of reducers, set to -1 for automatic inference based on + * input data size. + * @return The reduceSinkDesc object. 
+ */ + public static ReduceSinkDesc getReduceSinkDesc( + ArrayList keyCols, int numKeys, + ArrayList valueCols, + List> distinctColIndices, + List outputKeyColumnNames, List outputValueColumnNames, + boolean includeKey, int tag, + int numPartitionFields, int numReducers) throws SemanticException { + ArrayList partitionCols = null; + + if (numPartitionFields >= keyCols.size()) { + partitionCols = keyCols; + } else if (numPartitionFields >= 0) { + partitionCols = new ArrayList(numPartitionFields); + for (int i = 0; i < numPartitionFields; i++) { + partitionCols.add(keyCols.get(i)); + } + } else { + // numPartitionFields = -1 means random partitioning + partitionCols = new ArrayList(1); + partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor + .getFuncExprNodeDesc("rand")); + } + + StringBuilder order = new StringBuilder(); + for (int i = 0; i < keyCols.size(); i++) { + order.append("+"); + } + return getReduceSinkDesc(keyCols, numKeys, valueCols, distinctColIndices, + outputKeyColumnNames, outputValueColumnNames, includeKey, tag, + partitionCols, order.toString(), numReducers); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceKeyTableDescFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceKeyTableDescFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceKeyTableDescFactory.java (revision 0) @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableLongAscSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableStringAscSerDe; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; + +public class ReduceKeyTableDescFactory { + public static enum SortableSerdeType { + GENERAL, SINGLE_ASC_STRING, SINGLE_ASC_LONG + } + + /** + * Generate the table descriptor for reduce key. 
+ * @throws SemanticException + */ + public static TableDesc getReduceKeyTableDesc(List fieldSchemas, + String order, SortableSerdeType serdeType) { + Properties properties = Utilities.makeProperties(Constants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + Constants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + Constants.SERIALIZATION_SORT_ORDER, order); + switch (serdeType) { + case GENERAL: + return new TableDesc(BinarySortableSerDe.class, + SequenceFileInputFormat.class, SequenceFileOutputFormat.class, + properties); + case SINGLE_ASC_STRING: + return new TableDesc(BinarySortableStringAscSerDe.class, + SequenceFileInputFormat.class, SequenceFileOutputFormat.class, + properties); + case SINGLE_ASC_LONG: + return new TableDesc(BinarySortableLongAscSerDe.class, + SequenceFileInputFormat.class, SequenceFileOutputFormat.class, + properties); + default: + assert (false); + return null; + } + } + + /** + * Genrate table description for reduce key + * @param key columns + * @param field prefix + * @param order + * @return + * @throws SemanticException + */ + public static TableDesc getReduceKeyTableDesc( + ArrayList keyCols, + String fieldPrefix, + String order) throws SemanticException { + SortableSerdeType serdeType; + if (keyCols.size() == 1 && keyCols.get(0).getTypeInfo().equals(TypeInfoFactory.stringTypeInfo) + && (order.isEmpty() || order.charAt(0) == '+')) { + serdeType = SortableSerdeType.SINGLE_ASC_STRING; + } else if (keyCols.size() == 1 && keyCols.get(0).getTypeInfo().equals(TypeInfoFactory.longTypeInfo) + && (order.isEmpty() || order.charAt(0) == '+')) { + serdeType = SortableSerdeType.SINGLE_ASC_LONG; + } else { + serdeType = SortableSerdeType.GENERAL; + } + + return ReduceKeyTableDescFactory.getReduceKeyTableDesc(PlanUtils + .getFieldSchemasFromColumnList(keyCols, fieldPrefix), order, serdeType); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (working copy) @@ -25,7 +25,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,31 +34,12 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; -import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; -import 
org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; /** * PlanUtils. @@ -77,6 +57,7 @@ FIELD, JEXL }; + @SuppressWarnings("nls") public static MapredWork getMapRedWork() { try { @@ -92,272 +73,20 @@ } /** - * Generate the table descriptor of MetadataTypedColumnsetSerDe with the - * separatorCode and column names (comma separated string). + * Convert the ColumnList to FieldSchema list. */ - public static TableDesc getDefaultTableDesc(String separatorCode, - String columns) { - return getDefaultTableDesc(separatorCode, columns, false); - } - - /** - * Generate the table descriptor of given serde with the separatorCode and - * column names (comma separated string). - */ - public static TableDesc getTableDesc( - Class serdeClass, String separatorCode, - String columns) { - return getTableDesc(serdeClass, separatorCode, columns, false); - } - - /** - * Generate the table descriptor of MetadataTypedColumnsetSerDe with the - * separatorCode and column names (comma separated string), and whether the - * last column should take the rest of the line. - */ - public static TableDesc getDefaultTableDesc(String separatorCode, - String columns, boolean lastColumnTakesRestOfTheLine) { - return getDefaultTableDesc(separatorCode, columns, null, - lastColumnTakesRestOfTheLine); - } - - /** - * Generate the table descriptor of the serde specified with the separatorCode - * and column names (comma separated string), and whether the last column - * should take the rest of the line. - */ - public static TableDesc getTableDesc( - Class serdeClass, String separatorCode, - String columns, boolean lastColumnTakesRestOfTheLine) { - return getTableDesc(serdeClass, separatorCode, columns, null, - lastColumnTakesRestOfTheLine); - } - - /** - * Generate the table descriptor of MetadataTypedColumnsetSerDe with the - * separatorCode and column names (comma separated string), and whether the - * last column should take the rest of the line. 
- */ - public static TableDesc getDefaultTableDesc(String separatorCode, - String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine) { - return getTableDesc(LazySimpleSerDe.class, separatorCode, columns, - columnTypes, lastColumnTakesRestOfTheLine); - } - - public static TableDesc getTableDesc( - Class serdeClass, String separatorCode, - String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine) { - return getTableDesc(serdeClass, separatorCode, columns, columnTypes, - lastColumnTakesRestOfTheLine, false); - } - - public static TableDesc getTableDesc( - Class serdeClass, String separatorCode, - String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine, - boolean useDelimitedJSON) { - - return getTableDesc(serdeClass, separatorCode, columns, columnTypes, - lastColumnTakesRestOfTheLine, useDelimitedJSON, "TextFile"); - } - - public static TableDesc getTableDesc( - Class serdeClass, String separatorCode, - String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine, - boolean useDelimitedJSON, String fileFormat) { - - Properties properties = Utilities.makeProperties( - Constants.SERIALIZATION_FORMAT, separatorCode, Constants.LIST_COLUMNS, - columns); - - if (!separatorCode.equals(Integer.toString(Utilities.ctrlaCode))) { - properties.setProperty(Constants.FIELD_DELIM, separatorCode); + public static List getFieldSchemasFromColumnList( + List cols, List outputColumnNames, int start, + String fieldPrefix) { + List schemas = new ArrayList(cols.size()); + for (int i = 0; i < cols.size(); i++) { + schemas.add(MetaStoreUtils.getFieldSchemaFromTypeInfo(fieldPrefix + + outputColumnNames.get(i + start), cols.get(i).getTypeInfo())); } - - if (columnTypes != null) { - properties.setProperty(Constants.LIST_COLUMN_TYPES, columnTypes); - } - - if (lastColumnTakesRestOfTheLine) { - properties.setProperty(Constants.SERIALIZATION_LAST_COLUMN_TAKES_REST, - "true"); - } - - // It is not a very clean way, and should be modified later - due to - // compatiblity reasons, - // user sees the results as json for custom scripts and has no way for - // specifying that. - // Right now, it is hard-coded in the code - if (useDelimitedJSON) { - serdeClass = DelimitedJSONSerDe.class; - } - - Class inputFormat, outputFormat; - // get the input & output file formats - if ("SequenceFile".equalsIgnoreCase(fileFormat)) { - inputFormat = SequenceFileInputFormat.class; - outputFormat = SequenceFileOutputFormat.class; - } else if ("RCFile".equalsIgnoreCase(fileFormat)) { - inputFormat = RCFileInputFormat.class; - outputFormat = RCFileOutputFormat.class; - assert serdeClass == ColumnarSerDe.class; - } else { // use TextFile by default - inputFormat = TextInputFormat.class; - outputFormat = IgnoreKeyTextOutputFormat.class; - } - return new TableDesc(serdeClass, inputFormat, outputFormat, properties); + return schemas; } - public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes, - String fileFormat) { - return getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes, - false, false, fileFormat); - } - - /** - * Generate a table descriptor from a createTableDesc. 
- */ - public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols, - String colTypes) { - - Class serdeClass = LazySimpleSerDe.class; - String separatorCode = Integer.toString(Utilities.ctrlaCode); - String columns = cols; - String columnTypes = colTypes; - boolean lastColumnTakesRestOfTheLine = false; - TableDesc ret; - - try { - if (crtTblDesc.getSerName() != null) { - Class c = Class.forName(crtTblDesc.getSerName()); - serdeClass = c; - } - - if (crtTblDesc.getFieldDelim() != null) { - separatorCode = crtTblDesc.getFieldDelim(); - } - - ret = getTableDesc(serdeClass, separatorCode, columns, columnTypes, - lastColumnTakesRestOfTheLine, false); - - // set other table properties - Properties properties = ret.getProperties(); - - if (crtTblDesc.getCollItemDelim() != null) { - properties.setProperty(Constants.COLLECTION_DELIM, crtTblDesc - .getCollItemDelim()); - } - - if (crtTblDesc.getMapKeyDelim() != null) { - properties.setProperty(Constants.MAPKEY_DELIM, crtTblDesc - .getMapKeyDelim()); - } - - if (crtTblDesc.getFieldEscape() != null) { - properties.setProperty(Constants.ESCAPE_CHAR, crtTblDesc - .getFieldEscape()); - } - - if (crtTblDesc.getLineDelim() != null) { - properties.setProperty(Constants.LINE_DELIM, crtTblDesc.getLineDelim()); - } - - // replace the default input & output file format with those found in - // crtTblDesc - Class c1 = Class.forName(crtTblDesc.getInputFormat()); - Class c2 = Class.forName(crtTblDesc.getOutputFormat()); - Class in_class = c1; - Class out_class = c2; - - ret.setInputFileFormatClass(in_class); - ret.setOutputFileFormatClass(out_class); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - return null; - } - return ret; - } - /** - * Generate the table descriptor of MetadataTypedColumnsetSerDe with the - * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe - * does not support a table with a single column "col" with type - * "array". - */ - public static TableDesc getDefaultTableDesc(String separatorCode) { - return new TableDesc(MetadataTypedColumnsetSerDe.class, - TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities - .makeProperties( - org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, - separatorCode)); - } - - /** - * Generate the table descriptor for reduce key. - */ - public static TableDesc getReduceKeyTableDesc(List fieldSchemas, - String order) { - return new TableDesc(BinarySortableSerDe.class, - SequenceFileInputFormat.class, SequenceFileOutputFormat.class, - Utilities.makeProperties(Constants.LIST_COLUMNS, MetaStoreUtils - .getColumnNamesFromFieldSchema(fieldSchemas), - Constants.LIST_COLUMN_TYPES, MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - Constants.SERIALIZATION_SORT_ORDER, order)); - } - - /** - * Generate the table descriptor for Map-side join key. - */ - public static TableDesc getMapJoinKeyTableDesc(List fieldSchemas) { - return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - Constants.ESCAPE_CHAR, "\\")); - } - - /** - * Generate the table descriptor for Map-side join key. 
- */ - public static TableDesc getMapJoinValueTableDesc( - List fieldSchemas) { - return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties("columns", - MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), - "columns.types", MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - Constants.ESCAPE_CHAR, "\\")); - } - - /** - * Generate the table descriptor for intermediate files. - */ - public static TableDesc getIntermediateFileTableDesc( - List fieldSchemas) { - return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties( - Constants.LIST_COLUMNS, MetaStoreUtils - .getColumnNamesFromFieldSchema(fieldSchemas), - Constants.LIST_COLUMN_TYPES, MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - Constants.ESCAPE_CHAR, "\\")); - } - - /** - * Generate the table descriptor for intermediate files. - */ - public static TableDesc getReduceValueTableDesc(List fieldSchemas) { - return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, - SequenceFileOutputFormat.class, Utilities.makeProperties( - Constants.LIST_COLUMNS, MetaStoreUtils - .getColumnNamesFromFieldSchema(fieldSchemas), - Constants.LIST_COLUMN_TYPES, MetaStoreUtils - .getColumnTypesFromFieldSchema(fieldSchemas), - Constants.ESCAPE_CHAR, "\\")); - } - - /** * Convert the ColumnList to FieldSchema list. * * Adds uniontype for distinctColIndices. @@ -398,20 +127,6 @@ * Convert the ColumnList to FieldSchema list. */ public static List getFieldSchemasFromColumnList( - List cols, List outputColumnNames, int start, - String fieldPrefix) { - List schemas = new ArrayList(cols.size()); - for (int i = 0; i < cols.size(); i++) { - schemas.add(MetaStoreUtils.getFieldSchemaFromTypeInfo(fieldPrefix - + outputColumnNames.get(i + start), cols.get(i).getTypeInfo())); - } - return schemas; - } - - /** - * Convert the ColumnList to FieldSchema list. - */ - public static List getFieldSchemasFromColumnList( List cols, String fieldPrefix) { List schemas = new ArrayList(cols.size()); for (int i = 0; i < cols.size(); i++) { @@ -464,185 +179,6 @@ } /** - * Create the reduce sink descriptor. - * - * @param keyCols - * The columns to be stored in the key - * @param valueCols - * The columns to be stored in the value - * @param outputColumnNames - * The output columns names - * @param tag - * The tag for this reducesink - * @param partitionCols - * The columns for partitioning. - * @param numReducers - * The number of reducers, set to -1 for automatic inference based on - * input data size. - * @return The reduceSinkDesc object. - */ - public static ReduceSinkDesc getReduceSinkDesc( - ArrayList keyCols, ArrayList valueCols, - List outputColumnNames, boolean includeKeyCols, int tag, - ArrayList partitionCols, String order, int numReducers) { - return getReduceSinkDesc(keyCols, keyCols.size(), valueCols, - new ArrayList>(), - includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) : - new ArrayList(), - includeKeyCols ? outputColumnNames.subList(keyCols.size(), - outputColumnNames.size()) : outputColumnNames, - includeKeyCols, tag, partitionCols, order, numReducers); - } - - /** - * Create the reduce sink descriptor. - * - * @param keyCols - * The columns to be stored in the key - * @param numKeys - * number of distribution key numbers. Equals to group-by-key - * numbers usually. 
- * @param valueCols - * The columns to be stored in the value - * @param distinctColIndices - * column indices for distinct aggregate parameters - * @param outputKeyColumnNames - * The output key columns names - * @param outputValueColumnNames - * The output value columns names - * @param tag - * The tag for this reducesink - * @param partitionCols - * The columns for partitioning. - * @param numReducers - * The number of reducers, set to -1 for automatic inference based on - * input data size. - * @return The reduceSinkDesc object. - */ - public static ReduceSinkDesc getReduceSinkDesc( - final ArrayList keyCols, int numKeys, - ArrayList valueCols, - List> distinctColIndices, - List outputKeyColumnNames, - List outputValueColumnNames, - boolean includeKeyCols, int tag, - ArrayList partitionCols, String order, int numReducers) { - TableDesc keyTable = null; - TableDesc valueTable = null; - ArrayList outputKeyCols = new ArrayList(); - ArrayList outputValCols = new ArrayList(); - if (includeKeyCols) { - keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnListWithLength( - keyCols, distinctColIndices, outputKeyColumnNames, numKeys, ""), - order); - outputKeyCols.addAll(outputKeyColumnNames); - } else { - keyTable = getReduceKeyTableDesc(getFieldSchemasFromColumnList( - keyCols, "reducesinkkey"),order); - for (int i = 0; i < keyCols.size(); i++) { - outputKeyCols.add("reducesinkkey" + i); - } - } - valueTable = getReduceValueTableDesc(getFieldSchemasFromColumnList( - valueCols, outputValueColumnNames, 0, "")); - outputValCols.addAll(outputValueColumnNames); - return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols, - distinctColIndices, outputValCols, - tag, partitionCols, numReducers, keyTable, - valueTable); - } - - /** - * Create the reduce sink descriptor. - * - * @param keyCols - * The columns to be stored in the key - * @param valueCols - * The columns to be stored in the value - * @param outputColumnNames - * The output columns names - * @param tag - * The tag for this reducesink - * @param numPartitionFields - * The first numPartitionFields of keyCols will be partition columns. - * If numPartitionFields=-1, then partition randomly. - * @param numReducers - * The number of reducers, set to -1 for automatic inference based on - * input data size. - * @return The reduceSinkDesc object. - */ - public static ReduceSinkDesc getReduceSinkDesc( - ArrayList keyCols, ArrayList valueCols, - List outputColumnNames, boolean includeKey, int tag, - int numPartitionFields, int numReducers) throws SemanticException { - return getReduceSinkDesc(keyCols, keyCols.size(), valueCols, - new ArrayList>(), - includeKey ? outputColumnNames.subList(0, keyCols.size()) : - new ArrayList(), - includeKey ? - outputColumnNames.subList(keyCols.size(), outputColumnNames.size()) - : outputColumnNames, - includeKey, tag, numPartitionFields, numReducers); - } - - /** - * Create the reduce sink descriptor. - * - * @param keyCols - * The columns to be stored in the key - * @param numKeys number of distribution keys. Equals to group-by-key - * numbers usually. - * @param valueCols - * The columns to be stored in the value - * @param distinctColIndices - * column indices for distinct aggregates - * @param outputKeyColumnNames - * The output key columns names - * @param outputValueColumnNames - * The output value columns names - * @param tag - * The tag for this reducesink - * @param numPartitionFields - * The first numPartitionFields of keyCols will be partition columns. 
- * If numPartitionFields=-1, then partition randomly. - * @param numReducers - * The number of reducers, set to -1 for automatic inference based on - * input data size. - * @return The reduceSinkDesc object. - */ - public static ReduceSinkDesc getReduceSinkDesc( - ArrayList keyCols, int numKeys, - ArrayList valueCols, - List> distinctColIndices, - List outputKeyColumnNames, List outputValueColumnNames, - boolean includeKey, int tag, - int numPartitionFields, int numReducers) throws SemanticException { - ArrayList partitionCols = null; - - if (numPartitionFields >= keyCols.size()) { - partitionCols = keyCols; - } else if (numPartitionFields >= 0) { - partitionCols = new ArrayList(numPartitionFields); - for (int i = 0; i < numPartitionFields; i++) { - partitionCols.add(keyCols.get(i)); - } - } else { - // numPartitionFields = -1 means random partitioning - partitionCols = new ArrayList(1); - partitionCols.add(TypeCheckProcFactory.DefaultExprProcessor - .getFuncExprNodeDesc("rand")); - } - - StringBuilder order = new StringBuilder(); - for (int i = 0; i < keyCols.size(); i++) { - order.append("+"); - } - return getReduceSinkDesc(keyCols, numKeys, valueCols, distinctColIndices, - outputKeyColumnNames, outputValueColumnNames, includeKey, tag, - partitionCols, order.toString(), numReducers); - } - - /** * Loads the storage handler (if one exists) for the given table * and invokes {@link HiveStorageHandler#configureTableJobProperties}. * Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableDescFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDescFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDescFactory.java (revision 0) @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TextInputFormat; + +public class TableDescFactory { + /** + * Generate the table descriptor of MetadataTypedColumnsetSerDe with the + * separatorCode and column names (comma separated string). + */ + public static TableDesc getDefaultTableDesc(String separatorCode, + String columns) { + return getDefaultTableDesc(separatorCode, columns, false); + } + + /** + * Generate the table descriptor of given serde with the separatorCode and + * column names (comma separated string). + */ + public static TableDesc getTableDesc( + Class serdeClass, String separatorCode, + String columns) { + return getTableDesc(serdeClass, separatorCode, columns, false); + } + + /** + * Generate the table descriptor of MetadataTypedColumnsetSerDe with the + * separatorCode and column names (comma separated string), and whether the + * last column should take the rest of the line. + */ + public static TableDesc getDefaultTableDesc(String separatorCode, + String columns, boolean lastColumnTakesRestOfTheLine) { + return getDefaultTableDesc(separatorCode, columns, null, + lastColumnTakesRestOfTheLine); + } + + /** + * Generate the table descriptor of the serde specified with the separatorCode + * and column names (comma separated string), and whether the last column + * should take the rest of the line. + */ + public static TableDesc getTableDesc( + Class serdeClass, String separatorCode, + String columns, boolean lastColumnTakesRestOfTheLine) { + return getTableDesc(serdeClass, separatorCode, columns, null, + lastColumnTakesRestOfTheLine); + } + + /** + * Generate the table descriptor of MetadataTypedColumnsetSerDe with the + * separatorCode and column names (comma separated string), and whether the + * last column should take the rest of the line. 
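+ *
+ * A hedged usage sketch; the column names and types are illustrative
+ * assumptions, not taken from this patch. This builds a ctrl-A separated
+ * descriptor for two columns, where the last column does not take the rest
+ * of the line:
+ * <pre>
+ *   TableDesc desc = TableDescFactory.getDefaultTableDesc(
+ *       Integer.toString(Utilities.ctrlaCode), "key,value", "string,string", false);
+ * </pre>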
+ */ + public static TableDesc getDefaultTableDesc(String separatorCode, + String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine) { + return getTableDesc(LazySimpleSerDe.class, separatorCode, columns, + columnTypes, lastColumnTakesRestOfTheLine); + } + + public static TableDesc getTableDesc( + Class serdeClass, String separatorCode, + String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine) { + return getTableDesc(serdeClass, separatorCode, columns, columnTypes, + lastColumnTakesRestOfTheLine, false); + } + + public static TableDesc getTableDesc( + Class serdeClass, String separatorCode, + String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine, + boolean useDelimitedJSON) { + + return getTableDesc(serdeClass, separatorCode, columns, columnTypes, + lastColumnTakesRestOfTheLine, useDelimitedJSON, "TextFile"); + } + + public static TableDesc getTableDesc( + Class serdeClass, String separatorCode, + String columns, String columnTypes, boolean lastColumnTakesRestOfTheLine, + boolean useDelimitedJSON, String fileFormat) { + + Properties properties = Utilities.makeProperties( + Constants.SERIALIZATION_FORMAT, separatorCode, Constants.LIST_COLUMNS, + columns); + + if (!separatorCode.equals(Integer.toString(Utilities.ctrlaCode))) { + properties.setProperty(Constants.FIELD_DELIM, separatorCode); + } + + if (columnTypes != null) { + properties.setProperty(Constants.LIST_COLUMN_TYPES, columnTypes); + } + + if (lastColumnTakesRestOfTheLine) { + properties.setProperty(Constants.SERIALIZATION_LAST_COLUMN_TAKES_REST, + "true"); + } + + // It is not a very clean way, and should be modified later - due to + // compatiblity reasons, + // user sees the results as json for custom scripts and has no way for + // specifying that. + // Right now, it is hard-coded in the code + if (useDelimitedJSON) { + serdeClass = DelimitedJSONSerDe.class; + } + + Class inputFormat, outputFormat; + // get the input & output file formats + if ("SequenceFile".equalsIgnoreCase(fileFormat)) { + inputFormat = SequenceFileInputFormat.class; + outputFormat = SequenceFileOutputFormat.class; + } else if ("RCFile".equalsIgnoreCase(fileFormat)) { + inputFormat = RCFileInputFormat.class; + outputFormat = RCFileOutputFormat.class; + assert serdeClass == ColumnarSerDe.class; + } else { // use TextFile by default + inputFormat = TextInputFormat.class; + outputFormat = IgnoreKeyTextOutputFormat.class; + } + return new TableDesc(serdeClass, inputFormat, outputFormat, properties); + } + + public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes, + String fileFormat) { + return getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes, + false, false, fileFormat); + } + + /** + * Generate a table descriptor from a createTableDesc. 
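+ *
+ * A hedged sketch of the call shape; crtTblDesc is assumed to come from the
+ * CREATE TABLE analysis, and the column names and types are illustrative.
+ * The serde, delimiters and input/output formats recorded in crtTblDesc
+ * override the LazySimpleSerDe/TextFile defaults chosen below:
+ * <pre>
+ *   TableDesc target = TableDescFactory.getTableDesc(crtTblDesc, "key,value", "string,bigint");
+ * </pre>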
+ */ + public static TableDesc getTableDesc(CreateTableDesc crtTblDesc, String cols, + String colTypes) { + + Class serdeClass = LazySimpleSerDe.class; + String separatorCode = Integer.toString(Utilities.ctrlaCode); + String columns = cols; + String columnTypes = colTypes; + boolean lastColumnTakesRestOfTheLine = false; + TableDesc ret; + + try { + if (crtTblDesc.getSerName() != null) { + Class c = Class.forName(crtTblDesc.getSerName()); + serdeClass = c; + } + + if (crtTblDesc.getFieldDelim() != null) { + separatorCode = crtTblDesc.getFieldDelim(); + } + + ret = getTableDesc(serdeClass, separatorCode, columns, columnTypes, + lastColumnTakesRestOfTheLine, false); + + // set other table properties + Properties properties = ret.getProperties(); + + if (crtTblDesc.getCollItemDelim() != null) { + properties.setProperty(Constants.COLLECTION_DELIM, crtTblDesc + .getCollItemDelim()); + } + + if (crtTblDesc.getMapKeyDelim() != null) { + properties.setProperty(Constants.MAPKEY_DELIM, crtTblDesc + .getMapKeyDelim()); + } + + if (crtTblDesc.getFieldEscape() != null) { + properties.setProperty(Constants.ESCAPE_CHAR, crtTblDesc + .getFieldEscape()); + } + + if (crtTblDesc.getLineDelim() != null) { + properties.setProperty(Constants.LINE_DELIM, crtTblDesc.getLineDelim()); + } + + // replace the default input & output file format with those found in + // crtTblDesc + Class c1 = Class.forName(crtTblDesc.getInputFormat()); + Class c2 = Class.forName(crtTblDesc.getOutputFormat()); + Class in_class = c1; + Class out_class = c2; + + ret.setInputFileFormatClass(in_class); + ret.setOutputFileFormatClass(out_class); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + return ret; + } + + /** + * Generate the table descriptor of MetadataTypedColumnsetSerDe with the + * separatorCode. MetaDataTypedColumnsetSerDe is used because LazySimpleSerDe + * does not support a table with a single column "col" with type + * "array". + */ + public static TableDesc getDefaultTableDesc(String separatorCode) { + return new TableDesc(MetadataTypedColumnsetSerDe.class, + TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities + .makeProperties( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, + separatorCode)); + } + + /** + * Generate the table descriptor for Map-side join key. + */ + public static TableDesc getMapJoinKeyTableDesc(List fieldSchemas) { + return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties("columns", + MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), + "columns.types", MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + Constants.ESCAPE_CHAR, "\\")); + } + + /** + * Generate the table descriptor for Map-side join key. + */ + public static TableDesc getMapJoinValueTableDesc( + List fieldSchemas) { + return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties("columns", + MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas), + "columns.types", MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + Constants.ESCAPE_CHAR, "\\")); + } + + /** + * Generate the table descriptor for intermediate files. 
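+ *
+ * A hedged usage sketch; the single bigint column is an illustrative
+ * assumption. The resulting descriptor pairs LazyBinarySerDe with
+ * SequenceFile input/output formats and a backslash escape character,
+ * as the method body below shows:
+ * <pre>
+ *   List fieldSchemas = new ArrayList();
+ *   fieldSchemas.add(new FieldSchema("_col0", "bigint", null));
+ *   TableDesc tmp = TableDescFactory.getIntermediateFileTableDesc(fieldSchemas);
+ * </pre>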
+ */ + public static TableDesc getIntermediateFileTableDesc( + List fieldSchemas) { + return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties( + Constants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + Constants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + Constants.ESCAPE_CHAR, "\\")); + } + + /** + * Generate the table descriptor for intermediate files. + */ + public static TableDesc getReduceValueTableDesc(List fieldSchemas) { + return new TableDesc(LazyBinarySerDe.class, SequenceFileInputFormat.class, + SequenceFileOutputFormat.class, Utilities.makeProperties( + Constants.LIST_COLUMNS, MetaStoreUtils + .getColumnNamesFromFieldSchema(fieldSchemas), + Constants.LIST_COLUMN_TYPES, MetaStoreUtils + .getColumnTypesFromFieldSchema(fieldSchemas), + Constants.ESCAPE_CHAR, "\\")); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 10406) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -82,12 +82,12 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; @@ -137,10 +137,13 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceKeyTableDescFactory; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDescFactory; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableDescFactory; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; @@ -1440,7 +1443,7 @@ throw new SemanticException(e); } - TableDesc tblDesc = PlanUtils.getTableDesc(serdeClass, Integer + TableDesc tblDesc = TableDescFactory.getTableDesc(serdeClass, Integer .toString(Utilities.tabCode), cols, colTypes, defaultCols); // copy all the properties if (child.getChildCount() == 2) { @@ -1455,7 +1458,7 @@ } return tblDesc; } else if (child.getType() == HiveParser.TOK_SERDEPROPS) { - TableDesc tblDesc = PlanUtils.getDefaultTableDesc(Integer + TableDesc tblDesc = TableDescFactory.getDefaultTableDesc(Integer .toString(Utilities.ctrlaCode), cols, colTypes, defaultCols); int numChildRowFormat = child.getChildCount(); for (int numC = 0; numC < numChildRowFormat; numC++) { @@ -1630,7 +1633,7 @@ .getChild(inputSerDeNum))).getChild(0), inpColumns.toString(), inpColumnTypes.toString(), false); } else { - inInfo = 
PlanUtils.getTableDesc(serde, Integer + inInfo = TableDescFactory.getTableDesc(serde, Integer .toString(Utilities.tabCode), inpColumns.toString(), inpColumnTypes .toString(), false, true); } @@ -1644,13 +1647,13 @@ // However, if the script outputs: col1, col2, col3 seperated by TAB, the // requirement is: key is col and value is (col2 TAB col3) } else { - outInfo = PlanUtils.getTableDesc(serde, Integer + outInfo = TableDescFactory.getTableDesc(serde, Integer .toString(Utilities.tabCode), columns.toString(), columnTypes .toString(), defaultOutputCols); } // Error stream always uses the default serde with a single column - errInfo = PlanUtils.getTableDesc(serde, Integer.toString(Utilities.tabCode), "KEY"); + errInfo = TableDescFactory.getTableDesc(serde, Integer.toString(Utilities.tabCode), "KEY"); // Output record readers Class outRecordReader = getRecordReader((ASTNode) trfm @@ -2683,7 +2686,7 @@ } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, + OperatorFactory.getAndMakeChild(ReduceSinkDescFactory.getReduceSinkDesc(reduceKeys, grpByExprs.size(), reduceValues, distinctColIndices, outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields, numReducers), new RowSchema(reduceSinkOutputRowResolver @@ -2755,7 +2758,7 @@ } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, + OperatorFactory.getAndMakeChild(ReduceSinkDescFactory.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, true, -1, numPartitionFields, numReducers), new RowSchema(reduceSinkOutputRowResolver2 .getColumnInfos()), groupByOperatorInfo), @@ -3685,13 +3688,13 @@ if (tblDesc == null) { if (qb.getIsQuery()) { String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat); + table_desc = TableDescFactory.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat); } else { - table_desc = PlanUtils.getDefaultTableDesc(Integer + table_desc = TableDescFactory.getDefaultTableDesc(Integer .toString(Utilities.ctrlaCode), cols, colTypes, false); } } else { - table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes); + table_desc = TableDescFactory.getTableDesc(tblDesc, cols, colTypes); } if (!outputs.add(new WriteEntity(destStr, !isDfsDir))) { @@ -4146,7 +4149,7 @@ order.append("+"); } - Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils + Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(ReduceSinkDescFactory .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1, partitionCols, order.toString(), numReducers), new RowSchema(inputRR.getColumnInfos()), input), inputRR); @@ -4259,7 +4262,7 @@ for (int i = 0; i < valueCols.size(); i++) { outputColumns.add(getColumnInternalName(i)); } - Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils + Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(ReduceSinkDescFactory .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1, partitionCols, order.toString(), numReducers), new RowSchema(inputRR.getColumnInfos()), input), inputRR); @@ -4433,7 +4436,7 @@ } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, + OperatorFactory.getAndMakeChild(ReduceSinkDescFactory.getReduceSinkDesc(reduceKeys, reduceValues, 
outputColumns, false, joinTree.getNextTag(), reduceKeys.size(), numReds), new RowSchema(outputRS .getColumnInfos()), child), outputRS); @@ -4632,9 +4635,8 @@ Operator oi = (i == 0 && right[i] == null ? left : right[i]); ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf(); - now.setKeySerializeInfo(PlanUtils.getReduceKeyTableDesc(PlanUtils - .getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"), now - .getOrder())); + now.setKeySerializeInfo(ReduceKeyTableDescFactory.getReduceKeyTableDesc(now.getKeyCols(), + "joinkey", now.getOrder())); } } @@ -5286,7 +5288,7 @@ } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, + OperatorFactory.getAndMakeChild(ReduceSinkDescFactory.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1), new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input), reduceSinkOutputRowResolver); @@ -6190,7 +6192,7 @@ String colTypes = loadFileWork.get(0).getColumnTypes(); String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + TableDesc resultTab = TableDescFactory.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), resultTab, qb.getParseInfo().getOuterQueryLimit());