diff --git hbase-handler/pom.xml hbase-handler/pom.xml index 132af43..ad7c52a 100644 --- hbase-handler/pom.xml +++ hbase-handler/pom.xml @@ -222,6 +222,19 @@ ${basedir}/src/java ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java new file mode 100644 index 0000000..18fb5ea --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.Properties; + +public abstract class AbstractHBaseKeyFactory implements HBaseKeyFactory { + + protected HBaseSerDeParameters hbaseParams; + protected ColumnMappings.ColumnMapping keyMapping; + protected Properties properties; + + @Override + public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { + this.hbaseParams = hbaseParam; + this.keyMapping = hbaseParam.getKeyColumnMapping(); + this.properties = properties; + } + + @Override + public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException { + TableMapReduceUtil.addDependencyJars(jobConf, getClass()); + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate); + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java new file mode 100644 index 0000000..9cae5d3 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This source file is based on code taken from SQLLine 1.0.2 + * See SQLLine notice in LICENSE + */ + +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.Iterators; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class ColumnMappings implements Iterable { + + private final int keyIndex; + private final ColumnMapping[] columnsMapping; + + public ColumnMappings(List columnMapping, int keyIndex) { + this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]); + this.keyIndex = keyIndex; + } + + @Override + public Iterator iterator() { + return Iterators.forArray(columnsMapping); + } + + public int size() { + return columnsMapping.length; + } + + String toTypesString() { + StringBuilder sb = new StringBuilder(); + for (ColumnMapping colMap : columnsMapping) { + if (sb.length() > 0) { + sb.append(":"); + } + if (colMap.hbaseRowKey) { + // the row key column becomes a STRING + sb.append(serdeConstants.STRING_TYPE_NAME); + } else if (colMap.qualifierName == null) { + // a column family become a MAP + sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," + + serdeConstants.STRING_TYPE_NAME + ">"); + } else { + // an individual column becomes a STRING + sb.append(serdeConstants.STRING_TYPE_NAME); + } + } + return sb.toString(); + } + + void setHiveColumnDescription(String serdeName, + List columnNames, List columnTypes) throws SerDeException { + if (columnsMapping.length != columnNames.size()) { + throw new SerDeException(serdeName + ": columns has " + columnNames.size() + + " elements while hbase.columns.mapping has " + columnsMapping.length + " elements" + + " (counting the key if implicit)"); + } + + // check that the mapping schema is right; + // check that the "column-family:" is mapped to Map + // where key extends LazyPrimitive and thus has type Category.PRIMITIVE + for (int i = 0; i < columnNames.size(); i++) { + ColumnMapping colMap = columnsMapping[i]; + if (colMap.qualifierName == null && !colMap.hbaseRowKey) { + TypeInfo typeInfo = columnTypes.get(i); + if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory() + != ObjectInspector.Category.PRIMITIVE)) { + + throw new SerDeException( + serdeName + ": hbase column family '" + colMap.familyName + + "' should be mapped to Map,?>, that is " + + "the Key for the map should be of primitive type, but is mapped to " + + typeInfo.getTypeName()); + } + } + colMap.columnName = columnNames.get(i); + colMap.columnType = columnTypes.get(i); + } + } + + /** + * Utility method for parsing a string of the form '-,b,s,-,s:b,...' 
as a means of specifying + * whether to use a binary or an UTF string format to serialize and de-serialize primitive + * data types like boolean, byte, short, int, long, float, and double. This applies to + * regular columns and also to map column types which are associated with an HBase column + * family. For the map types, we apply the specification to the key or the value provided it + * is one of the above primitive types. The specifier is a colon separated value of the form + * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string + * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the + * table level default. + * + * @param hbaseTableDefaultStorageType - the specification associated with the table property + * hbase.table.default.storage.type + * @throws SerDeException on parse error. + */ + void parseColumnStorageTypes(String hbaseTableDefaultStorageType) throws SerDeException { + + boolean tableBinaryStorage = false; + + if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) { + if (hbaseTableDefaultStorageType.equals("binary")) { + tableBinaryStorage = true; + } else if (!hbaseTableDefaultStorageType.equals("string")) { + throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE + + " parameter must be specified as" + + " 'string' or 'binary'; '" + hbaseTableDefaultStorageType + + "' is not a valid specification for this table/serde property."); + } + } + + // parse the string to determine column level storage type for primitive types + // 's' is for variable length string format storage + // 'b' is for fixed width binary storage of bytes + // '-' is for table storage type, which defaults to UTF8 string + // string data is always stored in the default escaped storage format; the data types + // byte, short, int, long, float, and double have a binary byte oriented storage option + for (ColumnMapping colMap : columnsMapping) { + TypeInfo colType = colMap.columnType; + String mappingSpec = colMap.mappingSpec; + String[] mapInfo = mappingSpec.split("#"); + String[] storageInfo = null; + + if (mapInfo.length == 2) { + storageInfo = mapInfo[1].split(":"); + } + + if (storageInfo == null) { + + // use the table default storage specification + if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE) { + if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + colMap.binaryStorage.add(tableBinaryStorage); + } else { + colMap.binaryStorage.add(false); + } + } else if (colType.getCategory() == ObjectInspector.Category.MAP) { + TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo(); + TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo(); + + if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + colMap.binaryStorage.add(tableBinaryStorage); + } else { + colMap.binaryStorage.add(false); + } + + if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + colMap.binaryStorage.add(tableBinaryStorage); + } else { + colMap.binaryStorage.add(false); + } + } else { + colMap.binaryStorage.add(false); + } + + } else if (storageInfo.length == 1) { + // we have a storage specification for a primitive column type + String storageOption = storageInfo[0]; + + if ((colType.getCategory() == ObjectInspector.Category.MAP) || + !(storageOption.equals("-") || 
"string".startsWith(storageOption) || + "binary".startsWith(storageOption))) { + throw new SerDeException("Error: A column storage specification is one of the following:" + + " '-', a prefix of 'string', or a prefix of 'binary'. " + + storageOption + " is not a valid storage option specification for " + + colMap.columnName); + } + + if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE && + !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + + if ("-".equals(storageOption)) { + colMap.binaryStorage.add(tableBinaryStorage); + } else if ("binary".startsWith(storageOption)) { + colMap.binaryStorage.add(true); + } else { + colMap.binaryStorage.add(false); + } + } else { + colMap.binaryStorage.add(false); + } + + } else if (storageInfo.length == 2) { + // we have a storage specification for a map column type + + String keyStorage = storageInfo[0]; + String valStorage = storageInfo[1]; + + if ((colType.getCategory() != ObjectInspector.Category.MAP) || + !(keyStorage.equals("-") || "string".startsWith(keyStorage) || + "binary".startsWith(keyStorage)) || + !(valStorage.equals("-") || "string".startsWith(valStorage) || + "binary".startsWith(valStorage))) { + throw new SerDeException("Error: To specify a valid column storage type for a Map" + + " column, use any two specifiers from '-', a prefix of 'string', " + + " and a prefix of 'binary' separated by a ':'." + + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the" + + " key and value parts of the Map respectively." + + " Invalid storage specification for column " + + colMap.columnName + + "; " + storageInfo[0] + ":" + storageInfo[1]); + } + + TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo(); + TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo(); + + if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + + if (keyStorage.equals("-")) { + colMap.binaryStorage.add(tableBinaryStorage); + } else if ("binary".startsWith(keyStorage)) { + colMap.binaryStorage.add(true); + } else { + colMap.binaryStorage.add(false); + } + } else { + colMap.binaryStorage.add(false); + } + + if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { + if (valStorage.equals("-")) { + colMap.binaryStorage.add(tableBinaryStorage); + } else if ("binary".startsWith(valStorage)) { + colMap.binaryStorage.add(true); + } else { + colMap.binaryStorage.add(false); + } + } else { + colMap.binaryStorage.add(false); + } + + if (colMap.binaryStorage.size() != 2) { + throw new SerDeException("Error: In parsing the storage specification for column " + + colMap.columnName); + } + + } else { + // error in storage specification + throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification " + + mappingSpec + " is not valid for column: " + + colMap.columnName); + } + } + } + + public ColumnMapping getKeyMapping() { + return columnsMapping[keyIndex]; + } + + public int getKeyIndex() { + return keyIndex; + } + + public ColumnMapping[] getColumnsMapping() { + return columnsMapping; + } + + // todo use final fields + static class ColumnMapping { + + ColumnMapping() { + binaryStorage = new ArrayList(2); + } + + String columnName; + TypeInfo columnType; + + String familyName; + String qualifierName; + byte[] familyNameBytes; + byte[] qualifierNameBytes; + List binaryStorage; + boolean hbaseRowKey; + 
String mappingSpec; + String qualifierPrefix; + byte[] qualifierPrefixBytes; + + public boolean isCategory(ObjectInspector.Category category) { + return columnType.getCategory() == category; + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java new file mode 100644 index 0000000..a04bd1b --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +public class CompositeHBaseKeyFactory + extends DefaultHBaseKeyFactory implements Configurable { + + public static final Log LOG = LogFactory.getLog(CompositeHBaseKeyFactory.class); + + private final Class keyClass; + private final Constructor constructor; + + private Configuration conf; + + public CompositeHBaseKeyFactory(Class keyClass) throws Exception { + // see javadoc of HBaseCompositeKey + this.keyClass = keyClass; + this.constructor = keyClass.getDeclaredConstructor( + LazySimpleStructObjectInspector.class, Properties.class, Configuration.class); + } + + @Override + public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException { + super.configureJobConf(tableDesc, jobConf); + TableMapReduceUtil.addDependencyJars(jobConf, keyClass); + } + + 
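A hedged sketch of the constructor contract this factory enforces reflectively (not part of the patch): a user-supplied composite key class must expose the three-argument constructor looked up above, and is typically wired in through the hbase.composite.key.class / hbase.composite.key.factory table properties touched elsewhere in this patch. The class name and body below are hypothetical, assuming HBaseCompositeKey keeps a single-argument constructor taking the struct inspector.

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

    public class SampleCompositeKey extends HBaseCompositeKey {
      // CompositeHBaseKeyFactory#createKey instantiates the class through exactly
      // this constructor: the key's struct inspector, the table properties and
      // the job configuration.
      public SampleCompositeKey(LazySimpleStructObjectInspector oi,
          Properties tbl, Configuration conf) {
        super(oi);
        // tbl/conf could carry key-layout details such as separators or fixed widths
      }
    }
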
@Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public T createKey(ObjectInspector inspector) throws SerDeException { + try { + return (T) constructor.newInstance(inspector, properties, conf); + } catch (Exception e) { + throw new SerDeException(e); + } + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + String keyColName = hbaseParams.getKeyColumnMapping().columnName; + + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); + analyzer.allowColumnName(keyColName); + analyzer.setAcceptsFields(true); + analyzer.setFieldValidator(new Validator()); + + DecomposedPredicate decomposed = new DecomposedPredicate(); + + List conditions = new ArrayList(); + decomposed.residualPredicate = + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions); + if (!conditions.isEmpty()) { + decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions); + try { + decomposed.pushedPredicateObject = setupFilter(keyColName, conditions); + } catch (Exception e) { + LOG.warn("Failed to decompose predicates", e); + return null; + } + } + return decomposed; + } + + /** + * Validates the field in the {@link ExprNodeFieldDesc}. Basically this validates that the given field is the first field in the given struct. + * This is important specially in case of structs as order of fields in the structs is important when using for any filter down the line + **/ + private Serializable setupFilter(String keyColName, List conditions) + throws Exception { + HBaseScanRange scanRange = new HBaseScanRange(); + for (IndexSearchCondition condition : conditions) { + if (condition.getFields() == null) { + continue; + } + String field = condition.getFields()[0]; + Object value = condition.getConstantDesc().getValue(); + scanRange.addFilter(new FamilyFilter( + CompareFilter.CompareOp.EQUAL, new BinaryComparator(field.getBytes()))); + } + return scanRange; + } + + private static class Validator implements IndexPredicateAnalyzer.FieldValidator { + + public boolean validate(ExprNodeFieldDesc fieldDesc) { + String fieldName = fieldDesc.getFieldName(); + + ExprNodeDesc nodeDesc = fieldDesc.getDesc(); + + TypeInfo typeInfo = nodeDesc.getTypeInfo(); + + if (!(typeInfo instanceof StructTypeInfo)) { + // since we are working off a ExprNodeFieldDesc which represents a field within a struct, this + // should never happen + throw new AssertionError("Expected StructTypeInfo. Found:" + typeInfo.getTypeName()); + } + + List allFieldNames = ((StructTypeInfo) typeInfo).getAllStructFieldNames(); + + if (allFieldNames == null || allFieldNames.size() == 0) { + return false; + } + + String firstElement = allFieldNames.get(0); + + return firstElement.equals(fieldName); + } + } +} \ No newline at end of file diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java new file mode 100644 index 0000000..5731e45 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; +import java.util.Properties; + +public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory { + + protected LazySimpleSerDe.SerDeParameters serdeParams; + protected HBaseRowSerializer serializer; + + @Override + public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { + super.init(hbaseParam, properties); + this.serdeParams = hbaseParam.getSerdeParams(); + this.serializer = new HBaseRowSerializer(hbaseParam); + } + + @Override + public ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException { + return LazyFactory.createLazyObjectInspector(type, serdeParams.getSeparators(), 1, + serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar()); + } + + @Override + public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException { + return LazyFactory.createLazyObject(inspector, keyMapping.binaryStorage.get(0)); + } + + @Override + public byte[] serializeKey(Object object, StructField field) throws IOException { + return serializer.serializeKeyField(object, field, keyMapping); + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java index 5008f15..d184216 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -20,9 +20,7 @@ import java.util.ArrayList; import java.util.List; -import java.util.Properties; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObject; diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java new file mode 100644 index 0000000..251d22e --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.Properties; + +/** + * Provides custom implementation of object and object inspector for hbase key. + * The key object should implement LazyObjectBase. + * + * User can optionally implement HiveStoragePredicateHandler for handling filter predicates + */ +public interface HBaseKeyFactory extends HiveStoragePredicateHandler { + + /** + * initialize factory with properties + */ + void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException; + + /** + * create custom object inspector for hbase key + * @param type type information + */ + ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException; + + /** + * create custom object for hbase key + * @param inspector OI create by {@link HBaseKeyFactory#createKeyObjectInspector} + */ + LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException; + + /** + * serialize hive object in internal format of custom key + * + * @param object + * @param inspector + * @param output + * + * @return true if it's not null + * @throws java.io.IOException + */ + byte[] serializeKey(Object object, StructField field) throws IOException; + + /** + * configure jobConf for this factory + * + * @param tableDesc + * @param jobConf + */ + void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException; +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java new file mode 100644 index 0000000..5c26456 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.util.ArrayList; +import java.util.List; + +// Does same thing with LazyFactory#createLazyObjectInspector except that this replaces +// original keyOI with OI which is create by HBaseKeyFactory provided by serde property for hbase +public class HBaseLazyObjectFactory { + + public static ObjectInspector createLazyHBaseStructInspector( + SerDeParameters serdeParams, int index, HBaseKeyFactory factory) throws SerDeException { + List columnTypes = serdeParams.getColumnTypes(); + ArrayList columnObjectInspectors = new ArrayList( + columnTypes.size()); + for (int i = 0; i < columnTypes.size(); i++) { + if (i == index) { + columnObjectInspectors.add(factory.createKeyObjectInspector(columnTypes.get(i))); + } else { + columnObjectInspectors.add(LazyFactory.createLazyObjectInspector( + columnTypes.get(i), serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), + serdeParams.isEscaped(), serdeParams.getEscapeChar())); + } + } + return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector( + serdeParams.getColumnNames(), columnObjectInspectors, serdeParams.getSeparators()[0], + serdeParams.getNullSequence(), serdeParams.isLastColumnTakesRest(), + serdeParams.isEscaped(), serdeParams.getEscapeChar()); + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java new file mode 100644 index 0000000..fe6081e --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class HBaseRowSerializer { + + private final HBaseKeyFactory keyFactory; + private final HBaseSerDeParameters hbaseParam; + private final LazySimpleSerDe.SerDeParameters serdeParam; + + private final int keyIndex; + private final ColumnMapping keyMapping; + private final ColumnMapping[] columnMappings; + private final byte[] separators; // the separators array + private final boolean escaped; // whether we need to escape the data when writing out + private final byte escapeChar; // which char to use as the escape char, e.g. '\\' + private final boolean[] needsEscape; // which chars need to be escaped. This array should have size + // of 128. Negative byte values (or byte values >= 128) are + // never escaped. + + private final long putTimestamp; + private final ByteStream.Output output = new ByteStream.Output(); + + public HBaseRowSerializer(HBaseSerDeParameters hbaseParam) { + this.hbaseParam = hbaseParam; + this.keyFactory = hbaseParam.getKeyFactory(); + this.serdeParam = hbaseParam.getSerdeParams(); + this.separators = serdeParam.getSeparators(); + this.escaped = serdeParam.isEscaped(); + this.escapeChar = serdeParam.getEscapeChar(); + this.needsEscape = serdeParam.getNeedsEscape(); + this.keyIndex = hbaseParam.getKeyIndex(); + this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping(); + this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping(); + this.putTimestamp = hbaseParam.getPutTimestamp(); + } + + public Writable serialize(Object obj, ObjectInspector objInspector) throws Exception { + if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + + StructField field = fields.get(keyIndex); + Object value = list.get(keyIndex); + + byte[] key = keyFactory.serializeKey(value, field); + if (key == null) { + throw new SerDeException("HBase row key cannot be NULL"); + } + + Put put = putTimestamp >= 0 ? 
new Put(key, putTimestamp) : new Put(key); + + // Serialize each field + for (int i = 0; i < fields.size(); i++) { + if (i == keyIndex) { + continue; + } + field = fields.get(i); + value = list.get(i); + serializeField(value, field, columnMappings[i], put); + } + + return new PutWritable(put); + } + + byte[] serializeKeyField(Object keyValue, StructField keyField, ColumnMapping keyMapping) + throws IOException { + if (keyValue == null) { + throw new IOException("HBase row key cannot be NULL"); + } + ObjectInspector keyFieldOI = keyField.getFieldObjectInspector(); + + if (!keyFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE) && + keyMapping.isCategory(ObjectInspector.Category.PRIMITIVE)) { + // we always serialize the String type using the escaped algorithm for LazyString + return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false); + } + // use the serialization option switch to write primitive values as either a variable + // length UTF8 string or a fixed width bytes if serializing in binary format + boolean writeBinary = keyMapping.binaryStorage.get(0); + return serialize(keyValue, keyFieldOI, 1, writeBinary); + } + + private void serializeField( + Object value, StructField field, ColumnMapping colMap, Put put) throws IOException { + if (value == null) { + // a null object, we do not serialize it + return; + } + // Get the field objectInspector and the field object. + ObjectInspector foi = field.getFieldObjectInspector(); + + // If the field corresponds to a column family in HBase + if (colMap.qualifierName == null) { + MapObjectInspector moi = (MapObjectInspector) foi; + Map map = moi.getMap(value); + if (map == null) { + return; + } + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + for (Map.Entry entry: map.entrySet()) { + // Get the Key + // Map keys are required to be primitive and may be serialized in binary format + byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0)); + if (columnQualifierBytes == null) { + continue; + } + + // Map values may be serialized in binary format when they are primitive and binary + // serialization is the option selected + byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1)); + if (bytes == null) { + continue; + } + + put.add(colMap.familyNameBytes, columnQualifierBytes, bytes); + } + } else { + byte[] bytes; + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. 
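      // (Illustration, assuming LazySimpleSerDe's default delimiters and the
      // level-1 starting separator used here: a list<int> value [1, 2] is written
      // in the delimited form as 1^B2, and a map<string,int> value {"a": 1, "b": 2}
      // as a^C1^Bb^C2; deeper nesting moves further down the separators array.)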
+ if (!foi.getCategory().equals(ObjectInspector.Category.PRIMITIVE) + && colMap.isCategory(ObjectInspector.Category.PRIMITIVE)) { + // we always serialize the String type using the escaped algorithm for LazyString + bytes = serialize(SerDeUtils.getJSONString(value, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false); + } else { + // use the serialization option switch to write primitive values as either a variable + // length UTF8 string or a fixed width bytes if serializing in binary format + bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0)); + } + + if (bytes == null) { + return; + } + + put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes); + } + } + + /* + * Serialize the row into a ByteStream. + * + * @param obj The object for the current field. + * @param objInspector The ObjectInspector for the current Object. + * @param level The current level of separator. + * @param writeBinary Whether to write a primitive object as an UTF8 variable length string or + * as a fixed width byte array onto the byte stream. + * @throws IOException On error in writing to the serialization stream. + * @return true On serializing a non-null object, otherwise false. + */ + private byte[] serialize(Object obj, ObjectInspector objInspector, int level, boolean writeBinary) + throws IOException { + output.reset(); + if (objInspector.getCategory() == ObjectInspector.Category.PRIMITIVE && writeBinary) { + LazyUtils.writePrimitive(output, obj, (PrimitiveObjectInspector) objInspector); + } else { + if (!serialize(obj, objInspector, level, output)) { + return null; + } + } + return output.toByteArray(); + } + + private boolean serialize( + Object obj, + ObjectInspector objInspector, + int level, ByteStream.Output ss) throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: + LazyUtils.writePrimitiveUTF8(ss, obj, + (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape); + return true; + case LIST: + char separator = (char) separators[level]; + ListObjectInspector loi = (ListObjectInspector)objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + ss.write(separator); + } + serialize(list.get(i), eoi, level + 1, ss); + } + } + return true; + case MAP: + char sep = (char) separators[level]; + char keyValueSeparator = (char) separators[level+1]; + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry: map.entrySet()) { + if (first) { + first = false; + } else { + ss.write(sep); + } + serialize(entry.getKey(), koi, level+2, ss); + ss.write(keyValueSeparator); + serialize(entry.getValue(), voi, level+2, ss); + } + } + return true; + case STRUCT: + sep = (char)separators[level]; + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + ss.write(sep); + } + + serialize(list.get(i), fields.get(i).getFieldObjectInspector(), + level + 1, ss); + } + } + return true; + default: + throw new RuntimeException("Unknown category type: " + 
objInspector.getCategory()); + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java new file mode 100644 index 0000000..8b64321 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.io.BytesWritable; + +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +public class HBaseScanRange implements Serializable { + + private byte[] startRow; + private byte[] stopRow; + + private List filterDescs = new ArrayList(); + + public byte[] getStartRow() { + return startRow; + } + + public void setStartRow(byte[] startRow) { + this.startRow = startRow; + } + + public byte[] getStopRow() { + return stopRow; + } + + public void setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + } + + public void addFilter(Filter filter) throws Exception { + Class clazz = filter.getClass(); + clazz.getMethod("parseFrom", byte[].class); // valiade + filterDescs.add(new FilterDesc(clazz.getName(), filter.toByteArray())); + } + + public void setup(Scan scan, Configuration conf) throws Exception { + if (startRow != null) { + scan.setStartRow(startRow); + } + if (stopRow != null) { + scan.setStopRow(stopRow); + } + if (filterDescs.isEmpty()) { + return; + } + if (filterDescs.size() == 1) { + scan.setFilter(filterDescs.get(0).toFilter(conf)); + return; + } + List filters = new ArrayList(); + for (FilterDesc filter : filterDescs) { + filters.add(filter.toFilter(conf)); + } + scan.setFilter(new FilterList(filters)); + } + + public String toString() { + return (startRow == null ? "" : new BytesWritable(startRow).toString()) + " ~ " + + (stopRow == null ? 
"" : new BytesWritable(stopRow).toString()); + } + + private static class FilterDesc implements Serializable { + + private String className; + private byte[] binary; + + public FilterDesc(String className, byte[] binary) { + this.className = className; + this.binary = binary; + } + + public Filter toFilter(Configuration conf) throws Exception { + return (Filter) getFactoryMethod(className, conf).invoke(null, binary); + } + + private Method getFactoryMethod(String className, Configuration conf) throws Exception { + Class clazz = conf.getClassByName(className); + return clazz.getMethod("parseFrom", byte[].class); + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 5fe35a5..6c1ce5c 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -18,36 +18,24 @@ package org.apache.hadoop.hive.hbase; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.AbstractSerDe; -import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.lazy.LazyFactory; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; /** * HBaseSerDe can be used to serialize object into an HBase table and @@ -62,6 +50,7 @@ public static final String HBASE_KEY_COL = ":key"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; + public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory"; public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = 
"hbase.scan.batch"; @@ -73,19 +62,13 @@ private ObjectInspector cachedObjectInspector; private LazyHBaseRow cachedHBaseRow; - private Object compositeKeyObj; private HBaseSerDeParameters serdeParams; + private HBaseRowSerializer serializer; @Override public String toString() { - return getClass().toString() - + "[" - + serdeParams.getColumnMappingString() - + ":" - + serdeParams.getRowTypeInfo().getAllStructFieldNames() - + ":" - + serdeParams.getRowTypeInfo().getAllStructFieldTypeInfos() + "]"; + return getClass() + "[" + serdeParams + "]"; } public HBaseSerDe() throws SerDeException { @@ -98,35 +81,26 @@ public HBaseSerDe() throws SerDeException { @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { - serdeParams = new HBaseSerDeParameters(); - serdeParams.init(conf, tbl, getClass().getName()); - - cachedObjectInspector = LazyFactory.createLazyStructInspector( - serdeParams.getColumnNames(), - serdeParams.getColumnTypes(), - serdeParams.getSeparators(), - serdeParams.getNullSequence(), - serdeParams.isLastColumnTakesRest(), - serdeParams.isEscaped(), - serdeParams.getEscapeChar()); - - cachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector) cachedObjectInspector); - - if (serdeParams.getCompositeKeyClass() != null) { - // initialize the constructor of the composite key class with its object inspector - initCompositeKeyClass(conf,tbl); - } + serdeParams = new HBaseSerDeParameters(conf, tbl, getClass().getName()); + + cachedObjectInspector = HBaseLazyObjectFactory.createLazyHBaseStructInspector( + serdeParams.getSerdeParams(), serdeParams.getKeyIndex(), serdeParams.getKeyFactory()); + + cachedHBaseRow = new LazyHBaseRow( + (LazySimpleStructObjectInspector) cachedObjectInspector, + serdeParams.getKeyIndex(), serdeParams.getKeyFactory()); + + serializer = new HBaseRowSerializer(serdeParams); if (LOG.isDebugEnabled()) { - LOG.debug("HBaseSerDe initialized with : columnNames = " - + serdeParams.getColumnNames() - + " columnTypes = " - + serdeParams.getColumnTypes() - + " hbaseColumnMapping = " - + serdeParams.getColumnMappingString()); + LOG.debug("HBaseSerDe initialized with : " + serdeParams); } } + public static ColumnMappings parseColumnsMapping(String columnsMappingSpec) + throws SerDeException { + return parseColumnsMapping(columnsMappingSpec, true); + } /** * Parses the HBase columns mapping specifier to identify the column families, qualifiers * and also caches the byte arrays corresponding to them. 
One of the Hive table @@ -135,24 +109,23 @@ public void initialize(Configuration conf, Properties tbl) * @param columnsMappingSpec string hbase.columns.mapping specified when creating table * @param doColumnRegexMatching whether to do a regex matching on the columns or not * @return List which contains the column mapping information by position - * @throws SerDeException + * @throws org.apache.hadoop.hive.serde2.SerDeException */ - public static List parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching) - throws SerDeException { + public static ColumnMappings parseColumnsMapping( + String columnsMappingSpec, boolean doColumnRegexMatching) throws SerDeException { if (columnsMappingSpec == null) { throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table."); } - if (columnsMappingSpec.equals("") || columnsMappingSpec.equals(HBASE_KEY_COL)) { + if (columnsMappingSpec.isEmpty() || columnsMappingSpec.equals(HBASE_KEY_COL)) { throw new SerDeException("Error: hbase.columns.mapping specifies only the HBase table" + " row key. A valid Hive-HBase table must specify at least one additional column."); } int rowKeyIndex = -1; List columnsMapping = new ArrayList(); - String [] columnSpecs = columnsMappingSpec.split(","); - ColumnMapping columnMapping = null; + String[] columnSpecs = columnsMappingSpec.split(","); for (int i = 0; i < columnSpecs.length; i++) { String mappingSpec = columnSpecs[i].trim(); @@ -167,7 +140,7 @@ public void initialize(Configuration conf, Properties tbl) "column family, column qualifier specification."); } - columnMapping = new ColumnMapping(); + ColumnMapping columnMapping = new ColumnMapping(); if (colInfo.equals(HBASE_KEY_COL)) { rowKeyIndex = i; @@ -197,7 +170,6 @@ public void initialize(Configuration conf, Properties tbl) // set the regular provided qualifier names columnMapping.qualifierName = parts[1]; columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]); - ; } } else { columnMapping.qualifierName = null; @@ -211,34 +183,26 @@ public void initialize(Configuration conf, Properties tbl) } if (rowKeyIndex == -1) { - columnMapping = new ColumnMapping(); - columnMapping.familyName = HBASE_KEY_COL; - columnMapping.familyNameBytes = Bytes.toBytes(HBASE_KEY_COL); + rowKeyIndex = 0; + ColumnMapping columnMapping = new ColumnMapping(); + columnMapping.familyName = HBaseSerDe.HBASE_KEY_COL; + columnMapping.familyNameBytes = Bytes.toBytes(HBaseSerDe.HBASE_KEY_COL); columnMapping.qualifierName = null; columnMapping.qualifierNameBytes = null; columnMapping.hbaseRowKey = true; - columnMapping.mappingSpec = HBASE_KEY_COL; + columnMapping.mappingSpec = HBaseSerDe.HBASE_KEY_COL; columnsMapping.add(0, columnMapping); } - return columnsMapping; + return new ColumnMappings(columnsMapping, rowKeyIndex); } - static class ColumnMapping { - - ColumnMapping() { - binaryStorage = new ArrayList(2); - } + public LazySimpleSerDe.SerDeParameters getSerdeParams() { + return serdeParams.getSerdeParams(); + } - String familyName; - String qualifierName; - byte [] familyNameBytes; - byte [] qualifierNameBytes; - List binaryStorage; - boolean hbaseRowKey; - String mappingSpec; - String qualifierPrefix; - byte[] qualifierPrefixBytes; + public HBaseSerDeParameters getHBaseSerdeParam() { + return serdeParams; } /** @@ -253,8 +217,8 @@ public Object deserialize(Writable result) throws SerDeException { throw new SerDeException(getClass().getName() + ": expects ResultWritable!"); } - cachedHBaseRow.init(((ResultWritable) result).getResult(), 
serdeParams.getColumnMapping(), - compositeKeyObj); + cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMappings()); + return cachedHBaseRow; } @@ -269,310 +233,14 @@ public ObjectInspector getObjectInspector() throws SerDeException { } @Override - public Writable serialize(Object obj, ObjectInspector objInspector) - throws SerDeException { - if (objInspector.getCategory() != Category.STRUCT) { - throw new SerDeException(getClass().toString() - + " can only serialize struct types, but we got: " - + objInspector.getTypeName()); - } - - // Prepare the field ObjectInspectors - StructObjectInspector soi = (StructObjectInspector) objInspector; - List fields = soi.getAllStructFieldRefs(); - List list = soi.getStructFieldsDataAsList(obj); - List declaredFields = - ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs(); - - int iKey = serdeParams.getKeyIndex(); - StructField field = fields.get(iKey); - Object value = list.get(iKey); - StructField declaredField = declaredFields.get(iKey); - byte[] key; + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { try { - key = serializeKeyField(field, value, declaredField, serdeParams); - } catch (IOException ex) { - throw new SerDeException(ex); - } - - if (key == null) { - throw new SerDeException("HBase row key cannot be NULL"); - } - - Put put = null; - long putTimestamp = serdeParams.getPutTimestamp(); - if(putTimestamp >= 0) { - put = new Put(key, putTimestamp); - } else { - put = new Put(key); - } - - try { - // Serialize each field - for (int i = 0; i < fields.size(); i++) { - if (i == iKey) { - continue; - } - - field = fields.get(i); - value = list.get(i); - declaredField = declaredFields.get(i); - ColumnMapping colMap = serdeParams.getColumnMapping().get(i); - serializeField(put, field, value, declaredField, colMap); - } - } catch (IOException e) { + return serializer.serialize(obj, objInspector); + } catch (SerDeException e) { + throw e; + } catch (Exception e) { throw new SerDeException(e); } - - return new PutWritable(put); - } - - private static byte[] serializeKeyField(StructField keyField, Object keyValue, - StructField declaredKeyField, HBaseSerDeParameters serdeParams) throws IOException { - if (keyValue == null) { - // a null object, we do not serialize it - return null; - } - - boolean writeBinary = serdeParams.getKeyColumnMapping().binaryStorage.get(0); - ObjectInspector keyFieldOI = keyField.getFieldObjectInspector(); - - if (!keyFieldOI.getCategory().equals(Category.PRIMITIVE) && - declaredKeyField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) { - // we always serialize the String type using the escaped algorithm for LazyString - return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI), - PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams); - } else { - // use the serialization option switch to write primitive values as either a variable - // length UTF8 string or a fixed width bytes if serializing in binary format - return serialize(keyValue, keyFieldOI, 1, writeBinary, serdeParams); - } - - } - - private void serializeField(Put put, StructField field, Object value, - StructField declaredField, ColumnMapping colMap) throws IOException { - if (value == null) { - // a null object, we do not serialize it - return; - } - - // Get the field objectInspector and the field object. 
- ObjectInspector foi = field.getFieldObjectInspector(); - - // If the field corresponds to a column family in HBase - if (colMap.qualifierName == null) { - MapObjectInspector moi = (MapObjectInspector) foi; - ObjectInspector koi = moi.getMapKeyObjectInspector(); - ObjectInspector voi = moi.getMapValueObjectInspector(); - - Map map = moi.getMap(value); - if (map == null) { - return; - } else { - for (Map.Entry entry: map.entrySet()) { - // Get the Key - // Map keys are required to be primitive and may be serialized in binary format - byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0), serdeParams); - if (columnQualifierBytes == null) { - continue; - } - - // Map values may be serialized in binary format when they are primitive and binary - // serialization is the option selected - byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1), serdeParams); - if (bytes == null) { - continue; - } - - put.add(colMap.familyNameBytes, columnQualifierBytes, bytes); - } - } - } else { - byte[] bytes = null; - // If the field that is passed in is NOT a primitive, and either the - // field is not declared (no schema was given at initialization), or - // the field is declared as a primitive in initialization, serialize - // the data to JSON string. Otherwise serialize the data in the - // delimited way. - if (!foi.getCategory().equals(Category.PRIMITIVE) - && declaredField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) { - // we always serialize the String type using the escaped algorithm for LazyString - bytes = serialize(SerDeUtils.getJSONString(value, foi), - PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams); - } else { - // use the serialization option switch to write primitive values as either a variable - // length UTF8 string or a fixed width bytes if serializing in binary format - bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0), serdeParams); - } - - if (bytes == null) { - return; - } - - put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes); - } - } - - private static byte[] getBytesFromStream(ByteStream.Output ss) { - byte [] buf = new byte[ss.getCount()]; - System.arraycopy(ss.getData(), 0, buf, 0, ss.getCount()); - return buf; - } - - /* - * Serialize the row into a ByteStream. - * - * @param obj The object for the current field. - * @param objInspector The ObjectInspector for the current Object. - * @param level The current level of separator. - * @param writeBinary Whether to write a primitive object as an UTF8 variable length string or - * as a fixed width byte array onto the byte stream. - * @throws IOException On error in writing to the serialization stream. - * @return true On serializing a non-null object, otherwise false. 
- */ - private static byte[] serialize(Object obj, ObjectInspector objInspector, int level, - boolean writeBinary, HBaseSerDeParameters serdeParams) throws IOException { - ByteStream.Output ss = new ByteStream.Output(); - if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) { - LazyUtils.writePrimitive(ss, obj, (PrimitiveObjectInspector) objInspector); - } else { - if (false == serialize(obj, objInspector, level, serdeParams, ss)) { - return null; - } - } - - return getBytesFromStream(ss); - } - - private static boolean serialize( - Object obj, - ObjectInspector objInspector, - int level, HBaseSerDeParameters serdeParams, ByteStream.Output ss) throws IOException { - - byte[] separators = serdeParams.getSeparators(); - boolean escaped = serdeParams.isEscaped(); - byte escapeChar = serdeParams.getEscapeChar(); - boolean[] needsEscape = serdeParams.getNeedsEscape(); - switch (objInspector.getCategory()) { - case PRIMITIVE: - LazyUtils.writePrimitiveUTF8(ss, obj, - (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape); - return true; - case LIST: - char separator = (char) separators[level]; - ListObjectInspector loi = (ListObjectInspector)objInspector; - List list = loi.getList(obj); - ObjectInspector eoi = loi.getListElementObjectInspector(); - if (list == null) { - return false; - } else { - for (int i = 0; i < list.size(); i++) { - if (i > 0) { - ss.write(separator); - } - serialize(list.get(i), eoi, level + 1, serdeParams, ss); - } - } - return true; - case MAP: - char sep = (char) separators[level]; - char keyValueSeparator = (char) separators[level+1]; - MapObjectInspector moi = (MapObjectInspector) objInspector; - ObjectInspector koi = moi.getMapKeyObjectInspector(); - ObjectInspector voi = moi.getMapValueObjectInspector(); - - Map map = moi.getMap(obj); - if (map == null) { - return false; - } else { - boolean first = true; - for (Map.Entry entry: map.entrySet()) { - if (first) { - first = false; - } else { - ss.write(sep); - } - serialize(entry.getKey(), koi, level+2, serdeParams, ss); - ss.write(keyValueSeparator); - serialize(entry.getValue(), voi, level+2, serdeParams, ss); - } - } - return true; - case STRUCT: - sep = (char)separators[level]; - StructObjectInspector soi = (StructObjectInspector)objInspector; - List fields = soi.getAllStructFieldRefs(); - list = soi.getStructFieldsDataAsList(obj); - if (list == null) { - return false; - } else { - for (int i = 0; i < list.size(); i++) { - if (i > 0) { - ss.write(sep); - } - - serialize(list.get(i), fields.get(i).getFieldObjectInspector(), - level + 1, serdeParams, ss); - } - } - return true; - default: - throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); - } - } - - /** - * Initialize the composite key class with the objectinspector for the key - * - * @throws SerDeException - * */ - private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException { - - int i = 0; - - // find the hbase row key - for (ColumnMapping colMap : serdeParams.getColumnMapping()) { - if (colMap.hbaseRowKey) { - break; - } - i++; - } - - ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector) - .getAllStructFieldRefs().get(i).getFieldObjectInspector(); - - try { - compositeKeyObj = serdeParams.getCompositeKeyClass().getDeclaredConstructor( - LazySimpleStructObjectInspector.class, Properties.class, Configuration.class) - .newInstance( - ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf); - } catch 
(IllegalArgumentException e) { - throw new SerDeException(e); - } catch (SecurityException e) { - throw new SerDeException(e); - } catch (InstantiationException e) { - throw new SerDeException(e); - } catch (IllegalAccessException e) { - throw new SerDeException(e); - } catch (InvocationTargetException e) { - throw new SerDeException(e); - } catch (NoSuchMethodException e) { - // the constructor wasn't defined in the implementation class. Flag error - throw new SerDeException("Constructor not defined in composite key class [" - + serdeParams.getCompositeKeyClass().getName() + "]", e); - } - } - - /** - * @return 0-based offset of the key column within the table - */ - int getKeyColumnOffset() { - return serdeParams.getKeyIndex(); - } - - List getStorageFormatOfCol(int colPos){ - return serdeParams.getColumnMapping().get(colPos).binaryStorage; } @Override @@ -581,18 +249,13 @@ public SerDeStats getSerDeStats() { return null; } - public static int getRowKeyColumnOffset(List columnsMapping) - throws SerDeException { - - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); - - if (colMap.hbaseRowKey && colMap.familyName.equals(HBASE_KEY_COL)) { - return i; - } - } + public HBaseKeyFactory getKeyFactory() { + return serdeParams.getKeyFactory(); + } - throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" + - " row key column."); + public static void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws Exception { + HBaseSerDeParameters serdeParams = + new HBaseSerDeParameters(jobConf, tableDesc.getProperties(), HBaseSerDe.class.getName()); + serdeParams.getKeyFactory().configureJobConf(tableDesc, jobConf); } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java index b64590d..07db3af 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java @@ -18,20 +18,17 @@ package org.apache.hadoop.hive.hbase; -import java.util.List; -import java.util.Properties; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ReflectionUtils; + +import java.util.List; +import java.util.Properties; /** * HBaseSerDeParameters encapsulates SerDeParameters and additional configurations that are specific for @@ -39,318 +36,72 @@ * */ public class HBaseSerDeParameters { - private SerDeParameters serdeParams; - private String columnMappingString; - private List columnMapping; - private boolean doColumnRegexMatching; + private final String serdeName; + private final SerDeParameters serdeParams; - private long putTimestamp; + private final String columnMappingString; + private final ColumnMappings columnMappings; + private final boolean 
doColumnRegexMatching; - private Class compositeKeyClass; - private int keyIndex; + private final long putTimestamp; + private final HBaseKeyFactory keyFactory; - void init(Configuration job, Properties tbl, String serdeName) throws SerDeException { - serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); - putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1")); - - String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); - if (compKeyClass != null) { - try { - compositeKeyClass = job.getClassByName(compKeyClass); - } catch (ClassNotFoundException e) { - throw new SerDeException(e); - } - } + HBaseSerDeParameters(Configuration job, Properties tbl, String serdeName) throws SerDeException { + this.serdeName = serdeName; + this.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); + this.putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1")); // Read configuration parameters columnMappingString = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING); doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true")); // Parse and initialize the HBase columns mapping - columnMapping = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching); + columnMappings = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching); + columnMappings.setHiveColumnDescription(serdeName, serdeParams.getColumnNames(), serdeParams.getColumnTypes()); + + // Precondition: make sure this is done after the rest of the SerDe initialization is done. + String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE); + columnMappings.parseColumnStorageTypes(hbaseTableStorageType); // Build the type property string if not supplied String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); if (columnTypeProperty == null) { - StringBuilder sb = new StringBuilder(); - - for (int i = 0; i < columnMapping.size(); i++) { - if (sb.length() > 0) { - sb.append(":"); - } - - ColumnMapping colMap = columnMapping.get(i); - - if (colMap.hbaseRowKey) { - // the row key column becomes a STRING - sb.append(serdeConstants.STRING_TYPE_NAME); - } else if (colMap.qualifierName == null) { - // a column family become a MAP - sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," - + serdeConstants.STRING_TYPE_NAME + ">"); - } else { - // an individual column becomes a STRING - sb.append(serdeConstants.STRING_TYPE_NAME); - } - } - tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString()); - } - - if (columnMapping.size() != serdeParams.getColumnNames().size()) { - throw new SerDeException(serdeName + ": columns has " + - serdeParams.getColumnNames().size() + - " elements while hbase.columns.mapping has " + - columnMapping.size() + " elements" + - " (counting the key if implicit)"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnMappings.toTypesString()); } - // check that the mapping schema is right; - // check that the "column-family:" is mapped to Map - // where key extends LazyPrimitive and thus has type Category.PRIMITIVE - for (int i = 0; i < columnMapping.size(); i++) { - ColumnMapping colMap = columnMapping.get(i); - if (colMap.qualifierName == null && !colMap.hbaseRowKey) { - TypeInfo typeInfo = serdeParams.getColumnTypes().get(i); - if ((typeInfo.getCategory() != Category.MAP) || - (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory() - != Category.PRIMITIVE)) { - - 
throw new SerDeException( - serdeName + ": hbase column family '" + colMap.familyName - + "' should be mapped to Map,?>, that is " - + "the Key for the map should be of primitive type, but is mapped to " - + typeInfo.getTypeName()); - } - } - } - - // Precondition: make sure this is done after the rest of the SerDe initialization is done. - String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE); - parseColumnStorageTypes(hbaseTableStorageType); - setKeyColumnOffset(); + this.keyFactory = initKeyFactory(job, tbl); } - /* - * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying - * whether to use a binary or an UTF string format to serialize and de-serialize primitive - * data types like boolean, byte, short, int, long, float, and double. This applies to - * regular columns and also to map column types which are associated with an HBase column - * family. For the map types, we apply the specification to the key or the value provided it - * is one of the above primitive types. The specifier is a colon separated value of the form - * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string - * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the - * table level default. - * - * @param hbaseTableDefaultStorageType - the specification associated with the table property - * hbase.table.default.storage.type - * @throws SerDeException on parse error. - */ - - public void parseColumnStorageTypes(String hbaseTableDefaultStorageType) - throws SerDeException { - - boolean tableBinaryStorage = false; - - if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) { - if (hbaseTableDefaultStorageType.equals("binary")) { - tableBinaryStorage = true; - } else if (!hbaseTableDefaultStorageType.equals("string")) { - throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE + - " parameter must be specified as" + - " 'string' or 'binary'; '" + hbaseTableDefaultStorageType + - "' is not a valid specification for this table/serde property."); - } - } - - // parse the string to determine column level storage type for primitive types - // 's' is for variable length string format storage - // 'b' is for fixed width binary storage of bytes - // '-' is for table storage type, which defaults to UTF8 string - // string data is always stored in the default escaped storage format; the data types - // byte, short, int, long, float, and double have a binary byte oriented storage option - List columnTypes = serdeParams.getColumnTypes(); - - for (int i = 0; i < columnMapping.size(); i++) { - - ColumnMapping colMap = columnMapping.get(i); - TypeInfo colType = columnTypes.get(i); - String mappingSpec = colMap.mappingSpec; - String [] mapInfo = mappingSpec.split("#"); - String [] storageInfo = null; - - if (mapInfo.length == 2) { - storageInfo = mapInfo[1].split(":"); - } - - if (storageInfo == null) { - - // use the table default storage specification - if (colType.getCategory() == Category.PRIMITIVE) { - if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - colMap.binaryStorage.add(tableBinaryStorage); - } else { - colMap.binaryStorage.add(false); - } - } else if (colType.getCategory() == Category.MAP) { - TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo(); - TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo(); - - if (keyTypeInfo.getCategory() == Category.PRIMITIVE && - 
!keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - colMap.binaryStorage.add(tableBinaryStorage); - } else { - colMap.binaryStorage.add(false); - } - - if (valueTypeInfo.getCategory() == Category.PRIMITIVE && - !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - colMap.binaryStorage.add(tableBinaryStorage); - } else { - colMap.binaryStorage.add(false); - } - } else { - colMap.binaryStorage.add(false); - } - - } else if (storageInfo.length == 1) { - // we have a storage specification for a primitive column type - String storageOption = storageInfo[0]; - - if ((colType.getCategory() == Category.MAP) || - !(storageOption.equals("-") || "string".startsWith(storageOption) || - "binary".startsWith(storageOption))) { - throw new SerDeException("Error: A column storage specification is one of the following:" - + " '-', a prefix of 'string', or a prefix of 'binary'. " - + storageOption + " is not a valid storage option specification for " - + serdeParams.getColumnNames().get(i)); - } - - if (colType.getCategory() == Category.PRIMITIVE && - !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - - if ("-".equals(storageOption)) { - colMap.binaryStorage.add(tableBinaryStorage); - } else if ("binary".startsWith(storageOption)) { - colMap.binaryStorage.add(true); - } else { - colMap.binaryStorage.add(false); - } - } else { - colMap.binaryStorage.add(false); - } - - } else if (storageInfo.length == 2) { - // we have a storage specification for a map column type - - String keyStorage = storageInfo[0]; - String valStorage = storageInfo[1]; - - if ((colType.getCategory() != Category.MAP) || - !(keyStorage.equals("-") || "string".startsWith(keyStorage) || - "binary".startsWith(keyStorage)) || - !(valStorage.equals("-") || "string".startsWith(valStorage) || - "binary".startsWith(valStorage))) { - throw new SerDeException("Error: To specify a valid column storage type for a Map" - + " column, use any two specifiers from '-', a prefix of 'string', " - + " and a prefix of 'binary' separated by a ':'." - + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the" - + " key and value parts of the Map respectively." 
- + " Invalid storage specification for column " - + serdeParams.getColumnNames().get(i) - + "; " + storageInfo[0] + ":" + storageInfo[1]); - } - - TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo(); - TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo(); - - if (keyTypeInfo.getCategory() == Category.PRIMITIVE && - !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - - if (keyStorage.equals("-")) { - colMap.binaryStorage.add(tableBinaryStorage); - } else if ("binary".startsWith(keyStorage)) { - colMap.binaryStorage.add(true); - } else { - colMap.binaryStorage.add(false); - } - } else { - colMap.binaryStorage.add(false); - } - - if (valueTypeInfo.getCategory() == Category.PRIMITIVE && - !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) { - if (valStorage.equals("-")) { - colMap.binaryStorage.add(tableBinaryStorage); - } else if ("binary".startsWith(valStorage)) { - colMap.binaryStorage.add(true); - } else { - colMap.binaryStorage.add(false); - } - } else { - colMap.binaryStorage.add(false); - } - - if (colMap.binaryStorage.size() != 2) { - throw new SerDeException("Error: In parsing the storage specification for column " - + serdeParams.getColumnNames().get(i)); - } - - } else { - // error in storage specification - throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification " - + mappingSpec + " is not valid for column: " - + serdeParams.getColumnNames().get(i)); + private HBaseKeyFactory initKeyFactory(Configuration conf, Properties tbl) throws SerDeException { + try { + HBaseKeyFactory keyFactory = createKeyFactory(conf, tbl); + if (keyFactory != null) { + keyFactory.init(this, tbl); } + return keyFactory; + } catch (Exception e) { + throw new SerDeException(e); } } - void setKeyColumnOffset() throws SerDeException { - setKeyIndex(getRowKeyColumnOffset(columnMapping)); - } - - public static int getRowKeyColumnOffset(List columnsMapping) - throws SerDeException { - - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); - - if (colMap.hbaseRowKey && colMap.familyName.equals(HBaseSerDe.HBASE_KEY_COL)) { - return i; - } + private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl) throws Exception { + String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY); + if (factoryClassName != null) { + Class factoryClazz = Class.forName(factoryClassName); + return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job); } - - throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" + - " row key column."); - } - - public StructTypeInfo getRowTypeInfo() { - return (StructTypeInfo) serdeParams.getRowTypeInfo(); + String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + if (keyClassName != null) { + Class keyClass = Class.forName(keyClassName); + return new CompositeHBaseKeyFactory(keyClass); + } + return new DefaultHBaseKeyFactory(); } public List getColumnNames() { return serdeParams.getColumnNames(); } - public byte[] getSeparators() { - return serdeParams.getSeparators(); - } - - public Text getNullSequence() { - return serdeParams.getNullSequence(); - } - - public boolean isLastColumnTakesRest() { - return serdeParams.isLastColumnTakesRest(); - } - - public boolean isEscaped() { - return serdeParams.isEscaped(); - } - - public byte getEscapeChar() { - return serdeParams.getEscapeChar(); - } - public List getColumnTypes() { return 
serdeParams.getColumnTypes(); } @@ -359,59 +110,38 @@ public SerDeParameters getSerdeParams() { return serdeParams; } - public void setSerdeParams(SerDeParameters serdeParams) { - this.serdeParams = serdeParams; - } - - public String getColumnMappingString() { - return columnMappingString; - } - - public void setColumnMappingString(String columnMappingString) { - this.columnMappingString = columnMappingString; - } - public long getPutTimestamp() { return putTimestamp; } - public void setPutTimestamp(long putTimestamp) { - this.putTimestamp = putTimestamp; - } - - public boolean isDoColumnRegexMatching() { - return doColumnRegexMatching; - } - - public void setDoColumnRegexMatching(boolean doColumnRegexMatching) { - this.doColumnRegexMatching = doColumnRegexMatching; - } - - public Class getCompositeKeyClass() { - return compositeKeyClass; - } - - public void setCompositeKeyClass(Class compositeKeyClass) { - this.compositeKeyClass = compositeKeyClass; + public int getKeyIndex() { + return columnMappings.getKeyIndex(); } - public int getKeyIndex() { - return keyIndex; + public ColumnMapping getKeyColumnMapping() { + return columnMappings.getKeyMapping(); } - public void setKeyIndex(int keyIndex) { - this.keyIndex = keyIndex; + public ColumnMappings getColumnMappings() { + return columnMappings; } - public List getColumnMapping() { - return columnMapping; + public HBaseKeyFactory getKeyFactory() { + return keyFactory; } - public ColumnMapping getKeyColumnMapping() { - return columnMapping.get(keyIndex); + public TypeInfo getTypeForName(String columnName) { + List columnNames = serdeParams.getColumnNames(); + List columnTypes = serdeParams.getColumnTypes(); + for (int i = 0; i < columnNames.size(); i++) { + if (columnName.equals(columnNames.get(i))) { + return columnTypes.get(i); + } } + throw new IllegalArgumentException("Invalid column name " + columnName); + } - public boolean[] getNeedsEscape() { - return serdeParams.getNeedsEscape(); + public String toString() { + return "[" + columnMappingString + ":" + getColumnNames() + ":" + getColumnTypes() + "]"; } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 4fe1b1b..255ffa2 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -155,7 +155,7 @@ public void preCreateTable(Table tbl) throws MetaException { Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - List columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, true); + ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); HTableDescriptor tableDesc; @@ -166,7 +166,7 @@ public void preCreateTable(Table tbl) throws MetaException { tableDesc = new HTableDescriptor(tableName); Set uniqueColumnFamilies = new HashSet(); - for (ColumnMapping colMap : columnsMapping) { + 
for (ColumnMapping colMap : columnMappings) { if (!colMap.hbaseRowKey) { uniqueColumnFamilies.add(colMap.familyName); } @@ -192,8 +192,7 @@ public void preCreateTable(Table tbl) throws MetaException { // make sure the schema mapping is right tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName)); - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); + for (ColumnMapping colMap : columnMappings) { if (colMap.hbaseRowKey) { continue; @@ -378,6 +377,7 @@ private void addHBaseDelegationToken(Configuration conf) throws IOException { @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try { + HBaseSerDe.configureJobConf(tableDesc, jobConf); /* * HIVE-6356 * The following code change is only needed for hbase-0.96.0 due to HBASE-9165, and @@ -392,7 +392,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { TableMapReduceUtil.addDependencyJars(copy); merged.addAll(copy.getConfiguration().getStringCollection("tmpjars")); jobConf.set("tmpjars", StringUtils.arrayToString(merged.toArray(new String[0]))); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -403,22 +403,26 @@ public DecomposedPredicate decomposePredicate( Deserializer deserializer, ExprNodeDesc predicate) { + HBaseKeyFactory keyFactory = ((HBaseSerDe) deserializer).getKeyFactory(); + return keyFactory.decomposePredicate(jobConf, deserializer, predicate); + } + + public static DecomposedPredicate decomposePredicate( + JobConf jobConf, + HBaseSerDe hBaseSerDe, + ExprNodeDesc predicate) { String columnNameProperty = jobConf.get( - org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS); + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS); List columnNames = - Arrays.asList(columnNameProperty.split(",")); - - HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer; - int keyColPos = hbaseSerde.getKeyColumnOffset(); - String keyColType = jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES). 
- split(",")[keyColPos]; - IndexPredicateAnalyzer analyzer = - HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(columnNames.get(keyColPos), keyColType, - hbaseSerde.getStorageFormatOfCol(keyColPos).get(0)); + Arrays.asList(columnNameProperty.split(",")); + + ColumnMapping keyMapping = hBaseSerDe.getHBaseSerdeParam().getKeyColumnMapping(); + IndexPredicateAnalyzer analyzer = HiveHBaseTableInputFormat.newIndexPredicateAnalyzer( + keyMapping.columnName, keyMapping.columnType, keyMapping.binaryStorage.get(0)); List searchConditions = - new ArrayList(); + new ArrayList(); ExprNodeGenericFuncDesc residualPredicate = - (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); int scSize = searchConditions.size(); if (scSize < 1 || 2 < scSize) { // Either there was nothing which could be pushed down (size = 0), diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 142bfd8..2dbe334 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; @@ -93,15 +94,15 @@ String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); - List columnsMapping = null; + ColumnMappings columnMappings; try { - columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } - if (columnsMapping.size() < readColIDs.size()) { + if (columnMappings.size() < readColIDs.size()) { throw new IOException("Cannot read more columns than the given table contains."); } @@ -113,8 +114,9 @@ List addedFamilies = new ArrayList(); if (!readAllColumns) { + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); for (int i : readColIDs) { - ColumnMapping colMap = columnsMapping.get(i); + ColumnMapping colMap = columnsMapping[i]; if (colMap.hbaseRowKey) { continue; } @@ -139,8 +141,7 @@ // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive // tables column projection. 
if (empty) { - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); + for (ColumnMapping colMap: columnMappings) { if (colMap.hbaseRowKey) { continue; } @@ -256,6 +257,18 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL Scan scan = new Scan(); + String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + if (filterObjectSerialized != null) { + HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, + HBaseScanRange.class); + try { + range.setup(scan, jobConf); + } catch (Exception e) { + throw new IOException(e); + } + return scan; + } + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); if (filterExprSerialized == null) { return scan; @@ -324,6 +337,10 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) } scan.setStartRow(startRow); scan.setStopRow(stopRow); + + if (LOG.isDebugEnabled()) { + LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow)); + } return scan; } @@ -374,6 +391,12 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) System.arraycopy(current, 0, next, 0, current.length); return next; } + + static IndexPredicateAnalyzer newIndexPredicateAnalyzer( + String keyColumnName, TypeInfo keyColType, boolean isKeyBinary) { + return newIndexPredicateAnalyzer(keyColumnName, keyColType.getTypeName(), isKeyBinary); + } + /** * Instantiates a new predicate analyzer suitable for * determining how to push a filter down into the HBase scan, @@ -423,20 +446,15 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( throw new IOException("hbase.columns.mapping required for HBase Table."); } - List columnsMapping = null; + ColumnMappings columnMappings = null; try { - columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } - int iKey; - - try { - iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping); - } catch (SerDeException e) { - throw new IOException(e); - } + int iKey = columnMappings.getKeyIndex(); + ColumnMapping keyMapping = columnMappings.getKeyMapping(); // Take filter pushdown into account while calculating splits; this // allows us to prune off regions immediately. Note that although @@ -445,7 +463,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). Scan scan = createFilterScan(jobConf, iKey, - getStorageFormatOfKey(columnsMapping.get(iKey).mappingSpec, + getStorageFormatOfKey(keyMapping.mappingSpec, jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); @@ -454,8 +472,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( // REVIEW: are we supposed to be applying the getReadColumnIDs // same as in getRecordReader? - for (int i = 0; i columnsMapping; - private Object compositeKeyObj; + private ColumnMapping[] columnsMapping; private ArrayList cachedList; - /** - * Construct a LazyHBaseRow object with the ObjectInspector. - */ + private final int iKey; + private final HBaseKeyFactory keyFactory; + public LazyHBaseRow(LazySimpleStructObjectInspector oi) { - super(oi); + this(oi, -1, null); } /** - * Set the HBase row data(a Result writable) for this LazyStruct. 
- * @see LazyHBaseRow#init(Result) + * Construct a LazyHBaseRow object with the ObjectInspector. */ - public void init(Result r, List columnsMapping) { - init(r, columnsMapping, null); + public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory) { + super(oi); + this.iKey = iKey; + this.keyFactory = keyFactory; } /** * Set the HBase row data(a Result writable) for this LazyStruct. - * - * @see LazyHBaseRow#init(Result) - * - * @param compositeKeyClass - * custom implementation to interpret the composite key + * @see LazyHBaseRow#init(org.apache.hadoop.hbase.client.Result) */ - public void init(Result r, List columnsMapping, Object compositeKeyObj) { - - result = r; - this.columnsMapping = columnsMapping; - this.compositeKeyObj = compositeKeyObj; + public void init(Result r, ColumnMappings columnsMappings) { + this.result = r; + this.columnsMapping = columnsMappings.getColumnsMapping(); setParsed(false); } - /** - * Parse the Result and fill each field. - * @see LazyStruct#parse() - */ - private void parse() { - - if (getFields() == null) { - List fieldRefs = - ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); - LazyObject [] fields = new LazyObject[fieldRefs.size()]; - - for (int i = 0; i < fields.length; i++) { - ColumnMapping colMap = columnsMapping.get(i); - - if (colMap.qualifierName == null && !colMap.hbaseRowKey) { - // a column family - fields[i] = new LazyHBaseCellMap( - (LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector()); - continue; - } - - fields[i] = LazyFactory.createLazyObject( - fieldRefs.get(i).getFieldObjectInspector(), - colMap.binaryStorage.get(0)); - } - - setFields(fields); - setFieldInited(new boolean[fields.length]); + @Override + protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { + if (fieldID == iKey) { + return keyFactory.createKey(fieldRef.getFieldObjectInspector()); } - - Arrays.fill(getFieldInited(), false); - setParsed(true); + ColumnMapping colMap = columnsMapping[fieldID]; + if (colMap.qualifierName == null && !colMap.hbaseRowKey) { + // a column family + return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector()); + } + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(), + colMap.binaryStorage.get(0)); } /** @@ -127,16 +99,17 @@ private void parse() { */ @Override public Object getField(int fieldID) { - if (!getParsed()) { - parse(); - } - - Object value = uncheckedGetField(fieldID); + initFields(); + return uncheckedGetField(fieldID); + } - if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) { - return compositeKeyObj; - } else { - return value; + private void initFields() { + if (getFields() == null) { + initLazyFields(oi.getAllStructFieldRefs()); + } + if (!getParsed()) { + Arrays.fill(getFieldInited(), false); + setParsed(true); } } @@ -149,12 +122,12 @@ public Object getField(int fieldID) { */ private Object uncheckedGetField(int fieldID) { - LazyObject [] fields = getFields(); + LazyObjectBase[] fields = getFields(); boolean [] fieldsInited = getFieldInited(); if (!fieldsInited[fieldID]) { ByteArrayRef ref = null; - ColumnMapping colMap = columnsMapping.get(fieldID); + ColumnMapping colMap = columnsMapping[fieldID]; if (colMap.hbaseRowKey) { ref = new ByteArrayRef(); @@ -182,12 +155,6 @@ private Object uncheckedGetField(int fieldID) { if (ref != null) { fields[fieldID].init(ref, 0, ref.getData().length); - - // if it was a row key and we have been provided a custom 
composite key class, initialize it - // with the bytes for the row key - if (colMap.hbaseRowKey && compositeKeyObj != null) { - ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length); - } } } @@ -203,9 +170,7 @@ private Object uncheckedGetField(int fieldID) { */ @Override public ArrayList getFieldsAsList() { - if (!getParsed()) { - parse(); - } + initFields(); if (cachedList == null) { cachedList = new ArrayList(); } else { diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java index 13c344b..7da360f 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -18,13 +18,13 @@ package org.apache.hadoop.hive.hbase; -import java.util.Properties; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import java.util.Properties; + public class HBaseTestCompositeKey extends HBaseCompositeKey { byte[] bytes; diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java new file mode 100644 index 0000000..accc312 --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.serde2.BaseStructObjectInspector; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestHBaseKeyFactory extends DefaultHBaseKeyFactory { + + private static final String DELIMITER_PATTERN = "\\$\\$"; + private static final byte[] DELIMITER_BINARY = "$$".getBytes(); + + @Override + public ObjectInspector createKeyObjectInspector(TypeInfo type) { + return new SlashSeparatedOI((StructTypeInfo)type); + } + + @Override + public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException { + return new DoubleDollarSeparated(); + } + + private final ByteStream.Output output = new ByteStream.Output(); + + @Override + public byte[] serializeKey(Object object, StructField field) throws IOException { + ObjectInspector inspector = field.getFieldObjectInspector(); + if (inspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new IllegalStateException("invalid type value " + inspector.getTypeName()); + } + output.reset(); + for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) { + if (output.getCount() > 0) { + output.write(DELIMITER_BINARY); + } + output.write(String.valueOf(element).getBytes()); + } + return output.getCount() > 0 ? output.toByteArray() : null; + } + + private static class DoubleDollarSeparated implements LazyObjectBase { + + private Object[] fields; + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + fields = new String(bytes.getData(), start, length).split(DELIMITER_PATTERN); + } + + @Override + public Object getObject() { + return this; + } + } + + private static class SlashSeparatedOI extends BaseStructObjectInspector { + + private int length; + + private SlashSeparatedOI(StructTypeInfo type) { + List names = type.getAllStructFieldNames(); + List ois = new ArrayList(); + for (int i = 0; i < names.size(); i++) { + ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + init(names, ois, null); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + return ((DoubleDollarSeparated)data).fields[((MyField)fieldRef).getFieldID()]; + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return Arrays.asList(((DoubleDollarSeparated)data).fields); + } + } +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java new file mode 100644 index 0000000..1bd2352 --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.BaseStructObjectInspector; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.mapred.JobConf; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestHBaseKeyFactory2 extends AbstractHBaseKeyFactory { + + private static final int FIXED_LENGTH = 10; + + @Override + public ObjectInspector createKeyObjectInspector(TypeInfo type) { + return new StringArrayOI((StructTypeInfo)type); + } + + @Override + public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException { + return new FixedLengthed(FIXED_LENGTH); + } + + private final ByteStream.Output output = new ByteStream.Output(); + + @Override + public byte[] serializeKey(Object object, StructField field) throws IOException { + ObjectInspector inspector = field.getFieldObjectInspector(); + if (inspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new IllegalStateException("invalid type value " + inspector.getTypeName()); + } + output.reset(); + for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) { + output.write(toBinary(String.valueOf(element).getBytes(), FIXED_LENGTH, false, false)); + } + return output.getCount() > 0 ? 
output.toByteArray() : null; + } + + private byte[] toBinary(String value, int max, boolean end, boolean nextBA) { + return toBinary(value.getBytes(), max, end, nextBA); + } + + private byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) { + byte[] bytes = new byte[max + 1]; + System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max)); + if (end) { + Arrays.fill(bytes, value.length, max, (byte)0xff); + } + if (nextBA) { + bytes[max] = 0x01; + } + return bytes; + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + String keyColName = keyMapping.columnName; + + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false); + analyzer.allowColumnName(keyColName); + analyzer.setAcceptsFields(true); + + DecomposedPredicate decomposed = new DecomposedPredicate(); + + List searchConditions = new ArrayList(); + decomposed.residualPredicate = + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); + if (!searchConditions.isEmpty()) { + decomposed.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + try { + decomposed.pushedPredicateObject = setupFilter(keyColName, searchConditions); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return decomposed; + } + + private HBaseScanRange setupFilter(String keyColName, List conditions) + throws IOException { + Map> fieldConds = + new HashMap>(); + for (IndexSearchCondition condition : conditions) { + assert keyColName.equals(condition.getColumnDesc().getColumn()); + String fieldName = condition.getFields()[0]; + List fieldCond = fieldConds.get(fieldName); + if (fieldCond == null) { + fieldConds.put(fieldName, fieldCond = new ArrayList()); + } + fieldCond.add(condition); + } + HBaseScanRange range = new HBaseScanRange(); + + ByteArrayOutputStream startRow = new ByteArrayOutputStream(); + ByteArrayOutputStream stopRow = new ByteArrayOutputStream(); + + StructTypeInfo type = (StructTypeInfo) keyMapping.columnType; + for (String name : type.getAllStructFieldNames()) { + List fieldCond = fieldConds.get(name); + if (fieldCond == null || fieldCond.size() > 2) { + continue; + } + byte[] startElement = null; + byte[] stopElement = null; + for (IndexSearchCondition condition : fieldCond) { + if (condition.getConstantDesc().getValue() == null) { + continue; + } + String comparisonOp = condition.getComparisonOp(); + String constantVal = String.valueOf(condition.getConstantDesc().getValue()); + + if (comparisonOp.endsWith("UDFOPEqual")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, false); + } else if (comparisonOp.endsWith("UDFOPLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else { + throw new IOException(comparisonOp + " is not a supported comparison operator"); + } + } + if (startRow != null) { + if (startElement != null) { + startRow.write(startElement); + } else { + if (startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); + } + startRow = null; + } + } + if 
(stopRow != null) { + if (stopElement != null) { + stopRow.write(stopElement); + } else { + if (stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + stopRow = null; + } + } + if (startElement == null && stopElement == null) { + break; + } + } + if (startRow != null && startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); + } + if (stopRow != null && stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + return range; + } + + private static class FixedLengthed implements LazyObjectBase { + + private final int fixedLength; + private final List fields = new ArrayList(); + + public FixedLengthed(int fixedLength) { + this.fixedLength = fixedLength; + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + fields.clear(); + byte[] data = bytes.getData(); + int rowStart = start; + int rowStop = rowStart + fixedLength; + for (; rowStart < length; rowStart = rowStop + 1, rowStop = rowStart + fixedLength) { + fields.add(new String(data, rowStart, rowStop - rowStart).trim()); + } + } + + @Override + public Object getObject() { + return this; + } + } + + private static class StringArrayOI extends BaseStructObjectInspector { + + private int length; + + private StringArrayOI(StructTypeInfo type) { + List names = type.getAllStructFieldNames(); + List ois = new ArrayList(); + for (int i = 0; i < names.size(); i++) { + ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + init(names, ois, null); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + return ((FixedLengthed)data).fields.get(((MyField)fieldRef).getFieldID()); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return ((FixedLengthed)data).fields; + } + } +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java index 7c4fc9f..9a31f0f 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; @@ -463,17 +463,15 @@ public void testLazyHBaseRow1() throws SerDeException { Text nullSequence = new Text("\\N"); String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:c,cfb:d"; - List columnsMapping = null; + ColumnMappings columnMappings = null; try { - columnsMapping = parseColumnsMapping(hbaseColsMapping); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColsMapping); } catch (SerDeException e) { fail(e.toString()); } - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); - + for (ColumnMapping colMap : columnMappings) { if (!colMap.hbaseRowKey && colMap.qualifierName == null) { colMap.binaryStorage.add(false); colMap.binaryStorage.add(false); @@ -499,7 +497,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); Result r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( 
("{'key':'test-row','a':123,'b':['a','b','c']," @@ -513,7 +511,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':123,'b':null," @@ -529,7 +527,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':null,'b':['a']," @@ -543,7 +541,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':null,'b':['','a','','']," @@ -567,7 +565,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes(""))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), @@ -587,18 +585,17 @@ public void testLazyHBaseRow2() throws SerDeException { List fieldNames = Arrays.asList( new String[]{"key", "a", "b", "c", "d"}); Text nullSequence = new Text("\\N"); - List columnsMapping = null; String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:,cfc:d"; + ColumnMappings columnMappings = null; + try { - columnsMapping = parseColumnsMapping(hbaseColsMapping); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColsMapping); } catch (SerDeException e) { fail(e.toString()); } - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); - + for (ColumnMapping colMap : columnMappings) { if (!colMap.hbaseRowKey && colMap.qualifierName == null) { colMap.binaryStorage.add(false); colMap.binaryStorage.add(false); @@ -627,7 +624,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); Result r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':123,'b':['a','b','c']," @@ -643,7 +640,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':123,'b':null," @@ -659,7 +656,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':null,'b':['a']," @@ -673,7 +670,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( ("{'key':'test-row','a':null,'b':['','a','','']," @@ -689,7 +686,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes(""))); r = new Result(kvs); - o.init(r, columnsMapping); + o.init(r, columnMappings); assertEquals( "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), @@ -713,16 +710,17 @@ public void testLazyHBaseRow3() throws SerDeException { String hbaseColumnsMapping = ":key#str,cf-int:cq-int#bin,cf-byte:cq-byte#bin," + 
"cf-short:cq-short#bin,cf-long:cq-long#bin,cf-float:cq-float#bin,cf-double:cq-double#bin," + "cf-string:cq-string#str,cf-bool:cq-bool#bin"; - List columnsMapping = null; + ColumnMappings columnMappings = null; try { - columnsMapping = parseColumnsMapping(hbaseColumnsMapping); - } catch (SerDeException sde) { - fail(sde.toString()); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); + } catch (SerDeException e) { + fail(e.toString()); } - for (int i = 0; i < columnsMapping.size(); i++) { - ColumnMapping colMap = columnsMapping.get(i); + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); + for (int i = 0; i < columnsMapping.length; i++) { + ColumnMapping colMap = columnsMapping[i]; if (i == 0 || i == 7) { colMap.binaryStorage.add(false); @@ -741,7 +739,7 @@ public void testLazyHBaseRow3() throws SerDeException { List kvs = new ArrayList(); byte [] value; - for (int i = 1; i < columnsMapping.size(); i++) { + for (int i = 1; i < columnsMapping.length; i++) { switch (i) { @@ -781,13 +779,13 @@ public void testLazyHBaseRow3() throws SerDeException { throw new RuntimeException("Not expected: " + i); } - ColumnMapping colMap = columnsMapping.get(i); + ColumnMapping colMap = columnsMapping[i]; kvs.add(new KeyValue(rowKey, colMap.familyNameBytes, colMap.qualifierNameBytes, value)); } Collections.sort(kvs, KeyValue.COMPARATOR); Result result = new Result(kvs); - o.init(result, columnsMapping); + o.init(result, columnMappings); List fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs(); @@ -850,19 +848,4 @@ public void testLazyHBaseRow3() throws SerDeException { } } } - - /** - * Parses the HBase columns mapping specifier to identify the column families, qualifiers - * and also caches the byte arrays corresponding to them. One of the Hive table - * columns maps to the HBase row key, by default the first column. 
- * - * @param columnsMappingSpec string hbase.columns.mapping specified when creating table - * @return List which contains the column mapping information by position - * @throws SerDeException - */ - public static List parseColumnsMapping(String columnsMappingSpec) - throws SerDeException { - return HBaseSerDe.parseColumnsMapping(columnsMappingSpec, true); - } - } diff --git hbase-handler/src/test/queries/positive/hbase_custom_key.q hbase-handler/src/test/queries/positive/hbase_custom_key.q new file mode 100644 index 0000000..b7c00c7 --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_custom_key.q @@ -0,0 +1,18 @@ +CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory"); + +CREATE EXTERNAL TABLE hbase_ck_2(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string"); + +from src tablesample (1 rows) +insert into table hbase_ck_1 select struct('1000','2000','3000'),'value'; + +select * from hbase_ck_1; +select * from hbase_ck_2; diff --git hbase-handler/src/test/queries/positive/hbase_custom_key2.q hbase-handler/src/test/queries/positive/hbase_custom_key2.q new file mode 100644 index 0000000..583de06 --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_custom_key2.q @@ -0,0 +1,35 @@ +CREATE TABLE hbase_ck_4(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom2", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2"); + +from src tablesample (5 rows) +insert into table hbase_ck_4 select +struct( + cast(key as string), + cast(cast(key + 1000 as int) as string), + cast(cast(key + 2000 as int) as string)), +value; + +set hive.fetch.task.conversion=more; + +-- 165,238,27,311,86 +select * from hbase_ck_4; + +-- 238 +explain +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'; +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'; + +-- 165,238 +explain +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'; +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'; + +-- 238,311 +explain +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'; +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'; + diff --git hbase-handler/src/test/results/positive/hbase_custom_key.q.out hbase-handler/src/test/results/positive/hbase_custom_key.q.out new file mode 100644 index 0000000..4586f2b --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_custom_key.q.out @@ -0,0 +1,60 @@ +PREHOOK: query: CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_ck_1(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = 
"hbase_custom", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_1 +PREHOOK: query: CREATE EXTERNAL TABLE hbase_ck_2(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE EXTERNAL TABLE hbase_ck_2(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom", + "hbase.columns.mapping" = ":key,cf:string") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_2 +PREHOOK: query: from src tablesample (1 rows) +insert into table hbase_ck_1 select struct('1000','2000','3000'),'value' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_ck_1 +POSTHOOK: query: from src tablesample (1 rows) +insert into table hbase_ck_1 select struct('1000','2000','3000'),'value' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_ck_1 +PREHOOK: query: select * from hbase_ck_1 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_1 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_1 +#### A masked pattern was here #### +{"col1":"1000","col2":"2000","col3":"3000"} value +PREHOOK: query: select * from hbase_ck_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_2 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_2 +#### A masked pattern was here #### +1000$$2000$$3000 value diff --git hbase-handler/src/test/results/positive/hbase_custom_key2.q.out hbase-handler/src/test/results/positive/hbase_custom_key2.q.out new file mode 100644 index 0000000..a200e3d --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_custom_key2.q.out @@ -0,0 +1,168 @@ +PREHOOK: query: CREATE TABLE hbase_ck_4(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom2", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_ck_4(key struct, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_custom2", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_ck_4 +PREHOOK: query: from src tablesample (5 rows) +insert into table hbase_ck_4 select +struct( + cast(key as string), + cast(cast(key + 1000 as int) as string), + cast(cast(key + 2000 as int) as string)), +value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_ck_4 +POSTHOOK: query: from src tablesample (5 rows) +insert into table hbase_ck_4 select +struct( + cast(key as string), + cast(cast(key + 1000 as int) as string), + cast(cast(key + 2000 as int) as 
string)), +value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_ck_4 +PREHOOK: query: -- 165,238,27,311,86 +select * from hbase_ck_4 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +POSTHOOK: query: -- 165,238,27,311,86 +select * from hbase_ck_4 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +{"col1":"165","col2":"1165","col3":"2165"} val_165 +{"col1":"238","col2":"1238","col3":"2238"} val_238 +{"col1":"27","col2":"1027","col3":"2027"} val_27 +{"col1":"311","col2":"1311","col3":"2311"} val_311 +{"col1":"86","col2":"1086","col3":"2086"} val_86 +PREHOOK: query: -- 238 +explain +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238' +PREHOOK: type: QUERY +POSTHOOK: query: -- 238 +explain +select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_ck_4 + filterExpr: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: struct), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +{"col1":"238","col2":"1238","col3":"2238"} val_238 +PREHOOK: query: -- 165,238 +explain +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27' +PREHOOK: type: QUERY +POSTHOOK: query: -- 165,238 +explain +select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_ck_4 + filterExpr: ((key.col1 >= '165') and (key.col1 < '27')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((key.col1 >= '165') and (key.col1 < '27')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: struct), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +{"col1":"165","col2":"1165","col3":"2165"} val_165 +{"col1":"238","col2":"1238","col3":"2238"} val_238 +PREHOOK: query: -- 238,311 +explain +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238' +PREHOOK: 
type: QUERY +POSTHOOK: query: -- 238,311 +explain +select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_ck_4 + filterExpr: ((key.col1 > '100') and (key.col2 >= '1238')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((key.col1 > '100') and (key.col2 >= '1238')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: struct), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_ck_4 +#### A masked pattern was here #### +{"col1":"238","col2":"1238","col3":"2238"} val_238 +{"col1":"311","col2":"1311","col3":"2311"} val_311 diff --git itests/util/pom.xml itests/util/pom.xml index e9720df..b38c55b 100644 --- itests/util/pom.xml +++ itests/util/pom.xml @@ -46,6 +46,12 @@ org.apache.hive + hive-hbase-handler + ${project.version} + tests + + + org.apache.hive hive-metastore ${project.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 113227d..ed6134e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -662,11 +662,7 @@ public static Path getPlanPath(Configuration conf) { * @return Bytes. */ public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - runtimeSerializationKryo.get().writeObject(output, expr); - output.close(); - return baos.toByteArray(); + return serializeObjectToKryo(expr); } /** @@ -675,11 +671,7 @@ public static Path getPlanPath(Configuration conf) { * @return Expression; null if deserialization succeeded, but the result type is incorrect. */ public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) { - Input inp = new Input(new ByteArrayInputStream(bytes)); - ExprNodeGenericFuncDesc func = runtimeSerializationKryo.get(). 
- readObject(inp,ExprNodeGenericFuncDesc.class); - inp.close(); - return func; + return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class); } public static String serializeExpression(ExprNodeGenericFuncDesc expr) { @@ -700,6 +692,37 @@ public static ExprNodeGenericFuncDesc deserializeExpression(String s) { return deserializeExpressionFromKryo(bytes); } + private static byte[] serializeObjectToKryo(Serializable object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + runtimeSerializationKryo.get().writeObject(output, object); + output.close(); + return baos.toByteArray(); + } + + private static T deserializeObjectFromKryo(byte[] bytes, Class clazz) { + Input inp = new Input(new ByteArrayInputStream(bytes)); + T func = runtimeSerializationKryo.get().readObject(inp, clazz); + inp.close(); + return func; + } + + public static String serializeObject(Serializable expr) { + try { + return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + + public static T deserializeObject(String s, Class clazz) { + try { + return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate { @Override protected Expression instantiate(Object oldInstance, Encoder out) { diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java index d39ee2e..683618f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; @@ -57,12 +58,19 @@ private final Set udfNames; private final Set allowedColumnNames; + private FieldValidator fieldValidator; + + private boolean acceptsFields; public IndexPredicateAnalyzer() { udfNames = new HashSet(); allowedColumnNames = new HashSet(); } + public void setFieldValidator(FieldValidator fieldValidator) { + this.fieldValidator = fieldValidator; + } + /** * Registers a comparison operator as one which can be satisfied * by an index search. 
Unless this is called, analyzePredicate @@ -175,11 +183,19 @@ private ExprNodeDesc analyzeExpr( ExprNodeDesc expr1 = (ExprNodeDesc) nodeOutputs[0]; ExprNodeDesc expr2 = (ExprNodeDesc) nodeOutputs[1]; ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2); - if (extracted == null) { + if (extracted == null || (extracted.length > 2 && !acceptsFields)) { return expr; } - if (extracted.length > 2) { + + ExprNodeColumnDesc columnDesc; + ExprNodeConstantDesc constantDesc; + if (extracted[0] instanceof ExprNodeConstantDesc) { genericUDF = genericUDF.flip(); + columnDesc = (ExprNodeColumnDesc) extracted[1]; + constantDesc = (ExprNodeConstantDesc) extracted[0]; + } else { + columnDesc = (ExprNodeColumnDesc) extracted[0]; + constantDesc = (ExprNodeConstantDesc) extracted[1]; } String udfName = genericUDF.getUdfName(); @@ -187,22 +203,34 @@ private ExprNodeDesc analyzeExpr( return expr; } - ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) extracted[0]; - ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) extracted[1]; if (!allowedColumnNames.contains(columnDesc.getColumn())) { return expr; } + String[] fields = null; + if (extracted.length > 2) { + ExprNodeFieldDesc fieldDesc = (ExprNodeFieldDesc) extracted[2]; + if (!isValidField(fieldDesc)) { + return expr; + } + fields = ExprNodeDescUtils.extractFields(fieldDesc); + } + searchConditions.add( new IndexSearchCondition( columnDesc, udfName, constantDesc, - expr)); + expr, + fields)); // we converted the expression to a search condition, so // remove it from the residual predicate - return null; + return fields == null ? null : expr; + } + + private boolean isValidField(ExprNodeFieldDesc field) { + return fieldValidator == null || fieldValidator.validate(field); } /** @@ -232,4 +260,27 @@ public ExprNodeGenericFuncDesc translateSearchConditions( } return expr; } + + public void setAcceptsFields(boolean acceptsFields) { + this.acceptsFields = acceptsFields; + } + + public static interface FieldValidator { + boolean validate(ExprNodeFieldDesc exprNodeDesc); + } + + public static IndexPredicateAnalyzer createAnalyzer(boolean equalOnly) { + + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"); + if (equalOnly) { + return analyzer; + } + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan"); + analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"); + + return analyzer; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java index 5f1329c..3a2ecb7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java @@ -33,6 +33,16 @@ private ExprNodeConstantDesc constantDesc; private ExprNodeGenericFuncDesc comparisonExpr; + private String[] fields; + + public IndexSearchCondition( + ExprNodeColumnDesc columnDesc, + String comparisonOp, + ExprNodeConstantDesc constantDesc, + ExprNodeGenericFuncDesc comparisonExpr) { + this(columnDesc, comparisonOp, constantDesc, comparisonExpr, null); + } + /** * Constructs a search condition, which takes the form *
<pre>column-ref comparison-op constant-value</pre>
. @@ -50,12 +60,14 @@ public IndexSearchCondition( ExprNodeColumnDesc columnDesc, String comparisonOp, ExprNodeConstantDesc constantDesc, - ExprNodeGenericFuncDesc comparisonExpr) { + ExprNodeGenericFuncDesc comparisonExpr, + String[] fields) { this.columnDesc = columnDesc; this.comparisonOp = comparisonOp; this.constantDesc = constantDesc; this.comparisonExpr = comparisonExpr; + this.fields = fields; } public void setColumnDesc(ExprNodeColumnDesc columnDesc) { @@ -90,6 +102,10 @@ public ExprNodeGenericFuncDesc getComparisonExpr() { return comparisonExpr; } + public String[] getFields() { + return fields; + } + @Override public String toString() { return comparisonExpr.getExprString(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 4921966..0a44c80 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -412,6 +413,13 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { return; } + Serializable filterObject = scanDesc.getFilterObject(); + if (filterObject != null) { + jobConf.set( + TableScanDesc.FILTER_OBJECT_CONF_STR, + Utilities.serializeObject(filterObject)); + } + String filterText = filterExpr.getExprString(); String filterExprSerialized = Utilities.serializeExpression(filterExpr); if (LOG.isDebugEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java index 293b74e..29ded67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java @@ -166,6 +166,11 @@ public ObjectInspector getFieldObjectInspector() { } @Override + public int getFieldID() { + return offset; + } + + @Override public String getFieldComment() { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java index 2a7fdf9..f190f67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java @@ -234,5 +234,10 @@ public int getIndex() { public ObjectInspector getFieldObjectInspector() { return inspector; } + + @Override + public int getFieldID() { + return index; + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 9f35575..7d7c764 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.JobConf; +import java.io.Serializable; + /** * HiveStoragePredicateHandler is an optional companion to {@link * HiveStorageHandler}; it should only be implemented by handlers which @@ -69,6 +71,11 @@ public DecomposedPredicate decomposePredicate( public ExprNodeGenericFuncDesc pushedPredicate; /** + * Serialized format for filter + */ + public Serializable 
pushedPredicateObject; + + /** * Portion of predicate to be post-evaluated by Hive for any rows * which are returned by storage handler. */ diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index e50026b..b6234f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -260,12 +260,46 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur return new ExprNodeDesc[] {expr1, expr2}; } if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeColumnDesc) { - return new ExprNodeDesc[] {expr2, expr1, null}; // add null as a marker (inverted order) + return new ExprNodeDesc[] {expr1, expr2}; + } + if (expr1 instanceof ExprNodeFieldDesc && expr2 instanceof ExprNodeConstantDesc) { + ExprNodeColumnDesc columnDesc = extractColumn(expr1); + return columnDesc != null ? new ExprNodeDesc[] {columnDesc, expr2, expr1} : null; + } + if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeFieldDesc) { + ExprNodeColumnDesc columnDesc = extractColumn(expr2); + return columnDesc != null ? new ExprNodeDesc[] {expr1, columnDesc, expr2} : null; } // todo: constant op constant return null; } + public static String[] extractFields(ExprNodeFieldDesc expr) { + return extractFields(expr, new ArrayList()).toArray(new String[0]); + } + + private static List extractFields(ExprNodeDesc expr, List fields) { + if (expr instanceof ExprNodeFieldDesc) { + ExprNodeFieldDesc field = (ExprNodeFieldDesc)expr; + fields.add(field.getFieldName()); + return extractFields(field.getDesc(), fields); + } + if (expr instanceof ExprNodeColumnDesc) { + return fields; + } + throw new IllegalStateException(); + } + + private static ExprNodeColumnDesc extractColumn(ExprNodeDesc expr) { + if (expr instanceof ExprNodeColumnDesc) { + return (ExprNodeColumnDesc)expr; + } + if (expr instanceof ExprNodeFieldDesc) { + return extractColumn(((ExprNodeFieldDesc)expr).getDesc()); + } + return null; + } + // from IndexPredicateAnalyzer private static ExprNodeDesc extractConstant(ExprNodeDesc expr) { if (!(expr instanceof ExprNodeGenericFuncDesc)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index ecb82d7..1642d6f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ private int maxStatsKeyPrefixLength = -1; private ExprNodeGenericFuncDesc filterExpr; + private transient Serializable filterObject; public static final String FILTER_EXPR_CONF_STR = "hive.io.filter.expr.serialized"; @@ -68,6 +70,9 @@ public static final String FILTER_TEXT_CONF_STR = "hive.io.filter.text"; + public static final String FILTER_OBJECT_CONF_STR = + "hive.io.filter.object"; + // input file name (big) to bucket number private Map bucketFileNameMapping; @@ -112,6 +117,14 @@ public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) { this.filterExpr = filterExpr; } + public Serializable getFilterObject() { + return filterObject; + } + + public void setFilterObject(Serializable filterObject) { + this.filterObject = filterObject; + } + public void setAlias(String alias) { this.alias = alias; } diff 
--git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java index c0a8269..7aaf455 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java @@ -843,6 +843,8 @@ private static ExprNodeGenericFuncDesc pushFilterToStorageHandler( } } tableScanDesc.setFilterExpr(decomposed.pushedPredicate); + tableScanDesc.setFilterObject(decomposed.pushedPredicateObject); + return (ExprNodeGenericFuncDesc)decomposed.residualPredicate; } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 5f32f2d..35e30b8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -216,6 +216,11 @@ public String getFieldComment() { } @Override + public int getFieldID() { + return id; + } + + @Override public String toString() { return "field " + id + " " + fieldName; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/BaseStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/BaseStructObjectInspector.java new file mode 100644 index 0000000..2ad93ba --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/BaseStructObjectInspector.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +import java.util.ArrayList; +import java.util.List; + +public abstract class BaseStructObjectInspector extends StructObjectInspector { + + protected static class MyField implements StructField { + + protected final int fieldID; + protected final String fieldName; + protected final String fieldComment; + protected final ObjectInspector fieldObjectInspector; + + public MyField(int fieldID, String fieldName, + ObjectInspector fieldObjectInspector, String fieldComment) { + this.fieldID = fieldID; + this.fieldName = fieldName.toLowerCase(); + this.fieldObjectInspector = fieldObjectInspector; + this.fieldComment = fieldComment; + } + + public MyField(int fieldID, StructField field) { + this.fieldID = fieldID; + this.fieldName = field.getFieldName().toLowerCase(); + this.fieldObjectInspector = field.getFieldObjectInspector(); + this.fieldComment = field.getFieldComment(); + } + + public int getFieldID() { + return fieldID; + } + + public String getFieldName() { + return fieldName; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return fieldObjectInspector; + } + + @Override + public String getFieldComment() { + return fieldComment; + } + + @Override + public String toString() { + return fieldID + ":" + fieldName; + } + } + + protected final List fields = new ArrayList(); + + protected BaseStructObjectInspector() { + super(); + } + + /** + * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead. + */ + public BaseStructObjectInspector(List structFieldNames, + List structFieldObjectInspectors) { + init(structFieldNames, structFieldObjectInspectors, null); + } + + public BaseStructObjectInspector(List structFieldNames, + List structFieldObjectInspectors, + List structFieldComments) { + init(structFieldNames, structFieldObjectInspectors, structFieldComments); + } + + protected void init(List structFieldNames, + List structFieldObjectInspectors, + List structFieldComments) { + assert (structFieldNames.size() == structFieldObjectInspectors.size()); + assert (structFieldComments == null || + (structFieldNames.size() == structFieldComments.size())); + + for (int i = 0; i < structFieldNames.size(); i++) { + fields.add(createField(i, + structFieldNames.get(i), structFieldObjectInspectors.get(i), + structFieldComments == null ? 
null : structFieldComments.get(i))); + } + } + + protected void init(List structFields) { + for (int i = 0; i < structFields.size(); i++) { + fields.add(new MyField(i, structFields.get(i))); + } + } + + // override this for using extended FieldObject + protected MyField createField(int index, String fieldName, ObjectInspector fieldOI, String comment) { + return new MyField(index, fieldName, fieldOI, comment); + } + + @Override + public String getTypeName() { + return ObjectInspectorUtils.getStandardStructTypeName(this); + } + + @Override + public final Category getCategory() { + return Category.STRUCT; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, fields); + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java index dba5e33..83f34ce 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java @@ -46,6 +46,11 @@ public ObjectInspector getFieldObjectInspector() { } @Override + public int getFieldID() { + return -1; + } + + @Override public String getFieldComment() { return ""; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java new file mode 100644 index 0000000..b7efff0 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2; + +import java.util.List; + +public interface StructObject { + + Object getField(int fieldID); + + List getFieldsAsList(); +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java index 1fd6853..fd06f58 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java +++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java @@ -23,13 +23,14 @@ import java.util.List; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -public abstract class ColumnarStructBase implements SerDeStatsStruct { +public abstract class ColumnarStructBase implements StructObject, SerDeStatsStruct { class FieldInfo { LazyObjectBase field; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java index 10f4c05..9b5ccbe 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java @@ -25,7 +25,7 @@ * A LazyObject can represent any primitive object or hierarchical object like * array, map or struct. */ -public abstract class LazyObject extends LazyObjectBase { +public abstract class LazyObject implements LazyObjectBase { protected OI oi; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java index 3334dff..7e42b3f 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.serde2.lazy; -public abstract class LazyObjectBase { +public interface LazyObjectBase { /** * Set the data for this LazyObjectBase. We take ByteArrayRef instead of byte[] so @@ -33,12 +33,12 @@ * The length of the data, starting from "start" * @see ByteArrayRef */ - public abstract void init(ByteArrayRef bytes, int start, int length); + void init(ByteArrayRef bytes, int start, int length); /** * If the LazyObjectBase is a primitive Object, then deserialize it and return the * actual primitive Object. Otherwise (array, map, struct), return this. 
*/ - public abstract Object getObject(); + Object getObject(); } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index 82c1263..ac180ca 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -171,6 +171,10 @@ public byte getEscapeChar() { public boolean[] getNeedsEscape() { return needsEscape; } + + public boolean isPrimitive(int index) { + return columnTypes.get(index).getCategory() == Category.PRIMITIVE; + } } SerDeParameters serdeParams = null; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java index 8a1ea46..a01cd66 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java @@ -23,10 +23,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.Text; /** @@ -36,8 +37,8 @@ * LazyStruct does not deal with the case of a NULL struct. That is handled by * the parent LazyObject. */ -public class LazyStruct extends LazyNonPrimitive implements - SerDeStatsStruct { +public class LazyStruct extends LazyNonPrimitive + implements StructObject, SerDeStatsStruct { private static Log LOG = LogFactory.getLog(LazyStruct.class.getName()); @@ -62,7 +63,7 @@ /** * The fields of the struct. */ - LazyObject[] fields; + LazyObjectBase[] fields; /** * Whether init() has been called on the field or not. */ @@ -101,17 +102,7 @@ private void parse() { byte escapeChar = oi.getEscapeChar(); if (fields == null) { - List fieldRefs = ((StructObjectInspector) oi) - .getAllStructFieldRefs(); - fields = new LazyObject[fieldRefs.size()]; - for (int i = 0; i < fields.length; i++) { - fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i) - .getFieldObjectInspector()); - } - fieldInited = new boolean[fields.length]; - // Extra element to make sure we have the same formula to compute the - // length of each element of the array. - startPosition = new int[fields.length + 1]; + initLazyFields(oi.getAllStructFieldRefs()); } int structByteEnd = start + length; @@ -172,6 +163,25 @@ private void parse() { parsed = true; } + protected final void initLazyFields(List fieldRefs) { + fields = new LazyObjectBase[fieldRefs.size()]; + for (int i = 0; i < fields.length; i++) { + try { + fields[i] = createLazyField(i, fieldRefs.get(i)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + fieldInited = new boolean[fields.length]; + // Extra element to make sure we have the same formula to compute the + // length of each element of the array. + startPosition = new int[fields.length + 1]; + } + + protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector()); + } + /** * Get one field out of the struct. 
* @@ -221,14 +231,14 @@ private Object uncheckedGetField(int fieldID) { return fields[fieldID].getObject(); } - ArrayList cachedList; + List cachedList; /** * Get the values of the fields as an ArrayList. * * @return The values of the fields as an ArrayList. */ - public ArrayList getFieldsAsList() { + public List getFieldsAsList() { if (!parsed) { parse(); } @@ -256,7 +266,7 @@ protected void setParsed(boolean parsed) { this.parsed = parsed; } - protected LazyObject[] getFields() { + protected LazyObjectBase[] getFields() { return fields; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java index 8a5386a..ddadfa5 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java @@ -18,16 +18,12 @@ package org.apache.hadoop.hive.serde2.lazy.objectinspector; -import java.util.ArrayList; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.BaseStructObjectInspector; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.Text; /** @@ -40,55 +36,8 @@ * Always use the ObjectInspectorFactory to create new ObjectInspector objects, * instead of directly creating an instance of this class. 
*/ -public class LazySimpleStructObjectInspector extends StructObjectInspector { +public class LazySimpleStructObjectInspector extends BaseStructObjectInspector { - public static final Log LOG = LogFactory - .getLog(LazySimpleStructObjectInspector.class.getName()); - - protected static class MyField implements StructField { - protected int fieldID; - protected String fieldName; - protected ObjectInspector fieldObjectInspector; - protected String fieldComment; - - protected MyField() { - super(); - } - public MyField(int fieldID, String fieldName, - ObjectInspector fieldObjectInspector) { - this.fieldID = fieldID; - this.fieldName = fieldName.toLowerCase(); - this.fieldObjectInspector = fieldObjectInspector; - } - - public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector, String fieldComment) { - this(fieldID, fieldName, fieldObjectInspector); - this.fieldComment = fieldComment; - } - - public int getFieldID() { - return fieldID; - } - - public String getFieldName() { - return fieldName; - } - - public ObjectInspector getFieldObjectInspector() { - return fieldObjectInspector; - } - - public String getFieldComment() { - return fieldComment; - } - - @Override - public String toString() { - return "" + fieldID + ":" + fieldName; - } - } - - private List fields; private byte separator; private Text nullSequence; private boolean lastColumnTakesRest; @@ -98,9 +47,18 @@ public String toString() { protected LazySimpleStructObjectInspector() { super(); } + + protected LazySimpleStructObjectInspector( + List fields, byte separator, Text nullSequence) { + init(fields); + this.separator = separator; + this.nullSequence = nullSequence; + } + /** * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead. */ + @Deprecated protected LazySimpleStructObjectInspector(List structFieldNames, List structFieldObjectInspectors, byte separator, Text nullSequence, boolean lastColumnTakesRest, boolean escaped, @@ -122,60 +80,12 @@ protected void init(List structFieldNames, List structFieldComments, byte separator, Text nullSequence, boolean lastColumnTakesRest, boolean escaped, byte escapeChar) { - assert (structFieldNames.size() == structFieldObjectInspectors.size()); - assert (structFieldComments == null || - structFieldNames.size() == structFieldComments.size()); - + init(structFieldNames, structFieldObjectInspectors, structFieldComments); this.separator = separator; this.nullSequence = nullSequence; this.lastColumnTakesRest = lastColumnTakesRest; this.escaped = escaped; this.escapeChar = escapeChar; - - fields = new ArrayList(structFieldNames.size()); - for (int i = 0; i < structFieldNames.size(); i++) { - fields.add(new MyField(i, structFieldNames.get(i), - structFieldObjectInspectors.get(i), - structFieldComments == null ? 
null : structFieldComments.get(i))); - } - } - - protected LazySimpleStructObjectInspector(List fields, - byte separator, Text nullSequence) { - init(fields, separator, nullSequence); - } - - protected void init(List fields, byte separator, - Text nullSequence) { - this.separator = separator; - this.nullSequence = nullSequence; - - this.fields = new ArrayList(fields.size()); - for (int i = 0; i < fields.size(); i++) { - this.fields.add(new MyField(i, fields.get(i).getFieldName(), fields - .get(i).getFieldObjectInspector(), fields.get(i).getFieldComment())); - } - } - - @Override - public String getTypeName() { - return ObjectInspectorUtils.getStandardStructTypeName(this); - } - - @Override - public final Category getCategory() { - return Category.STRUCT; - } - - // Without Data - @Override - public StructField getStructFieldRef(String fieldName) { - return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, fields); - } - - @Override - public List getAllStructFieldRefs() { - return fields; } // With Data @@ -184,7 +94,7 @@ public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { return null; } - LazyStruct struct = (LazyStruct) data; + StructObject struct = (StructObject) data; MyField f = (MyField) fieldRef; int fieldID = f.getFieldID(); @@ -198,15 +108,8 @@ public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { return null; } - - // Iterate over all the fields picking up the nested structs within them - List result = new ArrayList(fields.size()); - - for (MyField myField : fields) { - result.add(getStructFieldData(data, myField)); - } - - return result; + StructObject struct = (StructObject) data; + return struct.getFieldsAsList(); } // For LazyStruct diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java index 598683f..b3625b3 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java @@ -27,7 +27,7 @@ * A LazyBinaryObject can represent any primitive object or hierarchical object * like string, list, map or struct. */ -public abstract class LazyBinaryObject extends LazyObjectBase { +public abstract class LazyBinaryObject implements LazyObjectBase { OI oi; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java index caf3517..98a35c7 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde2.SerDeStatsStruct; +import org.apache.hadoop.hive.serde2.StructObject; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.RecordInfo; import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; @@ -42,8 +43,8 @@ * Following B, there is another section A and B. This pattern repeats until the * all struct fields are serialized. 
*/ -public class LazyBinaryStruct extends - LazyBinaryNonPrimitive implements SerDeStatsStruct { +public class LazyBinaryStruct extends LazyBinaryNonPrimitive + implements StructObject, SerDeStatsStruct { private static Log LOG = LogFactory.getLog(LazyBinaryStruct.class.getName()); diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java index 7d0d91c..e544f95 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hive.serde2.objectinspector; -import java.util.ArrayList; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.BaseStructObjectInspector; import org.apache.hadoop.hive.serde2.columnar.ColumnarStructBase; /** @@ -35,123 +33,26 @@ * Always use the ObjectInspectorFactory to create new ObjectInspector objects, * instead of directly creating an instance of this class. */ -class ColumnarStructObjectInspector extends StructObjectInspector { - - public static final Log LOG = LogFactory - .getLog(ColumnarStructObjectInspector.class.getName()); - - protected static class MyField implements StructField { - protected int fieldID; - protected String fieldName; - protected ObjectInspector fieldObjectInspector; - protected String fieldComment; - - protected MyField() { - super(); - } - - public MyField(int fieldID, String fieldName, - ObjectInspector fieldObjectInspector) { - this.fieldID = fieldID; - this.fieldName = fieldName.toLowerCase(); - this.fieldObjectInspector = fieldObjectInspector; - } - - public MyField(int fieldID, String fieldName, - ObjectInspector fieldObjectInspector, String fieldComment) { - this(fieldID, fieldName, fieldObjectInspector); - this.fieldComment = fieldComment; - } - - public int getFieldID() { - return fieldID; - } - - public String getFieldName() { - return fieldName; - } - - public ObjectInspector getFieldObjectInspector() { - return fieldObjectInspector; - } - - public String getFieldComment() { - return fieldComment; - } - @Override - public String toString() { - return "" + fieldID + ":" + fieldName; - } - } - - private List fields; +class ColumnarStructObjectInspector extends BaseStructObjectInspector { protected ColumnarStructObjectInspector() { super(); } + /** * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead. 
*/ public ColumnarStructObjectInspector(List structFieldNames, List structFieldObjectInspectors) { - init(structFieldNames, structFieldObjectInspectors, null); + super(structFieldNames, structFieldObjectInspectors); } public ColumnarStructObjectInspector(List structFieldNames, List structFieldObjectInspectors, List structFieldComments) { - init(structFieldNames, structFieldObjectInspectors, structFieldComments); - } - - protected void init(List structFieldNames, - List structFieldObjectInspectors, - List structFieldComments) { - assert (structFieldNames.size() == structFieldObjectInspectors.size()); - assert (structFieldComments == null || - (structFieldNames.size() == structFieldComments.size())); - - fields = new ArrayList(structFieldNames.size()); - for (int i = 0; i < structFieldNames.size(); i++) { - fields.add(new MyField(i, structFieldNames.get(i), - structFieldObjectInspectors.get(i), - structFieldComments == null ? null : structFieldComments.get(i))); - } - } - - protected ColumnarStructObjectInspector(List fields) { - init(fields); - } - - protected void init(List fields) { - this.fields = new ArrayList(fields.size()); - for (int i = 0; i < fields.size(); i++) { - this.fields.add(new MyField(i, fields.get(i).getFieldName(), fields - .get(i).getFieldObjectInspector(), fields.get(i).getFieldComment())); - } - } - - @Override - public String getTypeName() { - return ObjectInspectorUtils.getStandardStructTypeName(this); - } - - @Override - public final Category getCategory() { - return Category.STRUCT; - } - - // Without Data - @Override - public StructField getStructFieldRef(String fieldName) { - return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, fields); - } - - @Override - public List getAllStructFieldRefs() { - return fields; + super(structFieldNames, structFieldObjectInspectors, structFieldComments); } - // With Data @Override public Object getStructFieldData(Object data, StructField fieldRef) { if (data == null) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java index 5e1a369..ef66e97 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java @@ -56,6 +56,9 @@ public String getFieldName() { public ObjectInspector getFieldObjectInspector() { return field.getFieldObjectInspector(); } + public int getFieldID() { + return field.getFieldID(); + } public String getFieldComment() { return field.getFieldComment(); } diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java index bd3cdd4..ee5b0d0 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java @@ -42,6 +42,7 @@ * */ public static class MyField implements StructField { + protected int fieldID; protected Field field; protected ObjectInspector fieldObjectInspector; @@ -49,7 +50,8 @@ protected MyField() { super(); } - public MyField(Field field, ObjectInspector fieldObjectInspector) { + public MyField(int fieldID, Field field, ObjectInspector fieldObjectInspector) { + this.fieldID = fieldID; this.field = field; 
this.fieldObjectInspector = fieldObjectInspector; } @@ -62,6 +64,10 @@ public ObjectInspector getFieldObjectInspector() { return fieldObjectInspector; } + public int getFieldID() { + return fieldID; + } + public String getFieldComment() { return null; } @@ -123,7 +129,7 @@ void init(Class objectClass, for (int i = 0; i < reflectionFields.length; i++) { if (!shouldIgnoreField(reflectionFields[i].getName())) { reflectionFields[i].setAccessible(true); - fields.add(new MyField(reflectionFields[i], structFieldObjectInspectors + fields.add(new MyField(i, reflectionFields[i], structFieldObjectInspectors .get(used++))); } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StructField.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StructField.java index 67827d6..c6cfd39 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StructField.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StructField.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hive.serde2.objectinspector; /** - * StructField is an empty interface. - * * Classes implementing this interface are considered to represent a field of a * struct for this serde package. */ @@ -37,6 +35,11 @@ ObjectInspector getFieldObjectInspector(); /** + * Get the fieldID for the field. + */ + int getFieldID(); + + /** * Get the comment for the field. May be null if no comment provided. */ String getFieldComment(); diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/UnionStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/UnionStructObjectInspector.java index 60e55ec..4658cc9 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/UnionStructObjectInspector.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/UnionStructObjectInspector.java @@ -60,6 +60,11 @@ public ObjectInspector getFieldObjectInspector() { return structField.getFieldObjectInspector(); } + @Override + public int getFieldID() { + return structID; + } + public String getFieldComment() { return structField.getFieldComment(); }
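
Note on the Kryo helpers added to Utilities: serializeExpressionToKryo/deserializeExpressionFromKryo are rewritten on top of the generic serializeObjectToKryo/deserializeObjectFromKryo, and serializeObject/deserializeObject wrap the bytes in Base64 so that any Serializable filter object can ride in the job configuration. The sketch below shows the same round-trip pattern with a standalone Kryo instance instead of Hive's thread-local runtimeSerializationKryo; the FilterState class is a hypothetical stand-in for whatever object a handler pushes.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.codec.binary.Base64;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;

public class KryoRoundTripSketch {

  // Hypothetical stand-in for the object a storage handler pushes as its filter state.
  public static class FilterState implements Serializable {
    String startRow;
    String stopRow;
  }

  static String serialize(Kryo kryo, Serializable object) throws UnsupportedEncodingException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Output output = new Output(baos);
    kryo.writeObject(output, object);        // same call the patch routes through runtimeSerializationKryo
    output.close();
    return new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8");
  }

  static <T> T deserialize(Kryo kryo, String encoded, Class<T> clazz) throws UnsupportedEncodingException {
    Input input = new Input(new ByteArrayInputStream(Base64.decodeBase64(encoded.getBytes("UTF-8"))));
    T result = kryo.readObject(input, clazz);
    input.close();
    return result;
  }

  public static void main(String[] args) throws Exception {
    Kryo kryo = new Kryo();
    kryo.register(FilterState.class);        // harmless on old Kryo versions, required if registration is enforced
    FilterState state = new FilterState();
    state.startRow = "100";
    state.stopRow = "311";
    String encoded = serialize(kryo, state); // the patch stores this string under hive.io.filter.object
    FilterState back = deserialize(kryo, encoded, FilterState.class);
    System.out.println(back.startRow + " .. " + back.stopRow);
  }
}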
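
Note on the IndexPredicateAnalyzer changes: with setAcceptsFields(true) and the new createAnalyzer factory, a comparison against a struct member such as key.col1 = '238' can become an IndexSearchCondition that also carries the accessed field path via getFields(). A rough usage sketch follows, along the lines of what a storage handler's predicate decomposition might do; the column name "key" is an assumption, and allowColumnName, analyzePredicate, getComparisonOp and getConstantDesc are the analyzer's and search condition's pre-existing entry points.

import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

import java.util.List;

public class FieldPredicateSketch {

  // Decompose a predicate over the struct column "key", accepting both range operators
  // and field references such as key.col1 / key.col2.
  public static ExprNodeDesc decompose(ExprNodeDesc predicate,
                                       List<IndexSearchCondition> conditions) {
    IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false); // '=' plus <, <=, >, >=
    analyzer.allowColumnName("key");   // assumed key column name
    analyzer.setAcceptsFields(true);   // new in this patch: keep key.colN comparisons

    ExprNodeDesc residual = analyzer.analyzePredicate(predicate, conditions);
    for (IndexSearchCondition cond : conditions) {
      String[] fields = cond.getFields();   // e.g. {"col1"} for key.col1 = '238', null for plain columns
      // a handler would combine cond.getComparisonOp(), cond.getConstantDesc() and fields
      // into start/stop rows or scan filters
    }
    return residual;   // what Hive still evaluates after the scan
  }
}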
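
Note on the pushed filter object plumbing: DecomposedPredicate.pushedPredicateObject is copied onto TableScanDesc by OpProcFactory and serialized by HiveInputFormat.pushFilters under hive.io.filter.object. Below is a minimal sketch of how a handler-side input format could read it back; the ScanRange class is hypothetical, while the property constant and Utilities.deserializeObject come from this patch.

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

import java.io.Serializable;

public class FilterObjectSketch {

  // Hypothetical state a key factory could hand back as pushedPredicateObject.
  public static class ScanRange implements Serializable {
    public byte[] startRow;
    public byte[] stopRow;
  }

  // Called from a handler's input format (getSplits / getRecordReader) to recover the pushed object.
  public static ScanRange readPushedFilter(JobConf jobConf) {
    String serialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); // "hive.io.filter.object"
    if (serialized == null) {
      return null;   // nothing was pushed for this table scan
    }
    return Utilities.deserializeObject(serialized, ScanRange.class);
  }
}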
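
Note on the lazy SerDe refactoring: LazyObjectBase is now an interface and LazyStruct exposes initLazyFields/createLazyField, so a subclass can substitute a custom lazy object for a particular field (for example, a composite row key surfaced as a struct). A minimal sketch of overriding the new hook, assuming the custom key object is built elsewhere and handed in:

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;

public class CustomKeyLazyStruct extends LazyStruct {

  private final int keyIndex;              // position of the row-key column
  private final LazyObjectBase keyObject;  // custom object representing the composite key

  public CustomKeyLazyStruct(LazySimpleStructObjectInspector oi,
                             int keyIndex, LazyObjectBase keyObject) {
    super(oi);
    this.keyIndex = keyIndex;
    this.keyObject = keyObject;
  }

  @Override
  protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException {
    if (fieldID == keyIndex) {
      return keyObject;  // the key column gets the custom representation
    }
    // all other columns keep the default behaviour, as in LazyStruct itself
    return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector());
  }
}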
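
Note on BaseStructObjectInspector: it centralizes the MyField bookkeeping that LazySimpleStructObjectInspector and ColumnarStructObjectInspector used to duplicate, and createField is the override point for attaching extra per-field metadata ("extended FieldObject"). A speculative sketch of such a subclass; ExtendedField and its binaryStored flag are invented for illustration, and the data-access methods simply delegate to the new StructObject interface.

import org.apache.hadoop.hive.serde2.BaseStructObjectInspector;
import org.apache.hadoop.hive.serde2.StructObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;

import java.util.List;

public class ExtendedStructObjectInspector extends BaseStructObjectInspector {

  // Invented per-field attribute, just to show the createField override point.
  protected static class ExtendedField extends MyField {
    final boolean binaryStored;

    ExtendedField(int id, String name, ObjectInspector oi, String comment, boolean binaryStored) {
      super(id, name, oi, comment);
      this.binaryStored = binaryStored;
    }
  }

  public ExtendedStructObjectInspector(List<String> names, List<ObjectInspector> inspectors) {
    super(names, inspectors);
  }

  @Override
  protected MyField createField(int index, String fieldName, ObjectInspector fieldOI, String comment) {
    return new ExtendedField(index, fieldName, fieldOI, comment, false);
  }

  @Override
  public Object getStructFieldData(Object data, StructField fieldRef) {
    if (data == null) {
      return null;
    }
    return ((StructObject) data).getField(((MyField) fieldRef).getFieldID());
  }

  @Override
  public List<Object> getStructFieldDataAsList(Object data) {
    return data == null ? null : ((StructObject) data).getFieldsAsList();
  }
}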