diff --git hbase-handler/pom.xml hbase-handler/pom.xml
index 132af43..ad7c52a 100644
--- hbase-handler/pom.xml
+++ hbase-handler/pom.xml
@@ -222,6 +222,19 @@
<sourceDirectory>${basedir}/src/java</sourceDirectory>
<testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
new file mode 100644
index 0000000..ef21a9b
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
@@ -0,0 +1,289 @@
+package org.apache.hadoop.hive.hbase;
+
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class ColumnMappings implements Iterable<ColumnMapping> {
+
+ private final int keyIndex;
+ private final ColumnMapping[] columnsMapping;
+
+ public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex) {
+ this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]);
+ this.keyIndex = keyIndex;
+ }
+
+ @Override
+ public Iterator<ColumnMapping> iterator() {
+ return Iterators.forArray(columnsMapping);
+ }
+
+ public int size() {
+ return columnsMapping.length;
+ }
+
+ String toTypesString() {
+ StringBuilder sb = new StringBuilder();
+ for (ColumnMapping colMap : columnsMapping) {
+ if (sb.length() > 0) {
+ sb.append(":");
+ }
+ if (colMap.hbaseRowKey) {
+ // the row key column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ } else if (colMap.qualifierName == null) {
+ // a column family become a MAP
+ sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
+ + serdeConstants.STRING_TYPE_NAME + ">");
+ } else {
+ // an individual column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ }
+ }
+ return sb.toString();
+ }
+
+ void validate(String serdeName, List<String> columnNames, List<TypeInfo> columnTypes)
+ throws SerDeException {
+ if (columnsMapping.length != columnNames.size()) {
+ throw new SerDeException(serdeName + ": columns has " + columnNames.size() +
+ " elements while hbase.columns.mapping has " + columnsMapping.length + " elements" +
+ " (counting the key if implicit)");
+ }
+
+ // check that the mapping schema is right;
+ // check that the "column-family:" is mapped to Map
+ // where key extends LazyPrimitive, ?> and thus has type Category.PRIMITIVE
+ for (int i = 0; i < columnNames.size(); i++) {
+ ColumnMapping colMap = columnsMapping[i];
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ TypeInfo typeInfo = columnTypes.get(i);
+ if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) ||
+ (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
+ != ObjectInspector.Category.PRIMITIVE)) {
+
+ throw new SerDeException(
+ serdeName + ": hbase column family '" + colMap.familyName
+ + "' should be mapped to Map extends LazyPrimitive, ?>,?>, that is "
+ + "the Key for the map should be of primitive type, but is mapped to "
+ + typeInfo.getTypeName());
+ }
+ }
+ colMap.columnName = columnNames.get(i);
+ colMap.columnType = columnTypes.get(i);
+ }
+ }
+
+ /**
+ * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
+ * whether to use a binary or an UTF string format to serialize and de-serialize primitive
+ * data types like boolean, byte, short, int, long, float, and double. This applies to
+ * regular columns and also to map column types which are associated with an HBase column
+ * family. For the map types, we apply the specification to the key or the value provided it
+ * is one of the above primitive types. The specifier is a colon separated value of the form
+ * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
+ * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
+ * table level default.
+ *
+ * @param hbaseTableDefaultStorageType - the specification associated with the table property
+ * hbase.table.default.storage.type
+ * @throws SerDeException on parse error.
+ */
+ void parseColumnStorageTypes(String hbaseTableDefaultStorageType) throws SerDeException {
+
+ boolean tableBinaryStorage = false;
+
+ if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+ if (hbaseTableDefaultStorageType.equals("binary")) {
+ tableBinaryStorage = true;
+ } else if (!hbaseTableDefaultStorageType.equals("string")) {
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+ " parameter must be specified as" +
+ " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+ "' is not a valid specification for this table/serde property.");
+ }
+ }
+
+ // parse the string to determine column level storage type for primitive types
+ // 's' is for variable length string format storage
+ // 'b' is for fixed width binary storage of bytes
+ // '-' is for table storage type, which defaults to UTF8 string
+ // string data is always stored in the default escaped storage format; the data types
+ // byte, short, int, long, float, and double have a binary byte oriented storage option
+ for (ColumnMapping colMap : columnsMapping) {
+ TypeInfo colType = colMap.columnType;
+ String mappingSpec = colMap.mappingSpec;
+ String[] mapInfo = mappingSpec.split("#");
+ String[] storageInfo = null;
+
+ if (mapInfo.length == 2) {
+ storageInfo = mapInfo[1].split(":");
+ }
+
+ if (storageInfo == null) {
+
+ // use the table default storage specification
+ if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else if (colType.getCategory() == ObjectInspector.Category.MAP) {
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 1) {
+ // we have a storage specification for a primitive column type
+ String storageOption = storageInfo[0];
+
+ if ((colType.getCategory() == ObjectInspector.Category.MAP) ||
+ !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+ "binary".startsWith(storageOption))) {
+ throw new SerDeException("Error: A column storage specification is one of the following:"
+ + " '-', a prefix of 'string', or a prefix of 'binary'. "
+ + storageOption + " is not a valid storage option specification for "
+ + colMap.columnName);
+ }
+
+ if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if ("-".equals(storageOption)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(storageOption)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 2) {
+ // we have a storage specification for a map column type
+
+ String keyStorage = storageInfo[0];
+ String valStorage = storageInfo[1];
+
+ if ((colType.getCategory() != ObjectInspector.Category.MAP) ||
+ !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+ "binary".startsWith(keyStorage)) ||
+ !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+ "binary".startsWith(valStorage))) {
+ throw new SerDeException("Error: To specify a valid column storage type for a Map"
+ + " column, use any two specifiers from '-', a prefix of 'string', "
+ + " and a prefix of 'binary' separated by a ':'."
+ + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+ + " key and value parts of the Map,?> respectively."
+ + " Invalid storage specification for column "
+ + colMap.columnName
+ + "; " + storageInfo[0] + ":" + storageInfo[1]);
+ }
+
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if (keyStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(keyStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ if (valStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(valStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (colMap.binaryStorage.size() != 2) {
+ throw new SerDeException("Error: In parsing the storage specification for column "
+ + colMap.columnName);
+ }
+
+ } else {
+ // error in storage specification
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification "
+ + mappingSpec + " is not valid for column: "
+ + colMap.columnName);
+ }
+ }
+ }
+
+ public ColumnMapping getKeyMapping() {
+ return columnsMapping[keyIndex];
+ }
+
+ public int getKeyIndex() {
+ return keyIndex;
+ }
+
+ public ColumnMapping[] getColumnsMapping() {
+ return columnsMapping;
+ }
+
+ // todo use final fields
+ static class ColumnMapping {
+
+ ColumnMapping() {
+ binaryStorage = new ArrayList<Boolean>(2);
+ }
+
+ String columnName;
+ TypeInfo columnType;
+
+ String familyName;
+ String qualifierName;
+ byte[] familyNameBytes;
+ byte[] qualifierNameBytes;
+ List<Boolean> binaryStorage;
+ boolean hbaseRowKey;
+ String mappingSpec;
+ String qualifierPrefix;
+ byte[] qualifierPrefixBytes;
+
+ public boolean isCategory(ObjectInspector.Category category) {
+ return columnType.getCategory() == category;
+ }
+ }
+}
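For context on how these mappings are produced: the sketch below is not part of the patch (the family, qualifier and storage suffixes are made-up examples); it feeds a hbase.columns.mapping style string through the new parseColumnsMapping() overload and inspects the result. The '#' storage suffixes are only resolved later, once validate() has attached the Hive column types and parseColumnStorageTypes() has filled each binaryStorage list.

    import org.apache.hadoop.hive.hbase.ColumnMappings;
    import org.apache.hadoop.hive.hbase.HBaseSerDe;

    public class ColumnMappingSketch {
      public static void main(String[] args) throws Exception {
        // ":key"       -> the Hive column backed by the HBase row key
        // "cf:count#b" -> qualifier 'count' in family 'cf', binary storage
        // "cf:#s:b"    -> whole family 'cf' as a MAP column; string keys, binary values
        ColumnMappings mappings =
            HBaseSerDe.parseColumnsMapping(":key,cf:count#b,cf:#s:b");
        System.out.println(mappings.size());        // 3 (key included)
        System.out.println(mappings.getKeyIndex()); // 0
      }
    }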
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseAbstractKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseAbstractKeyFactory.java
new file mode 100644
index 0000000..3cc50e9
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseAbstractKeyFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public abstract class HBaseAbstractKeyFactory implements HBaseKeyFactory {
+
+ protected HBaseSerDeParameters hbaseParams;
+ protected ColumnMappings.ColumnMapping keyMapping;
+ protected Properties properties;
+
+ @Override
+ public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException {
+ this.hbaseParams = hbaseParam;
+ this.keyMapping = hbaseParam.getKeyColumnMapping();
+ this.properties = properties;
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException {
+ TableMapReduceUtil.addDependencyJars(jobConf, getClass());
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) {
+ return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate);
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
index 5008f15..5654f50 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -18,11 +18,8 @@
package org.apache.hadoop.hive.hbase;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
@@ -31,6 +28,9 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class
* and override the {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a
@@ -52,9 +52,11 @@
*
*
*
- * */
+ */
public class HBaseCompositeKey extends LazyStruct {
+ protected static final Log LOG = LogFactory.getLog(HBaseCompositeKey.class);
+
public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
super(oi);
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKeyFactory.java
new file mode 100644
index 0000000..b15e6c6
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKeyFactory.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class HBaseCompositeKeyFactory<T extends HBaseCompositeKey>
+ extends HBaseDefaultKeyFactory implements Configurable {
+
+ public static final Log LOG = LogFactory.getLog(HBaseCompositeKeyFactory.class);
+
+ private final Class<T> keyClass;
+ private final Constructor<T> constructor;
+
+ private Configuration conf;
+
+ public HBaseCompositeKeyFactory(Class<T> keyClass) throws Exception {
+ // see javadoc of HBaseCompositeKey
+ this.keyClass = keyClass;
+ this.constructor = keyClass.getDeclaredConstructor(
+ LazySimpleStructObjectInspector.class, Properties.class, Configuration.class);
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException {
+ super.configureJobConf(tableDesc, jobConf);
+ TableMapReduceUtil.addDependencyJars(jobConf, keyClass);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public T createKey(ObjectInspector inspector) throws SerDeException {
+ try {
+ return (T) constructor.newInstance(inspector, properties, conf);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
+ ExprNodeDesc predicate) {
+ String keyColName = hbaseParams.getKeyColumnMapping().columnName;
+
+ IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true);
+ analyzer.allowColumnName(keyColName);
+ analyzer.setAcceptsFields(true);
+ analyzer.setFieldValidator(new Validator());
+
+ DecomposedPredicate decomposed = new DecomposedPredicate();
+
+ List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
+ decomposed.residualPredicate =
+ (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions);
+ if (!conditions.isEmpty()) {
+ decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions);
+ try {
+ decomposed.pushedPredicateObject = setupFilter(keyColName, conditions);
+ } catch (Exception e) {
+ LOG.warn("Failed to decompose predicates", e);
+ return null;
+ }
+ }
+ return decomposed;
+ }
+
+ private Serializable setupFilter(String keyColName, List<IndexSearchCondition> conditions)
+ throws Exception {
+ HBaseScanRange scanRange = new HBaseScanRange();
+ for (IndexSearchCondition condition : conditions) {
+ if (condition.getFields() == null) {
+ continue;
+ }
+ String field = condition.getFields()[0];
+ Object value = condition.getConstantDesc().getValue();
+ scanRange.addFilter(new FamilyFilter(
+ CompareFilter.CompareOp.EQUAL, new BinaryComparator(field.getBytes())));
+ }
+ return scanRange;
+ }
+
+ private static class Validator implements IndexPredicateAnalyzer.FieldValidator {
+
+ public boolean validate(ExprNodeFieldDesc fieldDesc) {
+ String fieldName = fieldDesc.getFieldName();
+
+ ExprNodeDesc nodeDesc = fieldDesc.getDesc();
+
+ TypeInfo typeInfo = nodeDesc.getTypeInfo();
+
+ if (!(typeInfo instanceof StructTypeInfo)) {
+ // since we are working off an ExprNodeFieldDesc which represents a field within a struct,
+ // this should never happen
+ throw new AssertionError("Expected StructTypeInfo. Found:" + typeInfo.getTypeName());
+ }
+
+ List<String> allFieldNames = ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+
+ if (allFieldNames == null || allFieldNames.size() == 0) {
+ return false;
+ }
+
+ String firstElement = allFieldNames.get(0);
+
+ return firstElement.equals(fieldName);
+ }
+ }
+}
\ No newline at end of file
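For reference, a sketch of the kind of key class this factory instantiates reflectively; the constructor below is exactly the signature looked up via getDeclaredConstructor above, while the class name and the parsing hinted at in the comments are invented for illustration. Such a class would be referenced through the hbase.composite.key.class / hbase.composite.key.factory table properties declared in HBaseSerDe.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
    import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

    public class SampleCompositeKey extends HBaseCompositeKey {

      // HBaseCompositeKeyFactory reflects on exactly this constructor shape.
      public SampleCompositeKey(LazySimpleStructObjectInspector oi,
          Properties tbl, Configuration conf) {
        super(oi);
        // A real key would read its delimiters or field widths from 'tbl'/'conf'
        // here and override LazyStruct#getField(int) to carve the raw row key
        // into the struct fields described by 'oi'.
      }
    }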
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDefaultKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDefaultKeyFactory.java
new file mode 100644
index 0000000..0564df2
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDefaultKeyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HBaseDefaultKeyFactory extends HBaseAbstractKeyFactory implements HBaseKeyFactory {
+
+ protected LazySimpleSerDe.SerDeParameters serdeParams;
+ protected HBaseRowSerializer serializer;
+
+ @Override
+ public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException {
+ super.init(hbaseParam, properties);
+ this.serdeParams = hbaseParam.getSerdeParams();
+ this.serializer = new HBaseRowSerializer(hbaseParam);
+ }
+
+ @Override
+ public ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException {
+ return LazyFactory.createLazyObjectInspector(type, serdeParams.getSeparators(), 1,
+ serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar());
+ }
+
+ @Override
+ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException {
+ return LazyFactory.createLazyObject(inspector, keyMapping.binaryStorage.get(0));
+ }
+
+ @Override
+ public byte[] serializeKey(Object object, StructField field) throws IOException {
+ return serializer.serializeKeyField(object, field, keyMapping);
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
new file mode 100644
index 0000000..251d22e
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Provides custom implementation of object and object inspector for hbase key.
+ * The key object should implement LazyObjectBase.
+ *
+ * User can optionally implement HiveStoragePredicateHandler for handling filter predicates
+ */
+public interface HBaseKeyFactory extends HiveStoragePredicateHandler {
+
+ /**
+ * initialize factory with properties
+ */
+ void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException;
+
+ /**
+ * create custom object inspector for hbase key
+ * @param type type information
+ */
+ ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException;
+
+ /**
+ * create custom object for hbase key
+ * @param inspector OI created by {@link HBaseKeyFactory#createKeyObjectInspector}
+ */
+ LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException;
+
+ /**
+ * serialize hive object in internal format of custom key
+ *
+ * @param object the key value to serialize
+ * @param field the struct field for the key column
+ *
+ * @return the serialized key as a byte array; null if the key value serializes to null
+ * @throws java.io.IOException
+ */
+ byte[] serializeKey(Object object, StructField field) throws IOException;
+
+ /**
+ * configure jobConf for this factory
+ *
+ * @param tableDesc
+ * @param jobConf
+ */
+ void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException;
+}
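A minimal sketch of a custom factory built on HBaseAbstractKeyFactory, which already supplies init(), configureJobConf() and predicate decomposition, so only the three hooks below remain. The class name and the UTF-8 serialization are placeholders invented for illustration, and the sketch assumes it lives in the org.apache.hadoop.hive.hbase package so the package-private mapping fields are reachable, as they are for the built-in factories.

    package org.apache.hadoop.hive.hbase;

    import java.io.IOException;

    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
    import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
    import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    public class PlainStringKeyFactory extends HBaseAbstractKeyFactory {

      @Override
      public ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException {
        // Reuse the standard lazy object inspector for the declared key type.
        LazySimpleSerDe.SerDeParameters params = hbaseParams.getSerdeParams();
        return LazyFactory.createLazyObjectInspector(type, params.getSeparators(), 1,
            params.getNullSequence(), params.isEscaped(), params.getEscapeChar());
      }

      @Override
      public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException {
        return LazyFactory.createLazyObject(inspector, keyMapping.binaryStorage.get(0));
      }

      @Override
      public byte[] serializeKey(Object object, StructField field) throws IOException {
        // Naive placeholder: write the key as UTF-8 text.
        // HBaseDefaultKeyFactory delegates to HBaseRowSerializer#serializeKeyField instead.
        return object == null ? null : object.toString().getBytes("UTF-8");
      }
    }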
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
new file mode 100644
index 0000000..5c26456
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Does the same thing as LazyFactory#createLazyObjectInspector, except that the
+// object inspector for the key column is replaced with the one created by the
+// HBaseKeyFactory configured through the serde properties for HBase.
+public class HBaseLazyObjectFactory {
+
+ public static ObjectInspector createLazyHBaseStructInspector(
+ SerDeParameters serdeParams, int index, HBaseKeyFactory factory) throws SerDeException {
+ List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+ ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+ columnTypes.size());
+ for (int i = 0; i < columnTypes.size(); i++) {
+ if (i == index) {
+ columnObjectInspectors.add(factory.createKeyObjectInspector(columnTypes.get(i)));
+ } else {
+ columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
+ columnTypes.get(i), serdeParams.getSeparators(), 1, serdeParams.getNullSequence(),
+ serdeParams.isEscaped(), serdeParams.getEscapeChar()));
+ }
+ }
+ return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+ serdeParams.getColumnNames(), columnObjectInspectors, serdeParams.getSeparators()[0],
+ serdeParams.getNullSequence(), serdeParams.isLastColumnTakesRest(),
+ serdeParams.isEscaped(), serdeParams.getEscapeChar());
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
new file mode 100644
index 0000000..0c37dfd
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
@@ -0,0 +1,268 @@
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class HBaseRowSerializer {
+
+ private final HBaseKeyFactory keyFactory;
+ private final HBaseSerDeParameters hbaseParam;
+ private final LazySimpleSerDe.SerDeParameters serdeParam;
+
+ private final int keyIndex;
+ private final ColumnMapping keyMapping;
+ private final ColumnMapping[] columnMappings;
+ private final byte[] separators; // the separators array
+ private final boolean escaped; // whether we need to escape the data when writing out
+ private final byte escapeChar; // which char to use as the escape char, e.g. '\\'
+ private final boolean[] needsEscape; // which chars need to be escaped. This array should have size
+ // of 128. Negative byte values (or byte values >= 128) are
+ // never escaped.
+
+ private final long putTimestamp;
+ private final ByteStream.Output output = new ByteStream.Output();
+
+ public HBaseRowSerializer(HBaseSerDeParameters hbaseParam) {
+ this.hbaseParam = hbaseParam;
+ this.keyFactory = hbaseParam.getKeyFactory();
+ this.serdeParam = hbaseParam.getSerdeParams();
+ this.separators = serdeParam.getSeparators();
+ this.escaped = serdeParam.isEscaped();
+ this.escapeChar = serdeParam.getEscapeChar();
+ this.needsEscape = serdeParam.getNeedsEscape();
+ this.keyIndex = hbaseParam.getKeyIndex();
+ this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping();
+ this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping();
+ this.putTimestamp = hbaseParam.getPutTimestamp();
+ }
+
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws Exception {
+ if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> list = soi.getStructFieldsDataAsList(obj);
+
+ StructField field = fields.get(keyIndex);
+ Object value = list.get(keyIndex);
+
+ byte[] key = keyFactory.serializeKey(value, field);
+ if (key == null) {
+ throw new SerDeException("HBase row key cannot be NULL");
+ }
+
+ Put put = putTimestamp >= 0 ? new Put(key, putTimestamp) : new Put(key);
+
+ // Serialize each field
+ for (int i = 0; i < fields.size(); i++) {
+ if (i == keyIndex) {
+ continue;
+ }
+ field = fields.get(i);
+ value = list.get(i);
+ serializeField(value, field, columnMappings[i], put);
+ }
+
+ return new PutWritable(put);
+ }
+
+ byte[] serializeKeyField(Object keyValue, StructField keyField, ColumnMapping keyMapping)
+ throws IOException {
+ if (keyValue == null) {
+ throw new IOException("HBase row key cannot be NULL");
+ }
+ ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
+
+ if (!keyFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE) &&
+ keyMapping.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+ // we always serialize the String type using the escaped algorithm for LazyString
+ return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
+ }
+ // use the serialization option switch to write primitive values as either a variable
+ // length UTF8 string or a fixed width bytes if serializing in binary format
+ boolean writeBinary = keyMapping.binaryStorage.get(0);
+ return serialize(keyValue, keyFieldOI, 1, writeBinary);
+ }
+
+ private void serializeField(
+ Object value, StructField field, ColumnMapping colMap, Put put) throws IOException {
+ if (value == null) {
+ // a null object, we do not serialize it
+ return;
+ }
+ // Get the field objectInspector and the field object.
+ ObjectInspector foi = field.getFieldObjectInspector();
+
+ // If the field corresponds to a column family in HBase
+ if (colMap.qualifierName == null) {
+ MapObjectInspector moi = (MapObjectInspector) foi;
+ Map<?, ?> map = moi.getMap(value);
+ if (map == null) {
+ return;
+ }
+ ObjectInspector koi = moi.getMapKeyObjectInspector();
+ ObjectInspector voi = moi.getMapValueObjectInspector();
+
+ for (Map.Entry<?, ?> entry: map.entrySet()) {
+ // Get the Key
+ // Map keys are required to be primitive and may be serialized in binary format
+ byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0));
+ if (columnQualifierBytes == null) {
+ continue;
+ }
+
+ // Map values may be serialized in binary format when they are primitive and binary
+ // serialization is the option selected
+ byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1));
+ if (bytes == null) {
+ continue;
+ }
+
+ put.add(colMap.familyNameBytes, columnQualifierBytes, bytes);
+ }
+ } else {
+ byte[] bytes;
+ // If the field that is passed in is NOT a primitive, and either the
+ // field is not declared (no schema was given at initialization), or
+ // the field is declared as a primitive in initialization, serialize
+ // the data to JSON string. Otherwise serialize the data in the
+ // delimited way.
+ if (!foi.getCategory().equals(ObjectInspector.Category.PRIMITIVE)
+ && colMap.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+ // we always serialize the String type using the escaped algorithm for LazyString
+ bytes = serialize(SerDeUtils.getJSONString(value, foi),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
+ } else {
+ // use the serialization option switch to write primitive values as either a variable
+ // length UTF8 string or a fixed width bytes if serializing in binary format
+ bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0));
+ }
+
+ if (bytes == null) {
+ return;
+ }
+
+ put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes);
+ }
+ }
+
+ /*
+ * Serialize the row into a ByteStream.
+ *
+ * @param obj The object for the current field.
+ * @param objInspector The ObjectInspector for the current Object.
+ * @param level The current level of separator.
+ * @param writeBinary Whether to write a primitive object as an UTF8 variable length string or
+ * as a fixed width byte array onto the byte stream.
+ * @throws IOException On error in writing to the serialization stream.
+ * @return true On serializing a non-null object, otherwise false.
+ */
+ private byte[] serialize(Object obj, ObjectInspector objInspector, int level, boolean writeBinary)
+ throws IOException {
+ output.reset();
+ if (objInspector.getCategory() == ObjectInspector.Category.PRIMITIVE && writeBinary) {
+ LazyUtils.writePrimitive(output, obj, (PrimitiveObjectInspector) objInspector);
+ } else {
+ if (!serialize(obj, objInspector, level, output)) {
+ return null;
+ }
+ }
+ return output.toByteArray();
+ }
+
+ private boolean serialize(
+ Object obj,
+ ObjectInspector objInspector,
+ int level, ByteStream.Output ss) throws IOException {
+
+ switch (objInspector.getCategory()) {
+ case PRIMITIVE:
+ LazyUtils.writePrimitiveUTF8(ss, obj,
+ (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
+ return true;
+ case LIST:
+ char separator = (char) separators[level];
+ ListObjectInspector loi = (ListObjectInspector)objInspector;
+ List<?> list = loi.getList(obj);
+ ObjectInspector eoi = loi.getListElementObjectInspector();
+ if (list == null) {
+ return false;
+ } else {
+ for (int i = 0; i < list.size(); i++) {
+ if (i > 0) {
+ ss.write(separator);
+ }
+ serialize(list.get(i), eoi, level + 1, ss);
+ }
+ }
+ return true;
+ case MAP:
+ char sep = (char) separators[level];
+ char keyValueSeparator = (char) separators[level+1];
+ MapObjectInspector moi = (MapObjectInspector) objInspector;
+ ObjectInspector koi = moi.getMapKeyObjectInspector();
+ ObjectInspector voi = moi.getMapValueObjectInspector();
+
+ Map<?, ?> map = moi.getMap(obj);
+ if (map == null) {
+ return false;
+ } else {
+ boolean first = true;
+ for (Map.Entry<?, ?> entry: map.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ ss.write(sep);
+ }
+ serialize(entry.getKey(), koi, level+2, ss);
+ ss.write(keyValueSeparator);
+ serialize(entry.getValue(), voi, level+2, ss);
+ }
+ }
+ return true;
+ case STRUCT:
+ sep = (char)separators[level];
+ StructObjectInspector soi = (StructObjectInspector)objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ list = soi.getStructFieldsDataAsList(obj);
+ if (list == null) {
+ return false;
+ } else {
+ for (int i = 0; i < list.size(); i++) {
+ if (i > 0) {
+ ss.write(sep);
+ }
+
+ serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
+ level + 1, ss);
+ }
+ }
+ return true;
+ default:
+ throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
+ }
+ }
+}
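To make the column-family path above concrete, this is the Put an equivalent row boils down to; a hand-built sketch rather than captured output, with row, family and values invented. A Hive row of key = "r1" and cf = map("q1" -> 1, "q2" -> 2), with string storage on both sides of the map, yields one cell per map entry: the qualifier comes from the map key and the value is written as UTF-8 text.

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowSerializerShape {
      public static void main(String[] args) {
        Put put = new Put(Bytes.toBytes("r1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("2"));
        System.out.println(put);
      }
    }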
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
new file mode 100644
index 0000000..8b64321
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HBaseScanRange implements Serializable {
+
+ private byte[] startRow;
+ private byte[] stopRow;
+
+ private List<FilterDesc> filterDescs = new ArrayList<FilterDesc>();
+
+ public byte[] getStartRow() {
+ return startRow;
+ }
+
+ public void setStartRow(byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ public byte[] getStopRow() {
+ return stopRow;
+ }
+
+ public void setStopRow(byte[] stopRow) {
+ this.stopRow = stopRow;
+ }
+
+ public void addFilter(Filter filter) throws Exception {
+ Class<? extends Filter> clazz = filter.getClass();
+ clazz.getMethod("parseFrom", byte[].class); // validate
+ filterDescs.add(new FilterDesc(clazz.getName(), filter.toByteArray()));
+ }
+
+ public void setup(Scan scan, Configuration conf) throws Exception {
+ if (startRow != null) {
+ scan.setStartRow(startRow);
+ }
+ if (stopRow != null) {
+ scan.setStopRow(stopRow);
+ }
+ if (filterDescs.isEmpty()) {
+ return;
+ }
+ if (filterDescs.size() == 1) {
+ scan.setFilter(filterDescs.get(0).toFilter(conf));
+ return;
+ }
+ List<Filter> filters = new ArrayList<Filter>();
+ for (FilterDesc filter : filterDescs) {
+ filters.add(filter.toFilter(conf));
+ }
+ scan.setFilter(new FilterList(filters));
+ }
+
+ public String toString() {
+ return (startRow == null ? "" : new BytesWritable(startRow).toString()) + " ~ " +
+ (stopRow == null ? "" : new BytesWritable(stopRow).toString());
+ }
+
+ private static class FilterDesc implements Serializable {
+
+ private String className;
+ private byte[] binary;
+
+ public FilterDesc(String className, byte[] binary) {
+ this.className = className;
+ this.binary = binary;
+ }
+
+ public Filter toFilter(Configuration conf) throws Exception {
+ return (Filter) getFactoryMethod(className, conf).invoke(null, binary);
+ }
+
+ private Method getFactoryMethod(String className, Configuration conf) throws Exception {
+ Class<?> clazz = conf.getClassByName(className);
+ return clazz.getMethod("parseFrom", byte[].class);
+ }
+ }
+}
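A usage sketch for the range object that a key factory's decomposePredicate() can push down; the row-key bounds and the single FamilyFilter below are made up for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.FamilyFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hive.hbase.HBaseScanRange;

    public class ScanRangeSketch {
      public static void main(String[] args) throws Exception {
        HBaseScanRange range = new HBaseScanRange();
        range.setStartRow(Bytes.toBytes("row-000"));
        range.setStopRow(Bytes.toBytes("row-999"));
        // addFilter() checks for a static parseFrom(byte[]) factory and keeps only
        // (class name, serialized bytes), so the range itself stays Serializable.
        range.addFilter(new FamilyFilter(
            CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("cf"))));

        // On the read side the filters are re-created reflectively and attached,
        // wrapped in a FilterList when there is more than one.
        Scan scan = new Scan();
        range.setup(scan, new Configuration());
      }
    }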
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 5fe35a5..6c1ce5c 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -18,36 +18,24 @@
package org.apache.hadoop.hive.hbase;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
/**
* HBaseSerDe can be used to serialize object into an HBase table and
@@ -62,6 +50,7 @@
public static final String HBASE_KEY_COL = ":key";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
+ public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
@@ -73,19 +62,13 @@
private ObjectInspector cachedObjectInspector;
private LazyHBaseRow cachedHBaseRow;
- private Object compositeKeyObj;
private HBaseSerDeParameters serdeParams;
+ private HBaseRowSerializer serializer;
@Override
public String toString() {
- return getClass().toString()
- + "["
- + serdeParams.getColumnMappingString()
- + ":"
- + serdeParams.getRowTypeInfo().getAllStructFieldNames()
- + ":"
- + serdeParams.getRowTypeInfo().getAllStructFieldTypeInfos() + "]";
+ return getClass() + "[" + serdeParams + "]";
}
public HBaseSerDe() throws SerDeException {
@@ -98,35 +81,26 @@ public HBaseSerDe() throws SerDeException {
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
- serdeParams = new HBaseSerDeParameters();
- serdeParams.init(conf, tbl, getClass().getName());
-
- cachedObjectInspector = LazyFactory.createLazyStructInspector(
- serdeParams.getColumnNames(),
- serdeParams.getColumnTypes(),
- serdeParams.getSeparators(),
- serdeParams.getNullSequence(),
- serdeParams.isLastColumnTakesRest(),
- serdeParams.isEscaped(),
- serdeParams.getEscapeChar());
-
- cachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector) cachedObjectInspector);
-
- if (serdeParams.getCompositeKeyClass() != null) {
- // initialize the constructor of the composite key class with its object inspector
- initCompositeKeyClass(conf,tbl);
- }
+ serdeParams = new HBaseSerDeParameters(conf, tbl, getClass().getName());
+
+ cachedObjectInspector = HBaseLazyObjectFactory.createLazyHBaseStructInspector(
+ serdeParams.getSerdeParams(), serdeParams.getKeyIndex(), serdeParams.getKeyFactory());
+
+ cachedHBaseRow = new LazyHBaseRow(
+ (LazySimpleStructObjectInspector) cachedObjectInspector,
+ serdeParams.getKeyIndex(), serdeParams.getKeyFactory());
+
+ serializer = new HBaseRowSerializer(serdeParams);
if (LOG.isDebugEnabled()) {
- LOG.debug("HBaseSerDe initialized with : columnNames = "
- + serdeParams.getColumnNames()
- + " columnTypes = "
- + serdeParams.getColumnTypes()
- + " hbaseColumnMapping = "
- + serdeParams.getColumnMappingString());
+ LOG.debug("HBaseSerDe initialized with : " + serdeParams);
}
}
+ public static ColumnMappings parseColumnsMapping(String columnsMappingSpec)
+ throws SerDeException {
+ return parseColumnsMapping(columnsMappingSpec, true);
+ }
/**
* Parses the HBase columns mapping specifier to identify the column families, qualifiers
* and also caches the byte arrays corresponding to them. One of the Hive table
@@ -135,24 +109,23 @@ public void initialize(Configuration conf, Properties tbl)
* @param columnsMappingSpec string hbase.columns.mapping specified when creating table
* @param doColumnRegexMatching whether to do a regex matching on the columns or not
* @return List<ColumnMapping> which contains the column mapping information by position
- * @throws SerDeException
+ * @throws org.apache.hadoop.hive.serde2.SerDeException
*/
- public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching)
- throws SerDeException {
+ public static ColumnMappings parseColumnsMapping(
+ String columnsMappingSpec, boolean doColumnRegexMatching) throws SerDeException {
if (columnsMappingSpec == null) {
throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
}
- if (columnsMappingSpec.equals("") || columnsMappingSpec.equals(HBASE_KEY_COL)) {
+ if (columnsMappingSpec.isEmpty() || columnsMappingSpec.equals(HBASE_KEY_COL)) {
throw new SerDeException("Error: hbase.columns.mapping specifies only the HBase table"
+ " row key. A valid Hive-HBase table must specify at least one additional column.");
}
int rowKeyIndex = -1;
List<ColumnMapping> columnsMapping = new ArrayList<ColumnMapping>();
- String [] columnSpecs = columnsMappingSpec.split(",");
- ColumnMapping columnMapping = null;
+ String[] columnSpecs = columnsMappingSpec.split(",");
for (int i = 0; i < columnSpecs.length; i++) {
String mappingSpec = columnSpecs[i].trim();
@@ -167,7 +140,7 @@ public void initialize(Configuration conf, Properties tbl)
"column family, column qualifier specification.");
}
- columnMapping = new ColumnMapping();
+ ColumnMapping columnMapping = new ColumnMapping();
if (colInfo.equals(HBASE_KEY_COL)) {
rowKeyIndex = i;
@@ -197,7 +170,6 @@ public void initialize(Configuration conf, Properties tbl)
// set the regular provided qualifier names
columnMapping.qualifierName = parts[1];
columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
- ;
}
} else {
columnMapping.qualifierName = null;
@@ -211,34 +183,26 @@ public void initialize(Configuration conf, Properties tbl)
}
if (rowKeyIndex == -1) {
- columnMapping = new ColumnMapping();
- columnMapping.familyName = HBASE_KEY_COL;
- columnMapping.familyNameBytes = Bytes.toBytes(HBASE_KEY_COL);
+ rowKeyIndex = 0;
+ ColumnMapping columnMapping = new ColumnMapping();
+ columnMapping.familyName = HBaseSerDe.HBASE_KEY_COL;
+ columnMapping.familyNameBytes = Bytes.toBytes(HBaseSerDe.HBASE_KEY_COL);
columnMapping.qualifierName = null;
columnMapping.qualifierNameBytes = null;
columnMapping.hbaseRowKey = true;
- columnMapping.mappingSpec = HBASE_KEY_COL;
+ columnMapping.mappingSpec = HBaseSerDe.HBASE_KEY_COL;
columnsMapping.add(0, columnMapping);
}
- return columnsMapping;
+ return new ColumnMappings(columnsMapping, rowKeyIndex);
}
- static class ColumnMapping {
-
- ColumnMapping() {
- binaryStorage = new ArrayList<Boolean>(2);
- }
+ public LazySimpleSerDe.SerDeParameters getSerdeParams() {
+ return serdeParams.getSerdeParams();
+ }
- String familyName;
- String qualifierName;
- byte [] familyNameBytes;
- byte [] qualifierNameBytes;
- List<Boolean> binaryStorage;
- boolean hbaseRowKey;
- String mappingSpec;
- String qualifierPrefix;
- byte[] qualifierPrefixBytes;
+ public HBaseSerDeParameters getHBaseSerdeParam() {
+ return serdeParams;
}
/**
@@ -253,8 +217,8 @@ public Object deserialize(Writable result) throws SerDeException {
throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
}
- cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMapping(),
- compositeKeyObj);
+ cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMappings());
+
return cachedHBaseRow;
}
@@ -269,310 +233,14 @@ public ObjectInspector getObjectInspector() throws SerDeException {
}
@Override
- public Writable serialize(Object obj, ObjectInspector objInspector)
- throws SerDeException {
- if (objInspector.getCategory() != Category.STRUCT) {
- throw new SerDeException(getClass().toString()
- + " can only serialize struct types, but we got: "
- + objInspector.getTypeName());
- }
-
- // Prepare the field ObjectInspectors
- StructObjectInspector soi = (StructObjectInspector) objInspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<Object> list = soi.getStructFieldsDataAsList(obj);
- List<? extends StructField> declaredFields =
- ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs();
-
- int iKey = serdeParams.getKeyIndex();
- StructField field = fields.get(iKey);
- Object value = list.get(iKey);
- StructField declaredField = declaredFields.get(iKey);
- byte[] key;
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
try {
- key = serializeKeyField(field, value, declaredField, serdeParams);
- } catch (IOException ex) {
- throw new SerDeException(ex);
- }
-
- if (key == null) {
- throw new SerDeException("HBase row key cannot be NULL");
- }
-
- Put put = null;
- long putTimestamp = serdeParams.getPutTimestamp();
- if(putTimestamp >= 0) {
- put = new Put(key, putTimestamp);
- } else {
- put = new Put(key);
- }
-
- try {
- // Serialize each field
- for (int i = 0; i < fields.size(); i++) {
- if (i == iKey) {
- continue;
- }
-
- field = fields.get(i);
- value = list.get(i);
- declaredField = declaredFields.get(i);
- ColumnMapping colMap = serdeParams.getColumnMapping().get(i);
- serializeField(put, field, value, declaredField, colMap);
- }
- } catch (IOException e) {
+ return serializer.serialize(obj, objInspector);
+ } catch (SerDeException e) {
+ throw e;
+ } catch (Exception e) {
throw new SerDeException(e);
}
-
- return new PutWritable(put);
- }
-
- private static byte[] serializeKeyField(StructField keyField, Object keyValue,
- StructField declaredKeyField, HBaseSerDeParameters serdeParams) throws IOException {
- if (keyValue == null) {
- // a null object, we do not serialize it
- return null;
- }
-
- boolean writeBinary = serdeParams.getKeyColumnMapping().binaryStorage.get(0);
- ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
-
- if (!keyFieldOI.getCategory().equals(Category.PRIMITIVE) &&
- declaredKeyField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
- // we always serialize the String type using the escaped algorithm for LazyString
- return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
- PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
- } else {
- // use the serialization option switch to write primitive values as either a variable
- // length UTF8 string or a fixed width bytes if serializing in binary format
- return serialize(keyValue, keyFieldOI, 1, writeBinary, serdeParams);
- }
-
- }
-
- private void serializeField(Put put, StructField field, Object value,
- StructField declaredField, ColumnMapping colMap) throws IOException {
- if (value == null) {
- // a null object, we do not serialize it
- return;
- }
-
- // Get the field objectInspector and the field object.
- ObjectInspector foi = field.getFieldObjectInspector();
-
- // If the field corresponds to a column family in HBase
- if (colMap.qualifierName == null) {
- MapObjectInspector moi = (MapObjectInspector) foi;
- ObjectInspector koi = moi.getMapKeyObjectInspector();
- ObjectInspector voi = moi.getMapValueObjectInspector();
-
- Map<?, ?> map = moi.getMap(value);
- if (map == null) {
- return;
- } else {
- for (Map.Entry<?, ?> entry: map.entrySet()) {
- // Get the Key
- // Map keys are required to be primitive and may be serialized in binary format
- byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0), serdeParams);
- if (columnQualifierBytes == null) {
- continue;
- }
-
- // Map values may be serialized in binary format when they are primitive and binary
- // serialization is the option selected
- byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1), serdeParams);
- if (bytes == null) {
- continue;
- }
-
- put.add(colMap.familyNameBytes, columnQualifierBytes, bytes);
- }
- }
- } else {
- byte[] bytes = null;
- // If the field that is passed in is NOT a primitive, and either the
- // field is not declared (no schema was given at initialization), or
- // the field is declared as a primitive in initialization, serialize
- // the data to JSON string. Otherwise serialize the data in the
- // delimited way.
- if (!foi.getCategory().equals(Category.PRIMITIVE)
- && declaredField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
- // we always serialize the String type using the escaped algorithm for LazyString
- bytes = serialize(SerDeUtils.getJSONString(value, foi),
- PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
- } else {
- // use the serialization option switch to write primitive values as either a variable
- // length UTF8 string or a fixed width bytes if serializing in binary format
- bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0), serdeParams);
- }
-
- if (bytes == null) {
- return;
- }
-
- put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes);
- }
- }
-
- private static byte[] getBytesFromStream(ByteStream.Output ss) {
- byte [] buf = new byte[ss.getCount()];
- System.arraycopy(ss.getData(), 0, buf, 0, ss.getCount());
- return buf;
- }
-
- /*
- * Serialize the row into a ByteStream.
- *
- * @param obj The object for the current field.
- * @param objInspector The ObjectInspector for the current Object.
- * @param level The current level of separator.
- * @param writeBinary Whether to write a primitive object as an UTF8 variable length string or
- * as a fixed width byte array onto the byte stream.
- * @throws IOException On error in writing to the serialization stream.
- * @return true On serializing a non-null object, otherwise false.
- */
- private static byte[] serialize(Object obj, ObjectInspector objInspector, int level,
- boolean writeBinary, HBaseSerDeParameters serdeParams) throws IOException {
- ByteStream.Output ss = new ByteStream.Output();
- if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) {
- LazyUtils.writePrimitive(ss, obj, (PrimitiveObjectInspector) objInspector);
- } else {
- if (false == serialize(obj, objInspector, level, serdeParams, ss)) {
- return null;
- }
- }
-
- return getBytesFromStream(ss);
- }
-
- private static boolean serialize(
- Object obj,
- ObjectInspector objInspector,
- int level, HBaseSerDeParameters serdeParams, ByteStream.Output ss) throws IOException {
-
- byte[] separators = serdeParams.getSeparators();
- boolean escaped = serdeParams.isEscaped();
- byte escapeChar = serdeParams.getEscapeChar();
- boolean[] needsEscape = serdeParams.getNeedsEscape();
- switch (objInspector.getCategory()) {
- case PRIMITIVE:
- LazyUtils.writePrimitiveUTF8(ss, obj,
- (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
- return true;
- case LIST:
- char separator = (char) separators[level];
- ListObjectInspector loi = (ListObjectInspector)objInspector;
- List<?> list = loi.getList(obj);
- ObjectInspector eoi = loi.getListElementObjectInspector();
- if (list == null) {
- return false;
- } else {
- for (int i = 0; i < list.size(); i++) {
- if (i > 0) {
- ss.write(separator);
- }
- serialize(list.get(i), eoi, level + 1, serdeParams, ss);
- }
- }
- return true;
- case MAP:
- char sep = (char) separators[level];
- char keyValueSeparator = (char) separators[level+1];
- MapObjectInspector moi = (MapObjectInspector) objInspector;
- ObjectInspector koi = moi.getMapKeyObjectInspector();
- ObjectInspector voi = moi.getMapValueObjectInspector();
-
- Map<?, ?> map = moi.getMap(obj);
- if (map == null) {
- return false;
- } else {
- boolean first = true;
- for (Map.Entry<?, ?> entry: map.entrySet()) {
- if (first) {
- first = false;
- } else {
- ss.write(sep);
- }
- serialize(entry.getKey(), koi, level+2, serdeParams, ss);
- ss.write(keyValueSeparator);
- serialize(entry.getValue(), voi, level+2, serdeParams, ss);
- }
- }
- return true;
- case STRUCT:
- sep = (char)separators[level];
- StructObjectInspector soi = (StructObjectInspector)objInspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- list = soi.getStructFieldsDataAsList(obj);
- if (list == null) {
- return false;
- } else {
- for (int i = 0; i < list.size(); i++) {
- if (i > 0) {
- ss.write(sep);
- }
-
- serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
- level + 1, serdeParams, ss);
- }
- }
- return true;
- default:
- throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
- }
- }
-
- /**
- * Initialize the composite key class with the objectinspector for the key
- *
- * @throws SerDeException
- * */
- private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException {
-
- int i = 0;
-
- // find the hbase row key
- for (ColumnMapping colMap : serdeParams.getColumnMapping()) {
- if (colMap.hbaseRowKey) {
- break;
- }
- i++;
- }
-
- ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector)
- .getAllStructFieldRefs().get(i).getFieldObjectInspector();
-
- try {
- compositeKeyObj = serdeParams.getCompositeKeyClass().getDeclaredConstructor(
- LazySimpleStructObjectInspector.class, Properties.class, Configuration.class)
- .newInstance(
- ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf);
- } catch (IllegalArgumentException e) {
- throw new SerDeException(e);
- } catch (SecurityException e) {
- throw new SerDeException(e);
- } catch (InstantiationException e) {
- throw new SerDeException(e);
- } catch (IllegalAccessException e) {
- throw new SerDeException(e);
- } catch (InvocationTargetException e) {
- throw new SerDeException(e);
- } catch (NoSuchMethodException e) {
- // the constructor wasn't defined in the implementation class. Flag error
- throw new SerDeException("Constructor not defined in composite key class ["
- + serdeParams.getCompositeKeyClass().getName() + "]", e);
- }
- }
-
- /**
- * @return 0-based offset of the key column within the table
- */
- int getKeyColumnOffset() {
- return serdeParams.getKeyIndex();
- }
-
- List getStorageFormatOfCol(int colPos){
- return serdeParams.getColumnMapping().get(colPos).binaryStorage;
}
@Override
@@ -581,18 +249,13 @@ public SerDeStats getSerDeStats() {
return null;
}
- public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
- throws SerDeException {
-
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
-
- if (colMap.hbaseRowKey && colMap.familyName.equals(HBASE_KEY_COL)) {
- return i;
- }
- }
+ public HBaseKeyFactory getKeyFactory() {
+ return serdeParams.getKeyFactory();
+ }
- throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
- " row key column.");
+ public static void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws Exception {
+ HBaseSerDeParameters serdeParams =
+ new HBaseSerDeParameters(jobConf, tableDesc.getProperties(), HBaseSerDe.class.getName());
+ serdeParams.getKeyFactory().configureJobConf(tableDesc, jobConf);
}
}
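
For context on the serialization logic removed above: non-key, non-binary values were written in LazySimpleSerDe's delimited text format, with the separator chosen by nesting level (field values enter the recursion at level 1). A minimal sketch of that layout, assuming LazySimpleSerDe's default separators \001/\002/\003 (the default values are an assumption here, not stated in the patch):

    // Hedged sketch of the delimited layout produced by the removed serialize()
    // recursion; separator values are assumed LazySimpleSerDe defaults.
    public class DelimitedFormatSketch {
      public static void main(String[] args) {
        char collectionSep = '\002'; // separators[1]
        char mapKeySep = '\003';     // separators[2]
        // a list<string> value ["a", "b", "c"] entering at level 1:
        String listEncoding = "a" + collectionSep + "b" + collectionSep + "c";
        // a map<string,string> value {k1=v1, k2=v2} entering at level 1:
        String mapEncoding = "k1" + mapKeySep + "v1" + collectionSep + "k2" + mapKeySep + "v2";
        System.out.println(listEncoding.length() + " " + mapEncoding.length()); // prints "5 11"
      }
    }
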
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
index b64590d..1366f2f 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
@@ -18,20 +18,17 @@
package org.apache.hadoop.hive.hbase;
-import java.util.List;
-import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.util.List;
+import java.util.Properties;
/**
* HBaseSerDeParameters encapsulates SerDeParameters and additional configurations that are specific for
@@ -39,318 +36,72 @@
*
*/
public class HBaseSerDeParameters {
- private SerDeParameters serdeParams;
- private String columnMappingString;
- private List<ColumnMapping> columnMapping;
- private boolean doColumnRegexMatching;
+ private final String serdeName;
+ private final SerDeParameters serdeParams;
- private long putTimestamp;
+ private final String columnMappingString;
+ private final ColumnMappings columnMappings;
+ private final boolean doColumnRegexMatching;
- private Class<?> compositeKeyClass;
- private int keyIndex;
+ private final long putTimestamp;
+ private final HBaseKeyFactory keyFactory;
- void init(Configuration job, Properties tbl, String serdeName) throws SerDeException {
- serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
- putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1"));
-
- String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
- if (compKeyClass != null) {
- try {
- compositeKeyClass = job.getClassByName(compKeyClass);
- } catch (ClassNotFoundException e) {
- throw new SerDeException(e);
- }
- }
+ HBaseSerDeParameters(Configuration job, Properties tbl, String serdeName) throws SerDeException {
+ this.serdeName = serdeName;
+ this.serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+ this.putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1"));
// Read configuration parameters
columnMappingString = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
// Parse and initialize the HBase columns mapping
- columnMapping = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching);
+ columnMappings = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching);
+ columnMappings.validate(serdeName, serdeParams.getColumnNames(), serdeParams.getColumnTypes());
+
+ // Precondition: make sure this is done after the rest of the SerDe initialization is done.
+ String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+ columnMappings.parseColumnStorageTypes(hbaseTableStorageType);
// Build the type property string if not supplied
String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
if (columnTypeProperty == null) {
- StringBuilder sb = new StringBuilder();
-
- for (int i = 0; i < columnMapping.size(); i++) {
- if (sb.length() > 0) {
- sb.append(":");
- }
-
- ColumnMapping colMap = columnMapping.get(i);
-
- if (colMap.hbaseRowKey) {
- // the row key column becomes a STRING
- sb.append(serdeConstants.STRING_TYPE_NAME);
- } else if (colMap.qualifierName == null) {
- // a column family become a MAP
- sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
- + serdeConstants.STRING_TYPE_NAME + ">");
- } else {
- // an individual column becomes a STRING
- sb.append(serdeConstants.STRING_TYPE_NAME);
- }
- }
- tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString());
- }
-
- if (columnMapping.size() != serdeParams.getColumnNames().size()) {
- throw new SerDeException(serdeName + ": columns has " +
- serdeParams.getColumnNames().size() +
- " elements while hbase.columns.mapping has " +
- columnMapping.size() + " elements" +
- " (counting the key if implicit)");
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnMappings.toTypesString());
}
- // check that the mapping schema is right;
- // check that the "column-family:" is mapped to Map
- // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
- for (int i = 0; i < columnMapping.size(); i++) {
- ColumnMapping colMap = columnMapping.get(i);
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
- TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
- if ((typeInfo.getCategory() != Category.MAP) ||
- (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
- != Category.PRIMITIVE)) {
-
- throw new SerDeException(
- serdeName + ": hbase column family '" + colMap.familyName
- + "' should be mapped to Map extends LazyPrimitive, ?>,?>, that is "
- + "the Key for the map should be of primitive type, but is mapped to "
- + typeInfo.getTypeName());
- }
- }
- }
-
- // Precondition: make sure this is done after the rest of the SerDe initialization is done.
- String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
- parseColumnStorageTypes(hbaseTableStorageType);
- setKeyColumnOffset();
+ this.keyFactory = initKeyFactory(job, tbl);
}
- /*
- * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
- * whether to use a binary or an UTF string format to serialize and de-serialize primitive
- * data types like boolean, byte, short, int, long, float, and double. This applies to
- * regular columns and also to map column types which are associated with an HBase column
- * family. For the map types, we apply the specification to the key or the value provided it
- * is one of the above primitive types. The specifier is a colon separated value of the form
- * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
- * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
- * table level default.
- *
- * @param hbaseTableDefaultStorageType - the specification associated with the table property
- * hbase.table.default.storage.type
- * @throws SerDeException on parse error.
- */
-
- public void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
- throws SerDeException {
-
- boolean tableBinaryStorage = false;
-
- if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
- if (hbaseTableDefaultStorageType.equals("binary")) {
- tableBinaryStorage = true;
- } else if (!hbaseTableDefaultStorageType.equals("string")) {
- throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
- " parameter must be specified as" +
- " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
- "' is not a valid specification for this table/serde property.");
- }
- }
-
- // parse the string to determine column level storage type for primitive types
- // 's' is for variable length string format storage
- // 'b' is for fixed width binary storage of bytes
- // '-' is for table storage type, which defaults to UTF8 string
- // string data is always stored in the default escaped storage format; the data types
- // byte, short, int, long, float, and double have a binary byte oriented storage option
- List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
-
- for (int i = 0; i < columnMapping.size(); i++) {
-
- ColumnMapping colMap = columnMapping.get(i);
- TypeInfo colType = columnTypes.get(i);
- String mappingSpec = colMap.mappingSpec;
- String [] mapInfo = mappingSpec.split("#");
- String [] storageInfo = null;
-
- if (mapInfo.length == 2) {
- storageInfo = mapInfo[1].split(":");
- }
-
- if (storageInfo == null) {
-
- // use the table default storage specification
- if (colType.getCategory() == Category.PRIMITIVE) {
- if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else if (colType.getCategory() == Category.MAP) {
- TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
- TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
- if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
- !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
- !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- } else if (storageInfo.length == 1) {
- // we have a storage specification for a primitive column type
- String storageOption = storageInfo[0];
-
- if ((colType.getCategory() == Category.MAP) ||
- !(storageOption.equals("-") || "string".startsWith(storageOption) ||
- "binary".startsWith(storageOption))) {
- throw new SerDeException("Error: A column storage specification is one of the following:"
- + " '-', a prefix of 'string', or a prefix of 'binary'. "
- + storageOption + " is not a valid storage option specification for "
- + serdeParams.getColumnNames().get(i));
- }
-
- if (colType.getCategory() == Category.PRIMITIVE &&
- !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
- if ("-".equals(storageOption)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(storageOption)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- } else if (storageInfo.length == 2) {
- // we have a storage specification for a map column type
-
- String keyStorage = storageInfo[0];
- String valStorage = storageInfo[1];
-
- if ((colType.getCategory() != Category.MAP) ||
- !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
- "binary".startsWith(keyStorage)) ||
- !(valStorage.equals("-") || "string".startsWith(valStorage) ||
- "binary".startsWith(valStorage))) {
- throw new SerDeException("Error: To specify a valid column storage type for a Map"
- + " column, use any two specifiers from '-', a prefix of 'string', "
- + " and a prefix of 'binary' separated by a ':'."
- + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
- + " key and value parts of the Map,?> respectively."
- + " Invalid storage specification for column "
- + serdeParams.getColumnNames().get(i)
- + "; " + storageInfo[0] + ":" + storageInfo[1]);
- }
-
- TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
- TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
- if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
- !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
- if (keyStorage.equals("-")) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(keyStorage)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
- !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- if (valStorage.equals("-")) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(valStorage)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (colMap.binaryStorage.size() != 2) {
- throw new SerDeException("Error: In parsing the storage specification for column "
- + serdeParams.getColumnNames().get(i));
- }
-
- } else {
- // error in storage specification
- throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification "
- + mappingSpec + " is not valid for column: "
- + serdeParams.getColumnNames().get(i));
+ private HBaseKeyFactory initKeyFactory(Configuration conf, Properties tbl) throws SerDeException {
+ try {
+ HBaseKeyFactory keyFactory = createKeyFactory(conf, tbl);
+ if (keyFactory != null) {
+ keyFactory.init(this, tbl);
}
+ return keyFactory;
+ } catch (Exception e) {
+ throw new SerDeException(e);
}
}
- void setKeyColumnOffset() throws SerDeException {
- setKeyIndex(getRowKeyColumnOffset(columnMapping));
- }
-
- public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
- throws SerDeException {
-
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
-
- if (colMap.hbaseRowKey && colMap.familyName.equals(HBaseSerDe.HBASE_KEY_COL)) {
- return i;
- }
+ private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl) throws Exception {
+ String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY);
+ if (factoryClassName != null) {
+ Class<?> factoryClazz = Class.forName(factoryClassName);
+ return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job);
}
-
- throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
- " row key column.");
- }
-
- public StructTypeInfo getRowTypeInfo() {
- return (StructTypeInfo) serdeParams.getRowTypeInfo();
+ String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+ if (keyClassName != null) {
+ Class<?> keyClass = Class.forName(keyClassName);
+ return new HBaseCompositeKeyFactory(keyClass);
+ }
+ return new HBaseDefaultKeyFactory();
}
public List<String> getColumnNames() {
return serdeParams.getColumnNames();
}
- public byte[] getSeparators() {
- return serdeParams.getSeparators();
- }
-
- public Text getNullSequence() {
- return serdeParams.getNullSequence();
- }
-
- public boolean isLastColumnTakesRest() {
- return serdeParams.isLastColumnTakesRest();
- }
-
- public boolean isEscaped() {
- return serdeParams.isEscaped();
- }
-
- public byte getEscapeChar() {
- return serdeParams.getEscapeChar();
- }
-
public List<TypeInfo> getColumnTypes() {
return serdeParams.getColumnTypes();
}
@@ -359,59 +110,38 @@ public SerDeParameters getSerdeParams() {
return serdeParams;
}
- public void setSerdeParams(SerDeParameters serdeParams) {
- this.serdeParams = serdeParams;
- }
-
- public String getColumnMappingString() {
- return columnMappingString;
- }
-
- public void setColumnMappingString(String columnMappingString) {
- this.columnMappingString = columnMappingString;
- }
-
public long getPutTimestamp() {
return putTimestamp;
}
- public void setPutTimestamp(long putTimestamp) {
- this.putTimestamp = putTimestamp;
- }
-
- public boolean isDoColumnRegexMatching() {
- return doColumnRegexMatching;
- }
-
- public void setDoColumnRegexMatching(boolean doColumnRegexMatching) {
- this.doColumnRegexMatching = doColumnRegexMatching;
- }
-
- public Class<?> getCompositeKeyClass() {
- return compositeKeyClass;
- }
-
- public void setCompositeKeyClass(Class<?> compositeKeyClass) {
- this.compositeKeyClass = compositeKeyClass;
+ public int getKeyIndex() {
+ return columnMappings.getKeyIndex();
}
- public int getKeyIndex() {
- return keyIndex;
+ public ColumnMapping getKeyColumnMapping() {
+ return columnMappings.getKeyMapping();
}
- public void setKeyIndex(int keyIndex) {
- this.keyIndex = keyIndex;
+ public ColumnMappings getColumnMappings() {
+ return columnMappings;
}
- public List<ColumnMapping> getColumnMapping() {
- return columnMapping;
+ public HBaseKeyFactory getKeyFactory() {
+ return keyFactory;
}
- public ColumnMapping getKeyColumnMapping() {
- return columnMapping.get(keyIndex);
+ public TypeInfo getTypeForName(String columnName) {
+ List<String> columnNames = serdeParams.getColumnNames();
+ List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+ for (int i = 0; i < columnNames.size(); i++) {
+ if (columnName.equals(columnNames.get(i))) {
+ return columnTypes.get(i);
+ }
}
+ throw new IllegalArgumentException("Invalid column name " + columnName);
+ }
- public boolean[] getNeedsEscape() {
- return serdeParams.getNeedsEscape();
+ public String toString() {
+ return "[" + columnMappingString + ":" + getColumnNames() + ":" + getColumnTypes() + "]";
}
}
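
The key-factory lookup in createKeyFactory() above resolves in a fixed order. A minimal sketch of the table properties involved; the example class names are hypothetical, and the literal string behind HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS is assumed to be "hbase.composite.key.class":

    import java.util.Properties;

    public class KeyFactoryResolutionSketch {
      public static void main(String[] args) {
        Properties tbl = new Properties();
        // 1) an explicit factory wins; it is instantiated reflectively and must
        //    implement HBaseKeyFactory (class name below is hypothetical):
        tbl.setProperty("hbase.composite.key.factory", "com.example.MyKeyFactory");
        // 2) otherwise a composite key class (assumed property name) is wrapped
        //    in an HBaseCompositeKeyFactory:
        // tbl.setProperty("hbase.composite.key.class", "com.example.MyCompositeKey");
        // 3) otherwise HBaseDefaultKeyFactory is used.
        System.out.println(tbl);
      }
    }
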
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 4fe1b1b..255ffa2 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -40,7 +40,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -155,7 +155,7 @@ public void preCreateTable(Table tbl) throws MetaException {
Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- List<ColumnMapping> columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, true);
+ ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
HTableDescriptor tableDesc;
@@ -166,7 +166,7 @@ public void preCreateTable(Table tbl) throws MetaException {
tableDesc = new HTableDescriptor(tableName);
Set<String> uniqueColumnFamilies = new HashSet<String>();
- for (ColumnMapping colMap : columnsMapping) {
+ for (ColumnMapping colMap : columnMappings) {
if (!colMap.hbaseRowKey) {
uniqueColumnFamilies.add(colMap.familyName);
}
@@ -192,8 +192,7 @@ public void preCreateTable(Table tbl) throws MetaException {
// make sure the schema mapping is right
tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName));
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
+ for (ColumnMapping colMap : columnMappings) {
if (colMap.hbaseRowKey) {
continue;
@@ -378,6 +377,7 @@ private void addHBaseDelegationToken(Configuration conf) throws IOException {
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
try {
+ HBaseSerDe.configureJobConf(tableDesc, jobConf);
/*
* HIVE-6356
* The following code change is only needed for hbase-0.96.0 due to HBASE-9165, and
@@ -392,7 +392,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
TableMapReduceUtil.addDependencyJars(copy);
merged.addAll(copy.getConfiguration().getStringCollection("tmpjars"));
jobConf.set("tmpjars", StringUtils.arrayToString(merged.toArray(new String[0])));
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -403,22 +403,26 @@ public DecomposedPredicate decomposePredicate(
Deserializer deserializer,
ExprNodeDesc predicate)
{
+ HBaseKeyFactory keyFactory = ((HBaseSerDe) deserializer).getKeyFactory();
+ return keyFactory.decomposePredicate(jobConf, deserializer, predicate);
+ }
+
+ public static DecomposedPredicate decomposePredicate(
+ JobConf jobConf,
+ HBaseSerDe hBaseSerDe,
+ ExprNodeDesc predicate) {
String columnNameProperty = jobConf.get(
- org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
List<String> columnNames =
- Arrays.asList(columnNameProperty.split(","));
-
- HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
- int keyColPos = hbaseSerde.getKeyColumnOffset();
- String keyColType = jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES).
- split(",")[keyColPos];
- IndexPredicateAnalyzer analyzer =
- HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(columnNames.get(keyColPos), keyColType,
- hbaseSerde.getStorageFormatOfCol(keyColPos).get(0));
+ Arrays.asList(columnNameProperty.split(","));
+
+ ColumnMapping keyMapping = hBaseSerDe.getHBaseSerdeParam().getKeyColumnMapping();
+ IndexPredicateAnalyzer analyzer = HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(
+ keyMapping.columnName, keyMapping.columnType, keyMapping.binaryStorage.get(0));
List<IndexSearchCondition> searchConditions =
- new ArrayList<IndexSearchCondition>();
+ new ArrayList<IndexSearchCondition>();
ExprNodeGenericFuncDesc residualPredicate =
- (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions);
+ (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions);
int scSize = searchConditions.size();
if (scSize < 1 || 2 < scSize) {
// Either there was nothing which could be pushed down (size = 0),
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 142bfd8..2dbe334 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -35,7 +35,7 @@
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
@@ -54,6 +54,7 @@
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -93,15 +94,15 @@
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
- List<ColumnMapping> columnsMapping = null;
+ ColumnMappings columnMappings;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
- if (columnsMapping.size() < readColIDs.size()) {
+ if (columnMappings.size() < readColIDs.size()) {
throw new IOException("Cannot read more columns than the given table contains.");
}
@@ -113,8 +114,9 @@
List<String> addedFamilies = new ArrayList<String>();
if (!readAllColumns) {
+ ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
for (int i : readColIDs) {
- ColumnMapping colMap = columnsMapping.get(i);
+ ColumnMapping colMap = columnsMapping[i];
if (colMap.hbaseRowKey) {
continue;
}
@@ -139,8 +141,7 @@
// to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
// tables column projection.
if (empty) {
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
+ for (ColumnMapping colMap: columnMappings) {
if (colMap.hbaseRowKey) {
continue;
}
@@ -256,6 +257,18 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
// TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
Scan scan = new Scan();
+ String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
+ if (filterObjectSerialized != null) {
+ HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
+ HBaseScanRange.class);
+ try {
+ range.setup(scan, jobConf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return scan;
+ }
+
String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (filterExprSerialized == null) {
return scan;
@@ -324,6 +337,10 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
}
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
+ }
return scan;
}
@@ -374,6 +391,12 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
System.arraycopy(current, 0, next, 0, current.length);
return next;
}
+
+ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
+ String keyColumnName, TypeInfo keyColType, boolean isKeyBinary) {
+ return newIndexPredicateAnalyzer(keyColumnName, keyColType.getTypeName(), isKeyBinary);
+ }
+
/**
* Instantiates a new predicate analyzer suitable for
* determining how to push a filter down into the HBase scan,
@@ -423,20 +446,15 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
throw new IOException("hbase.columns.mapping required for HBase Table.");
}
- List<ColumnMapping> columnsMapping = null;
+ ColumnMappings columnMappings = null;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
- int iKey;
-
- try {
- iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping);
- } catch (SerDeException e) {
- throw new IOException(e);
- }
+ int iKey = columnMappings.getKeyIndex();
+ ColumnMapping keyMapping = columnMappings.getKeyMapping();
// Take filter pushdown into account while calculating splits; this
// allows us to prune off regions immediately. Note that although
@@ -445,7 +463,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
// definition into account and excludes regions which don't satisfy
// the start/stop row conditions (HBASE-1829).
Scan scan = createFilterScan(jobConf, iKey,
- getStorageFormatOfKey(columnsMapping.get(iKey).mappingSpec,
+ getStorageFormatOfKey(keyMapping.mappingSpec,
jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
@@ -454,8 +472,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
- for (int i = 0; i < columnsMapping.size(); i++) {
- private List<ColumnMapping> columnsMapping;
- private Object compositeKeyObj;
+ private ColumnMapping[] columnsMapping;
private ArrayList<Object> cachedList;
- /**
- * Construct a LazyHBaseRow object with the ObjectInspector.
- */
+ private final int iKey;
+ private final HBaseKeyFactory keyFactory;
+
public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
- super(oi);
+ this(oi, -1, null);
}
/**
- * Set the HBase row data(a Result writable) for this LazyStruct.
- * @see LazyHBaseRow#init(Result)
+ * Construct a LazyHBaseRow object with the ObjectInspector.
*/
- public void init(Result r, List<ColumnMapping> columnsMapping) {
- init(r, columnsMapping, null);
+ public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory) {
+ super(oi);
+ this.iKey = iKey;
+ this.keyFactory = keyFactory;
}
/**
* Set the HBase row data(a Result writable) for this LazyStruct.
- *
- * @see LazyHBaseRow#init(Result)
- *
- * @param compositeKeyClass
- * custom implementation to interpret the composite key
+ * @see LazyHBaseRow#init(org.apache.hadoop.hbase.client.Result)
*/
- public void init(Result r, List<ColumnMapping> columnsMapping, Object compositeKeyObj) {
-
- result = r;
- this.columnsMapping = columnsMapping;
- this.compositeKeyObj = compositeKeyObj;
+ public void init(Result r, ColumnMappings columnsMappings) {
+ this.result = r;
+ this.columnsMapping = columnsMappings.getColumnsMapping();
setParsed(false);
}
- /**
- * Parse the Result and fill each field.
- * @see LazyStruct#parse()
- */
- private void parse() {
-
- if (getFields() == null) {
- List<? extends StructField> fieldRefs =
- ((StructObjectInspector)getInspector()).getAllStructFieldRefs();
- LazyObject<? extends ObjectInspector> [] fields = new LazyObject<?>[fieldRefs.size()];
-
- for (int i = 0; i < fields.length; i++) {
- ColumnMapping colMap = columnsMapping.get(i);
-
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
- // a column family
- fields[i] = new LazyHBaseCellMap(
- (LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector());
- continue;
- }
-
- fields[i] = LazyFactory.createLazyObject(
- fieldRefs.get(i).getFieldObjectInspector(),
- colMap.binaryStorage.get(0));
- }
-
- setFields(fields);
- setFieldInited(new boolean[fields.length]);
+ @Override
+ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException {
+ if (fieldID == iKey) {
+ return keyFactory.createKey(fieldRef.getFieldObjectInspector());
}
-
- Arrays.fill(getFieldInited(), false);
- setParsed(true);
+ ColumnMapping colMap = columnsMapping[fieldID];
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ // a column family
+ return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector());
+ }
+ return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(),
+ colMap.binaryStorage.get(0));
}
/**
@@ -127,16 +99,17 @@ private void parse() {
*/
@Override
public Object getField(int fieldID) {
- if (!getParsed()) {
- parse();
- }
-
- Object value = uncheckedGetField(fieldID);
+ initFields();
+ return uncheckedGetField(fieldID);
+ }
- if (columnsMapping.get(fieldID).hbaseRowKey && compositeKeyObj != null) {
- return compositeKeyObj;
- } else {
- return value;
+ private void initFields() {
+ if (getFields() == null) {
+ initLazyFields(oi.getAllStructFieldRefs());
+ }
+ if (!getParsed()) {
+ Arrays.fill(getFieldInited(), false);
+ setParsed(true);
}
}
@@ -149,12 +122,12 @@ public Object getField(int fieldID) {
*/
private Object uncheckedGetField(int fieldID) {
- LazyObject<?> [] fields = getFields();
+ LazyObjectBase[] fields = getFields();
boolean [] fieldsInited = getFieldInited();
if (!fieldsInited[fieldID]) {
ByteArrayRef ref = null;
- ColumnMapping colMap = columnsMapping.get(fieldID);
+ ColumnMapping colMap = columnsMapping[fieldID];
if (colMap.hbaseRowKey) {
ref = new ByteArrayRef();
@@ -182,12 +155,6 @@ private Object uncheckedGetField(int fieldID) {
if (ref != null) {
fields[fieldID].init(ref, 0, ref.getData().length);
-
- // if it was a row key and we have been provided a custom composite key class, initialize it
- // with the bytes for the row key
- if (colMap.hbaseRowKey && compositeKeyObj != null) {
- ((LazyStruct) compositeKeyObj).init(ref, 0, ref.getData().length);
- }
}
}
@@ -203,9 +170,7 @@ private Object uncheckedGetField(int fieldID) {
*/
@Override
public ArrayList<Object> getFieldsAsList() {
- if (!getParsed()) {
- parse();
- }
+ initFields();
if (cachedList == null) {
cachedList = new ArrayList<Object>();
} else {
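
The createLazyField() hook above means the key factory can return any LazyObjectBase for the row-key field. A minimal sketch of such an object, implementing only the two methods this patch exercises; it simply keeps the raw key bytes as a string rather than parsing a composite layout:

    import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
    import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;

    public class RawKeySketch implements LazyObjectBase {
      private String key;

      @Override
      public void init(ByteArrayRef bytes, int start, int length) {
        // keep the row-key bytes as a plain string; a real factory would parse them
        key = new String(bytes.getData(), start, length);
      }

      @Override
      public Object getObject() {
        return key;
      }
    }
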
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
index 13c344b..7da360f 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.hive.hbase;
-import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import java.util.Properties;
+
public class HBaseTestCompositeKey extends HBaseCompositeKey {
byte[] bytes;
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java
new file mode 100644
index 0000000..889261f
--- /dev/null
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.StructObjectBaseInspector;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestHBaseKeyFactory extends HBaseDefaultKeyFactory {
+
+ private static final String DELIMITER_PATTERN = "\\$\\$";
+ private static final byte[] DELIMITER_BINARY = "$$".getBytes();
+
+ @Override
+ public ObjectInspector createKeyObjectInspector(TypeInfo type) {
+ return new SlashSeparatedOI((StructTypeInfo)type);
+ }
+
+ @Override
+ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException {
+ return new DoubleDollarSeparated();
+ }
+
+ private final ByteStream.Output output = new ByteStream.Output();
+
+ @Override
+ public byte[] serializeKey(Object object, StructField field) throws IOException {
+ ObjectInspector inspector = field.getFieldObjectInspector();
+ if (inspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new IllegalStateException("invalid type value " + inspector.getTypeName());
+ }
+ output.reset();
+ for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) {
+ if (output.getCount() > 0) {
+ output.write(DELIMITER_BINARY);
+ }
+ output.write(String.valueOf(element).getBytes());
+ }
+ return output.getCount() > 0 ? output.toByteArray() : null;
+ }
+
+ private static class DoubleDollarSeparated implements LazyObjectBase {
+
+ private Object[] fields;
+
+ @Override
+ public void init(ByteArrayRef bytes, int start, int length) {
+ fields = new String(bytes.getData(), start, length).split(DELIMITER_PATTERN);
+ }
+
+ @Override
+ public Object getObject() {
+ return this;
+ }
+ }
+
+ private static class SlashSeparatedOI extends StructObjectBaseInspector {
+
+ private int length;
+
+ private SlashSeparatedOI(StructTypeInfo type) {
+ List<String> names = type.getAllStructFieldNames();
+ List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ for (int i = 0; i < names.size(); i++) {
+ ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ }
+ init(names, ois, null);
+ }
+
+ @Override
+ public Object getStructFieldData(Object data, StructField fieldRef) {
+ return ((DoubleDollarSeparated)data).fields[((MyField)fieldRef).getFieldID()];
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object data) {
+ return Arrays.asList(((DoubleDollarSeparated)data).fields);
+ }
+ }
+}
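
The round trip in TestHBaseKeyFactory is easy to check in isolation: serializeKey() joins the struct fields with the literal "$$" and DoubleDollarSeparated.init() splits them back with the regex "\\$\\$". A quick standalone check:

    public class DelimiterRoundTrip {
      public static void main(String[] args) {
        String serialized = String.join("$$", "1000", "2000", "3000");
        String[] fields = serialized.split("\\$\\$");
        System.out.println(serialized);     // 1000$$2000$$3000
        System.out.println(fields.length);  // 3
      }
    }
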
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java
new file mode 100644
index 0000000..2d3490a
--- /dev/null
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.StructObjectBaseInspector;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestHBaseKeyFactory2 extends HBaseAbstractKeyFactory {
+
+ private static final int FIXED_LENGTH = 10;
+
+ @Override
+ public ObjectInspector createKeyObjectInspector(TypeInfo type) {
+ return new StringArrayOI((StructTypeInfo)type);
+ }
+
+ @Override
+ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException {
+ return new FixedLengthed(FIXED_LENGTH);
+ }
+
+ private final ByteStream.Output output = new ByteStream.Output();
+
+ @Override
+ public byte[] serializeKey(Object object, StructField field) throws IOException {
+ ObjectInspector inspector = field.getFieldObjectInspector();
+ if (inspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new IllegalStateException("invalid type value " + inspector.getTypeName());
+ }
+ output.reset();
+ for (Object element : ((StructObjectInspector)inspector).getStructFieldsDataAsList(object)) {
+ output.write(toBinary(String.valueOf(element).getBytes(), FIXED_LENGTH, false, false));
+ }
+ return output.getCount() > 0 ? output.toByteArray() : null;
+ }
+
+ private byte[] toBinary(String value, int max, boolean end, boolean nextBA) {
+ return toBinary(value.getBytes(), max, end, nextBA);
+ }
+
+ private byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) {
+ byte[] bytes = new byte[max + 1];
+ System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max));
+ if (end) {
+ Arrays.fill(bytes, value.length, max, (byte)0xff);
+ }
+ if (nextBA) {
+ bytes[max] = 0x01;
+ }
+ return bytes;
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
+ ExprNodeDesc predicate) {
+ String keyColName = keyMapping.columnName;
+
+ IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false);
+ analyzer.allowColumnName(keyColName);
+ analyzer.setAcceptsFields(true);
+
+ DecomposedPredicate decomposed = new DecomposedPredicate();
+
+ List<IndexSearchCondition> searchConditions = new ArrayList<IndexSearchCondition>();
+ decomposed.residualPredicate =
+ (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions);
+ if (!searchConditions.isEmpty()) {
+ decomposed.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
+ try {
+ decomposed.pushedPredicateObject = setupFilter(keyColName, searchConditions);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return decomposed;
+ }
+
+ private HBaseScanRange setupFilter(String keyColName, List<IndexSearchCondition> conditions)
+ throws IOException {
+ Map<String, List<IndexSearchCondition>> fieldConds =
+ new HashMap<String, List<IndexSearchCondition>>();
+ for (IndexSearchCondition condition : conditions) {
+ assert keyColName.equals(condition.getColumnDesc().getColumn());
+ String fieldName = condition.getFields()[0];
+ List<IndexSearchCondition> fieldCond = fieldConds.get(fieldName);
+ if (fieldCond == null) {
+ fieldConds.put(fieldName, fieldCond = new ArrayList<IndexSearchCondition>());
+ }
+ fieldCond.add(condition);
+ }
+ HBaseScanRange range = new HBaseScanRange();
+
+ ByteArrayOutputStream startRow = new ByteArrayOutputStream();
+ ByteArrayOutputStream stopRow = new ByteArrayOutputStream();
+
+ StructTypeInfo type = (StructTypeInfo) keyMapping.columnType;
+ for (String name : type.getAllStructFieldNames()) {
+ List<IndexSearchCondition> fieldCond = fieldConds.get(name);
+ if (fieldCond == null || fieldCond.size() > 2) {
+ continue;
+ }
+ byte[] startElement = null;
+ byte[] stopElement = null;
+ for (IndexSearchCondition condition : fieldCond) {
+ if (condition.getConstantDesc().getValue() == null) {
+ continue;
+ }
+ String comparisonOp = condition.getComparisonOp();
+ String constantVal = String.valueOf(condition.getConstantDesc().getValue());
+
+ if (comparisonOp.endsWith("UDFOPEqual")) {
+ startElement = toBinary(constantVal, FIXED_LENGTH, false, false);
+ stopElement = toBinary(constantVal, FIXED_LENGTH, true, true);
+ } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) {
+ startElement = toBinary(constantVal, FIXED_LENGTH, false, false);
+ } else if (comparisonOp.endsWith("UDFOPGreaterThan")) {
+ startElement = toBinary(constantVal, FIXED_LENGTH, false, true);
+ } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) {
+ stopElement = toBinary(constantVal, FIXED_LENGTH, true, false);
+ } else if (comparisonOp.endsWith("UDFOPLessThan")) {
+ stopElement = toBinary(constantVal, FIXED_LENGTH, true, true);
+ } else {
+ throw new IOException(comparisonOp + " is not a supported comparison operator");
+ }
+ }
+ if (startRow != null) {
+ if (startElement != null) {
+ startRow.write(startElement);
+ } else {
+ if (startRow.size() > 0) {
+ range.setStartRow(startRow.toByteArray());
+ }
+ startRow = null;
+ }
+ }
+ if (stopRow != null) {
+ if (stopElement != null) {
+ stopRow.write(stopElement);
+ } else {
+ if (stopRow.size() > 0) {
+ range.setStopRow(stopRow.toByteArray());
+ }
+ stopRow = null;
+ }
+ }
+ if (startElement == null && stopElement == null) {
+ break;
+ }
+ }
+ if (startRow != null && startRow.size() > 0) {
+ range.setStartRow(startRow.toByteArray());
+ }
+ if (stopRow != null && stopRow.size() > 0) {
+ range.setStopRow(stopRow.toByteArray());
+ }
+ return range;
+ }
+
+ private static class FixedLengthed implements LazyObjectBase {
+
+ private final int fixedLength;
+ private final List<Object> fields = new ArrayList<Object>();
+
+ public FixedLengthed(int fixedLength) {
+ this.fixedLength = fixedLength;
+ }
+
+ @Override
+ public void init(ByteArrayRef bytes, int start, int length) {
+ fields.clear();
+ byte[] data = bytes.getData();
+ int rowStart = start;
+ int rowStop = rowStart + fixedLength;
+ for (; rowStart < length; rowStart = rowStop + 1, rowStop = rowStart + fixedLength) {
+ fields.add(new String(data, rowStart, rowStop - rowStart).trim());
+ }
+ }
+
+ @Override
+ public Object getObject() {
+ return this;
+ }
+ }
+
+ private static class StringArrayOI extends StructObjectBaseInspector {
+
+ private int length;
+
+ private StringArrayOI(StructTypeInfo type) {
+ List<String> names = type.getAllStructFieldNames();
+ List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ for (int i = 0; i < names.size(); i++) {
+ ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ }
+ init(names, ois, null);
+ }
+
+ @Override
+ public Object getStructFieldData(Object data, StructField fieldRef) {
+ return ((FixedLengthed)data).fields.get(((MyField)fieldRef).getFieldID());
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object data) {
+ return ((FixedLengthed)data).fields;
+ }
+ }
+}
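
To make the fixed-width encoding above concrete: toBinary() always emits FIXED_LENGTH + 1 = 11 bytes per struct field, and for an equality condition the factory derives a start/stop pair from the same constant. A small worked example reusing the method body verbatim:

    import java.util.Arrays;

    public class FixedLengthKeySketch {
      // copied from TestHBaseKeyFactory2.toBinary() above
      static byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) {
        byte[] bytes = new byte[max + 1];
        System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max));
        if (end) {
          Arrays.fill(bytes, value.length, max, (byte) 0xff);
        }
        if (nextBA) {
          bytes[max] = 0x01;
        }
        return bytes;
      }

      public static void main(String[] args) {
        // for a pushed condition col1 = 'abc':
        byte[] start = toBinary("abc".getBytes(), 10, false, false);
        // 'a','b','c' followed by eight 0x00 bytes
        byte[] stop = toBinary("abc".getBytes(), 10, true, true);
        // 'a','b','c', seven 0xff bytes, then 0x01 as the next-field marker
        System.out.println(Arrays.toString(start));
        System.out.println(Arrays.toString(stop));
      }
    }
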
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
index 7c4fc9f..9a31f0f 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
@@ -28,7 +28,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -463,17 +463,15 @@ public void testLazyHBaseRow1() throws SerDeException {
Text nullSequence = new Text("\\N");
String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:c,cfb:d";
- List<ColumnMapping> columnsMapping = null;
+ ColumnMappings columnMappings = null;
try {
- columnsMapping = parseColumnsMapping(hbaseColsMapping);
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
} catch (SerDeException e) {
fail(e.toString());
}
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
-
+ for (ColumnMapping colMap : columnMappings) {
if (!colMap.hbaseRowKey && colMap.qualifierName == null) {
colMap.binaryStorage.add(false);
colMap.binaryStorage.add(false);
@@ -499,7 +497,7 @@ public void testLazyHBaseRow1() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
Result r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':123,'b':['a','b','c'],"
@@ -513,7 +511,7 @@ public void testLazyHBaseRow1() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':123,'b':null,"
@@ -529,7 +527,7 @@ public void testLazyHBaseRow1() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':null,'b':['a'],"
@@ -543,7 +541,7 @@ public void testLazyHBaseRow1() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
@@ -567,7 +565,7 @@ public void testLazyHBaseRow1() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
@@ -587,18 +585,17 @@ public void testLazyHBaseRow2() throws SerDeException {
List<String> fieldNames = Arrays.asList(
new String[]{"key", "a", "b", "c", "d"});
Text nullSequence = new Text("\\N");
- List<ColumnMapping> columnsMapping = null;
String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:,cfc:d";
+ ColumnMappings columnMappings = null;
+
try {
- columnsMapping = parseColumnsMapping(hbaseColsMapping);
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
} catch (SerDeException e) {
fail(e.toString());
}
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
-
+ for (ColumnMapping colMap : columnMappings) {
if (!colMap.hbaseRowKey && colMap.qualifierName == null) {
colMap.binaryStorage.add(false);
colMap.binaryStorage.add(false);
@@ -627,7 +624,7 @@ public void testLazyHBaseRow2() throws SerDeException {
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
Result r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':123,'b':['a','b','c'],"
@@ -643,7 +640,7 @@ public void testLazyHBaseRow2() throws SerDeException {
Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':123,'b':null,"
@@ -659,7 +656,7 @@ public void testLazyHBaseRow2() throws SerDeException {
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':null,'b':['a'],"
@@ -673,7 +670,7 @@ public void testLazyHBaseRow2() throws SerDeException {
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
@@ -689,7 +686,7 @@ public void testLazyHBaseRow2() throws SerDeException {
Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("")));
r = new Result(kvs);
- o.init(r, columnsMapping);
+ o.init(r, columnMappings);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
@@ -713,16 +710,17 @@ public void testLazyHBaseRow3() throws SerDeException {
String hbaseColumnsMapping = ":key#str,cf-int:cq-int#bin,cf-byte:cq-byte#bin,"
+ "cf-short:cq-short#bin,cf-long:cq-long#bin,cf-float:cq-float#bin,cf-double:cq-double#bin,"
+ "cf-string:cq-string#str,cf-bool:cq-bool#bin";
- List<ColumnMapping> columnsMapping = null;
+ ColumnMappings columnMappings = null;
try {
- columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
- } catch (SerDeException sde) {
- fail(sde.toString());
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ } catch (SerDeException e) {
+ fail(e.toString());
}
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
+ ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
+ for (int i = 0; i < columnsMapping.length; i++) {
+ ColumnMapping colMap = columnsMapping[i];
if (i == 0 || i == 7) {
colMap.binaryStorage.add(false);
@@ -741,7 +739,7 @@ public void testLazyHBaseRow3() throws SerDeException {
List<KeyValue> kvs = new ArrayList<KeyValue>();
byte [] value;
- for (int i = 1; i < columnsMapping.size(); i++) {
+ for (int i = 1; i < columnsMapping.length; i++) {
switch (i) {
@@ -781,13 +779,13 @@ public void testLazyHBaseRow3() throws SerDeException {
throw new RuntimeException("Not expected: " + i);
}
- ColumnMapping colMap = columnsMapping.get(i);
+ ColumnMapping colMap = columnsMapping[i];
kvs.add(new KeyValue(rowKey, colMap.familyNameBytes, colMap.qualifierNameBytes, value));
}
Collections.sort(kvs, KeyValue.COMPARATOR);
Result result = new Result(kvs);
- o.init(result, columnsMapping);
+ o.init(result, columnMappings);
List<? extends StructField> fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs();
@@ -850,19 +848,4 @@ public void testLazyHBaseRow3() throws SerDeException {
}
}
}
-
- /**
- * Parses the HBase columns mapping specifier to identify the column families, qualifiers
- * and also caches the byte arrays corresponding to them. One of the Hive table
- * columns maps to the HBase row key, by default the first column.
- *
- * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
- * @return List<ColumnMapping> which contains the column mapping information by position
- * @throws SerDeException
- */
- public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
- throws SerDeException {
- return HBaseSerDe.parseColumnsMapping(columnsMappingSpec, true);
- }
-
}
diff --git hbase-handler/src/test/queries/positive/hbase_custom_key.q hbase-handler/src/test/queries/positive/hbase_custom_key.q
new file mode 100644
index 0000000..b7c00c7
--- /dev/null
+++ hbase-handler/src/test/queries/positive/hbase_custom_key.q
@@ -0,0 +1,18 @@
+CREATE TABLE hbase_ck_1(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory");
+
+CREATE EXTERNAL TABLE hbase_ck_2(key string, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string");
+
+from src tablesample (1 rows)
+insert into table hbase_ck_1 select struct('1000','2000','3000'),'value';
+
+select * from hbase_ck_1;
+select * from hbase_ck_2;
diff --git hbase-handler/src/test/queries/positive/hbase_custom_key2.q hbase-handler/src/test/queries/positive/hbase_custom_key2.q
new file mode 100644
index 0000000..583de06
--- /dev/null
+++ hbase-handler/src/test/queries/positive/hbase_custom_key2.q
@@ -0,0 +1,35 @@
+CREATE TABLE hbase_ck_4(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom2",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2");
+
+from src tablesample (5 rows)
+insert into table hbase_ck_4 select
+struct(
+ cast(key as string),
+ cast(cast(key + 1000 as int) as string),
+ cast(cast(key + 2000 as int) as string)),
+value;
+
+set hive.fetch.task.conversion=more;
+
+-- 165,238,27,311,86
+select * from hbase_ck_4;
+
+-- 238
+explain
+select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238';
+select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238';
+
+-- 165,238
+explain
+select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27';
+select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27';
+
+-- 238,311
+explain
+select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238';
+select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238';
+
diff --git hbase-handler/src/test/results/positive/hbase_custom_key.q.out hbase-handler/src/test/results/positive/hbase_custom_key.q.out
new file mode 100644
index 0000000..4586f2b
--- /dev/null
+++ hbase-handler/src/test/results/positive/hbase_custom_key.q.out
@@ -0,0 +1,60 @@
+PREHOOK: query: CREATE TABLE hbase_ck_1(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_ck_1(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_ck_1
+PREHOOK: query: CREATE EXTERNAL TABLE hbase_ck_2(key string, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE EXTERNAL TABLE hbase_ck_2(key string, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom",
+ "hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_ck_2
+PREHOOK: query: from src tablesample (1 rows)
+insert into table hbase_ck_1 select struct('1000','2000','3000'),'value'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_ck_1
+POSTHOOK: query: from src tablesample (1 rows)
+insert into table hbase_ck_1 select struct('1000','2000','3000'),'value'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_ck_1
+PREHOOK: query: select * from hbase_ck_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_ck_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_1
+#### A masked pattern was here ####
+{"col1":"1000","col2":"2000","col3":"3000"} value
+PREHOOK: query: select * from hbase_ck_2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_ck_2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_2
+#### A masked pattern was here ####
+1000$$2000$$3000 value
diff --git hbase-handler/src/test/results/positive/hbase_custom_key2.q.out hbase-handler/src/test/results/positive/hbase_custom_key2.q.out
new file mode 100644
index 0000000..a200e3d
--- /dev/null
+++ hbase-handler/src/test/results/positive/hbase_custom_key2.q.out
@@ -0,0 +1,168 @@
+PREHOOK: query: CREATE TABLE hbase_ck_4(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom2",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_ck_4(key struct<col1:string,col2:string,col3:string>, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES (
+ "hbase.table.name" = "hbase_custom2",
+ "hbase.columns.mapping" = ":key,cf:string",
+ "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.TestHBaseKeyFactory2")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_ck_4
+PREHOOK: query: from src tablesample (5 rows)
+insert into table hbase_ck_4 select
+struct(
+ cast(key as string),
+ cast(cast(key + 1000 as int) as string),
+ cast(cast(key + 2000 as int) as string)),
+value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_ck_4
+POSTHOOK: query: from src tablesample (5 rows)
+insert into table hbase_ck_4 select
+struct(
+ cast(key as string),
+ cast(cast(key + 1000 as int) as string),
+ cast(cast(key + 2000 as int) as string)),
+value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_ck_4
+PREHOOK: query: -- 165,238,27,311,86
+select * from hbase_ck_4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+POSTHOOK: query: -- 165,238,27,311,86
+select * from hbase_ck_4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+{"col1":"165","col2":"1165","col3":"2165"} val_165
+{"col1":"238","col2":"1238","col3":"2238"} val_238
+{"col1":"27","col2":"1027","col3":"2027"} val_27
+{"col1":"311","col2":"1311","col3":"2311"} val_311
+{"col1":"86","col2":"1086","col3":"2086"} val_86
+PREHOOK: query: -- 238
+explain
+select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- 238
+explain
+select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: hbase_ck_4
+ filterExpr: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Filter Operator
+ predicate: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Select Operator
+ expressions: key (type: struct<col1:string,col2:string,col3:string>), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ ListSink
+
+PREHOOK: query: select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_ck_4 where key.col1 = '238' AND key.col2 = '1238'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+{"col1":"238","col2":"1238","col3":"2238"} val_238
+PREHOOK: query: -- 165,238
+explain
+select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- 165,238
+explain
+select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: hbase_ck_4
+ filterExpr: ((key.col1 >= '165') and (key.col1 < '27')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Filter Operator
+ predicate: ((key.col1 >= '165') and (key.col1 < '27')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Select Operator
+ expressions: key (type: struct<col1:string,col2:string,col3:string>), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ ListSink
+
+PREHOOK: query: select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_ck_4 where key.col1 >= '165' AND key.col1 < '27'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+{"col1":"165","col2":"1165","col3":"2165"} val_165
+{"col1":"238","col2":"1238","col3":"2238"} val_238
+PREHOOK: query: -- 238,311
+explain
+select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'
+PREHOOK: type: QUERY
+POSTHOOK: query: -- 238,311
+explain
+select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: hbase_ck_4
+ filterExpr: ((key.col1 > '100') and (key.col2 >= '1238')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Filter Operator
+ predicate: ((key.col1 > '100') and (key.col2 >= '1238')) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Select Operator
+ expressions: key (type: struct<col1:string,col2:string,col3:string>), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ ListSink
+
+PREHOOK: query: select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_ck_4 where key.col1 > '100' AND key.col2 >= '1238'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_ck_4
+#### A masked pattern was here ####
+{"col1":"238","col2":"1238","col3":"2238"} val_238
+{"col1":"311","col2":"1311","col3":"2311"} val_311
diff --git itests/util/pom.xml itests/util/pom.xml
index e9720df..b38c55b 100644
--- itests/util/pom.xml
+++ itests/util/pom.xml
@@ -46,6 +46,12 @@
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-hbase-handler</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${project.version}</version>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9b160e3..f5a3312 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -660,11 +660,7 @@ public static Path getPlanPath(Configuration conf) {
* @return Bytes.
*/
public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Output output = new Output(baos);
- runtimeSerializationKryo.get().writeObject(output, expr);
- output.close();
- return baos.toByteArray();
+ return serializeObjectToKryo(expr);
}
/**
@@ -673,11 +669,7 @@ public static Path getPlanPath(Configuration conf) {
* @return Expression; null if deserialization succeeded, but the result type is incorrect.
*/
public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) {
- Input inp = new Input(new ByteArrayInputStream(bytes));
- ExprNodeGenericFuncDesc func = runtimeSerializationKryo.get().
- readObject(inp,ExprNodeGenericFuncDesc.class);
- inp.close();
- return func;
+ return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class);
}
public static String serializeExpression(ExprNodeGenericFuncDesc expr) {
@@ -698,6 +690,37 @@ public static ExprNodeGenericFuncDesc deserializeExpression(String s) {
return deserializeExpressionFromKryo(bytes);
}
+ private static byte[] serializeObjectToKryo(Serializable object) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ runtimeSerializationKryo.get().writeObject(output, object);
+ output.close();
+ return baos.toByteArray();
+ }
+
+ private static <T> T deserializeObjectFromKryo(byte[] bytes, Class<T> clazz) {
+ Input inp = new Input(new ByteArrayInputStream(bytes));
+ T func = runtimeSerializationKryo.get().readObject(inp, clazz);
+ inp.close();
+ return func;
+ }
+
+ public static String serializeObject(Serializable expr) {
+ try {
+ return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
+ public static <T> T deserializeObject(String s, Class<T> clazz) {
+ try {
+ return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz);
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
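The Kryo plumbing that was previously inlined in serializeExpressionToKryo/deserializeExpressionFromKryo is now shared by the new serializeObject/deserializeObject helpers, which turn any Serializable into a Base64-encoded string and back. A minimal sketch of the intended round trip; the MyPushedFilter class here is a hypothetical stand-in for whatever object a storage handler pushes, not part of this patch:

import org.apache.hadoop.hive.ql.exec.Utilities;

public class SerializeObjectSketch {

  // Hypothetical payload; any Serializable with a no-arg constructor works.
  public static class MyPushedFilter implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    public String startRow;
    public String stopRow;
  }

  public static void main(String[] args) {
    MyPushedFilter filter = new MyPushedFilter();
    filter.startRow = "100";
    filter.stopRow = "200";

    // Kryo-serialize and Base64-encode into a plain String (fits in a JobConf value).
    String encoded = Utilities.serializeObject(filter);

    // Reverse: Base64-decode and Kryo-deserialize back into the typed object.
    MyPushedFilter restored = Utilities.deserializeObject(encoded, MyPushedFilter.class);
    System.out.println(restored.startRow + " - " + restored.stopRow);
  }
}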
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
index d39ee2e..683618f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
@@ -57,12 +58,19 @@
private final Set<String> udfNames;
private final Set<String> allowedColumnNames;
+ private FieldValidator fieldValidator;
+
+ private boolean acceptsFields;
public IndexPredicateAnalyzer() {
udfNames = new HashSet<String>();
allowedColumnNames = new HashSet<String>();
}
+ public void setFieldValidator(FieldValidator fieldValidator) {
+ this.fieldValidator = fieldValidator;
+ }
+
/**
* Registers a comparison operator as one which can be satisfied
* by an index search. Unless this is called, analyzePredicate
@@ -175,11 +183,19 @@ private ExprNodeDesc analyzeExpr(
ExprNodeDesc expr1 = (ExprNodeDesc) nodeOutputs[0];
ExprNodeDesc expr2 = (ExprNodeDesc) nodeOutputs[1];
ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2);
- if (extracted == null) {
+ if (extracted == null || (extracted.length > 2 && !acceptsFields)) {
return expr;
}
- if (extracted.length > 2) {
+
+ ExprNodeColumnDesc columnDesc;
+ ExprNodeConstantDesc constantDesc;
+ if (extracted[0] instanceof ExprNodeConstantDesc) {
genericUDF = genericUDF.flip();
+ columnDesc = (ExprNodeColumnDesc) extracted[1];
+ constantDesc = (ExprNodeConstantDesc) extracted[0];
+ } else {
+ columnDesc = (ExprNodeColumnDesc) extracted[0];
+ constantDesc = (ExprNodeConstantDesc) extracted[1];
}
String udfName = genericUDF.getUdfName();
@@ -187,22 +203,34 @@ private ExprNodeDesc analyzeExpr(
return expr;
}
- ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) extracted[0];
- ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) extracted[1];
if (!allowedColumnNames.contains(columnDesc.getColumn())) {
return expr;
}
+ String[] fields = null;
+ if (extracted.length > 2) {
+ ExprNodeFieldDesc fieldDesc = (ExprNodeFieldDesc) extracted[2];
+ if (!isValidField(fieldDesc)) {
+ return expr;
+ }
+ fields = ExprNodeDescUtils.extractFields(fieldDesc);
+ }
+
searchConditions.add(
new IndexSearchCondition(
columnDesc,
udfName,
constantDesc,
- expr));
+ expr,
+ fields));
// we converted the expression to a search condition, so
// remove it from the residual predicate
- return null;
+ return fields == null ? null : expr;
+ }
+
+ private boolean isValidField(ExprNodeFieldDesc field) {
+ return fieldValidator == null || fieldValidator.validate(field);
}
/**
@@ -232,4 +260,27 @@ public ExprNodeGenericFuncDesc translateSearchConditions(
}
return expr;
}
+
+ public void setAcceptsFields(boolean acceptsFields) {
+ this.acceptsFields = acceptsFields;
+ }
+
+ public static interface FieldValidator {
+ boolean validate(ExprNodeFieldDesc exprNodeDesc);
+ }
+
+ public static IndexPredicateAnalyzer createAnalyzer(boolean equalOnly) {
+
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+ analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
+ if (equalOnly) {
+ return analyzer;
+ }
+ analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan");
+ analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan");
+ analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan");
+ analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
+
+ return analyzer;
+ }
}
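Taken together, createAnalyzer, setAcceptsFields and setFieldValidator let a storage handler accept predicates over struct fields (e.g. key.col1) instead of bailing out on them. A rough sketch of how a HiveStoragePredicateHandler.decomposePredicate implementation might wire this up, assuming the pre-existing allowColumnName and analyzePredicate methods on IndexPredicateAnalyzer; the column and field names are illustrative only:

import java.util.List;

import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;

public class AnalyzerSetupSketch {

  // Analyzer that accepts range comparisons plus struct-field references on "key",
  // pushing only the fields the key layout can actually use (col1 and col2 here).
  public static IndexPredicateAnalyzer newKeyAnalyzer() {
    IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(false);
    analyzer.allowColumnName("key");
    analyzer.setAcceptsFields(true);
    analyzer.setFieldValidator(new IndexPredicateAnalyzer.FieldValidator() {
      @Override
      public boolean validate(ExprNodeFieldDesc fieldDesc) {
        String field = fieldDesc.getFieldName();
        return "col1".equals(field) || "col2".equals(field);
      }
    });
    return analyzer;
  }

  // Splits the predicate into pushable search conditions plus a residual expression.
  public static ExprNodeDesc analyze(ExprNodeDesc predicate,
      List<IndexSearchCondition> conditions) {
    return newKeyAnalyzer().analyzePredicate(predicate, conditions);
  }
}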
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
index 5f1329c..3a2ecb7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
@@ -33,6 +33,16 @@
private ExprNodeConstantDesc constantDesc;
private ExprNodeGenericFuncDesc comparisonExpr;
+ private String[] fields;
+
+ public IndexSearchCondition(
+ ExprNodeColumnDesc columnDesc,
+ String comparisonOp,
+ ExprNodeConstantDesc constantDesc,
+ ExprNodeGenericFuncDesc comparisonExpr) {
+ this(columnDesc, comparisonOp, constantDesc, comparisonExpr, null);
+ }
+
/**
* Constructs a search condition, which takes the form
* column-ref comparison-op constant-value .
@@ -50,12 +60,14 @@ public IndexSearchCondition(
ExprNodeColumnDesc columnDesc,
String comparisonOp,
ExprNodeConstantDesc constantDesc,
- ExprNodeGenericFuncDesc comparisonExpr) {
+ ExprNodeGenericFuncDesc comparisonExpr,
+ String[] fields) {
this.columnDesc = columnDesc;
this.comparisonOp = comparisonOp;
this.constantDesc = constantDesc;
this.comparisonExpr = comparisonExpr;
+ this.fields = fields;
}
public void setColumnDesc(ExprNodeColumnDesc columnDesc) {
@@ -90,6 +102,10 @@ public ExprNodeGenericFuncDesc getComparisonExpr() {
return comparisonExpr;
}
+ public String[] getFields() {
+ return fields;
+ }
+
@Override
public String toString() {
return comparisonExpr.getExprString();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 590241a..dc9218a 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -21,6 +21,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -413,6 +414,13 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
return;
}
+ Serializable filterObject = scanDesc.getFilterObject();
+ if (filterObject != null) {
+ jobConf.set(
+ TableScanDesc.FILTER_OBJECT_CONF_STR,
+ Utilities.serializeObject(filterObject));
+ }
+
String filterText = filterExpr.getExprString();
String filterExprSerialized = Utilities.serializeExpression(filterExpr);
if (LOG.isDebugEnabled()) {
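On the read side, whatever was stored in DecomposedPredicate.pushedPredicateObject arrives in the job configuration under TableScanDesc.FILTER_OBJECT_CONF_STR. A minimal sketch of how a storage handler's input format could recover it, reusing the same hypothetical MyPushedFilter type as in the Utilities sketch above:

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class FilterObjectReadSketch {

  // Hypothetical pushed-filter class; must be the same type the predicate handler
  // put into DecomposedPredicate.pushedPredicateObject.
  public static class MyPushedFilter implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    public String startRow;
    public String stopRow;
  }

  // Call from getSplits()/getRecordReader(); returns null when nothing was pushed.
  public static MyPushedFilter readPushedFilter(JobConf jobConf) {
    String serialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
    if (serialized == null) {
      return null;
    }
    return Utilities.deserializeObject(serialized, MyPushedFilter.class);
  }
}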
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
index 9f35575..7d7c764 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java
@@ -23,6 +23,8 @@
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.JobConf;
+import java.io.Serializable;
+
/**
* HiveStoragePredicateHandler is an optional companion to {@link
* HiveStorageHandler}; it should only be implemented by handlers which
@@ -69,6 +71,11 @@ public DecomposedPredicate decomposePredicate(
public ExprNodeGenericFuncDesc pushedPredicate;
/**
+ * Serialized format for filter
+ */
+ public Serializable pushedPredicateObject;
+
+ /**
* Portion of predicate to be post-evaluated by Hive for any rows
* which are returned by storage handler.
*/
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index e50026b..b6234f0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -260,12 +260,46 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator<?> cur
return new ExprNodeDesc[] {expr1, expr2};
}
if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeColumnDesc) {
- return new ExprNodeDesc[] {expr2, expr1, null}; // add null as a marker (inverted order)
+ return new ExprNodeDesc[] {expr1, expr2};
+ }
+ if (expr1 instanceof ExprNodeFieldDesc && expr2 instanceof ExprNodeConstantDesc) {
+ ExprNodeColumnDesc columnDesc = extractColumn(expr1);
+ return columnDesc != null ? new ExprNodeDesc[] {columnDesc, expr2, expr1} : null;
+ }
+ if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeFieldDesc) {
+ ExprNodeColumnDesc columnDesc = extractColumn(expr2);
+ return columnDesc != null ? new ExprNodeDesc[] {expr1, columnDesc, expr2} : null;
}
// todo: constant op constant
return null;
}
+ public static String[] extractFields(ExprNodeFieldDesc expr) {
+ return extractFields(expr, new ArrayList<String>()).toArray(new String[0]);
+ }
+
+ private static List<String> extractFields(ExprNodeDesc expr, List<String> fields) {
+ if (expr instanceof ExprNodeFieldDesc) {
+ ExprNodeFieldDesc field = (ExprNodeFieldDesc)expr;
+ fields.add(field.getFieldName());
+ return extractFields(field.getDesc(), fields);
+ }
+ if (expr instanceof ExprNodeColumnDesc) {
+ return fields;
+ }
+ throw new IllegalStateException();
+ }
+
+ private static ExprNodeColumnDesc extractColumn(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ return (ExprNodeColumnDesc)expr;
+ }
+ if (expr instanceof ExprNodeFieldDesc) {
+ return extractColumn(((ExprNodeFieldDesc)expr).getDesc());
+ }
+ return null;
+ }
+
// from IndexPredicateAnalyzer
private static ExprNodeDesc extractConstant(ExprNodeDesc expr) {
if (!(expr instanceof ExprNodeGenericFuncDesc)) {
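extractComparePair now also recognizes a field access compared against a constant, returning the underlying column, the constant, and the ExprNodeFieldDesc as a third element, while extractFields walks the nested field accesses down to the column reference. A small illustrative sketch of what extractFields yields for the key.col1 predicates in the new q-tests; the struct layout and table alias are taken from those tests, everything else is hypothetical:

import java.util.Arrays;

import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ExtractFieldsSketch {
  public static void main(String[] args) {
    // key column typed struct<col1:string,col2:string,col3:string>, as in the q-tests.
    TypeInfo keyType = TypeInfoFactory.getStructTypeInfo(
        Arrays.asList("col1", "col2", "col3"),
        Arrays.<TypeInfo>asList(TypeInfoFactory.stringTypeInfo,
            TypeInfoFactory.stringTypeInfo, TypeInfoFactory.stringTypeInfo));

    ExprNodeColumnDesc key = new ExprNodeColumnDesc(keyType, "key", "hbase_ck_4", false);

    // Expression tree for the field access "key.col1".
    ExprNodeFieldDesc keyCol1 =
        new ExprNodeFieldDesc(TypeInfoFactory.stringTypeInfo, key, "col1", false);

    // Collects the accessed field names while descending to the column reference.
    String[] fields = ExprNodeDescUtils.extractFields(keyCol1);
    System.out.println(Arrays.toString(fields)); // prints [col1]
  }
}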
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 10bae4d..8acb39b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -61,6 +62,7 @@
private int maxStatsKeyPrefixLength = -1;
private ExprNodeGenericFuncDesc filterExpr;
+ private transient Serializable filterObject;
public static final String FILTER_EXPR_CONF_STR =
"hive.io.filter.expr.serialized";
@@ -68,6 +70,9 @@
public static final String FILTER_TEXT_CONF_STR =
"hive.io.filter.text";
+ public static final String FILTER_OBJECT_CONF_STR =
+ "hive.io.filter.object";
+
// input file name (big) to bucket number
private Map<String, Integer> bucketFileNameMapping;
@@ -110,6 +115,14 @@ public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) {
this.filterExpr = filterExpr;
}
+ public Serializable getFilterObject() {
+ return filterObject;
+ }
+
+ public void setFilterObject(Serializable filterObject) {
+ this.filterObject = filterObject;
+ }
+
public void setAlias(String alias) {
this.alias = alias;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
index c0a8269..7aaf455 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
@@ -843,6 +843,8 @@ private static ExprNodeGenericFuncDesc pushFilterToStorageHandler(
}
}
tableScanDesc.setFilterExpr(decomposed.pushedPredicate);
+ tableScanDesc.setFilterObject(decomposed.pushedPredicateObject);
+
return (ExprNodeGenericFuncDesc)decomposed.residualPredicate;
}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java
new file mode 100644
index 0000000..b7efff0
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/StructObject.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.util.List;
+
+public interface StructObject {
+
+ Object getField(int fieldID);
+
+ List<Object> getFieldsAsList();
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java
new file mode 100644
index 0000000..9a1a3b2
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/StructObjectBaseInspector.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class StructObjectBaseInspector extends StructObjectInspector {
+
+ protected static class MyField implements StructField {
+
+ protected final int fieldID;
+ protected final String fieldName;
+ protected final String fieldComment;
+ protected final ObjectInspector fieldObjectInspector;
+
+ public MyField(int fieldID, String fieldName,
+ ObjectInspector fieldObjectInspector, String fieldComment) {
+ this.fieldID = fieldID;
+ this.fieldName = fieldName.toLowerCase();
+ this.fieldObjectInspector = fieldObjectInspector;
+ this.fieldComment = fieldComment;
+ }
+
+ public int getFieldID() {
+ return fieldID;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public ObjectInspector getFieldObjectInspector() {
+ return fieldObjectInspector;
+ }
+
+ public String getFieldComment() {
+ return fieldComment;
+ }
+ @Override
+ public String toString() {
+ return "" + fieldID + ":" + fieldName;
+ }
+ }
+
+ private List<MyField> fields;
+
+ protected StructObjectBaseInspector() {
+ super();
+ }
+ /**
+ * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead.
+ */
+ public StructObjectBaseInspector(List<String> structFieldNames,
+ List<ObjectInspector> structFieldObjectInspectors) {
+ init(structFieldNames, structFieldObjectInspectors, null);
+ }
+
+ public StructObjectBaseInspector(List<String> structFieldNames,
+ List<ObjectInspector> structFieldObjectInspectors,
+ List<String> structFieldComments) {
+ init(structFieldNames, structFieldObjectInspectors, structFieldComments);
+ }
+
+ protected void init(List<String> structFieldNames,
+ List<ObjectInspector> structFieldObjectInspectors,
+ List<String> structFieldComments) {
+ assert (structFieldNames.size() == structFieldObjectInspectors.size());
+ assert (structFieldComments == null ||
+ (structFieldNames.size() == structFieldComments.size()));
+
+ fields = new ArrayList<MyField>(structFieldNames.size());
+ for (int i = 0; i < structFieldNames.size(); i++) {
+ fields.add(createField(i,
+ structFieldNames.get(i), structFieldObjectInspectors.get(i),
+ structFieldComments == null ? null : structFieldComments.get(i)));
+ }
+ }
+
+ protected MyField createField(int index, String fieldName, ObjectInspector fieldOI, String comment) {
+ return new MyField(index, fieldName, fieldOI, comment);
+ }
+
+ @Override
+ public String getTypeName() {
+ return ObjectInspectorUtils.getStandardStructTypeName(this);
+ }
+
+ @Override
+ public final Category getCategory() {
+ return Category.STRUCT;
+ }
+
+ @Override
+ public StructField getStructFieldRef(String fieldName) {
+ return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, fields);
+ }
+
+ @Override
+ public List<? extends StructField> getAllStructFieldRefs() {
+ return fields;
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
index 1fd6853..fd06f58 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
@@ -23,13 +23,14 @@
import java.util.List;
import org.apache.hadoop.hive.serde2.SerDeStatsStruct;
+import org.apache.hadoop.hive.serde2.StructObject;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-public abstract class ColumnarStructBase implements SerDeStatsStruct {
+public abstract class ColumnarStructBase implements StructObject, SerDeStatsStruct {
class FieldInfo {
LazyObjectBase field;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
index 10f4c05..9b5ccbe 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
@@ -25,7 +25,7 @@
* A LazyObject can represent any primitive object or hierarchical object like
* array, map or struct.
*/
-public abstract class LazyObject<OI extends ObjectInspector> extends LazyObjectBase {
+public abstract class LazyObject<OI extends ObjectInspector> implements LazyObjectBase {
protected OI oi;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java
index 3334dff..7e42b3f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.serde2.lazy;
-public abstract class LazyObjectBase {
+public interface LazyObjectBase {
/**
* Set the data for this LazyObjectBase. We take ByteArrayRef instead of byte[] so
@@ -33,12 +33,12 @@
* The length of the data, starting from "start"
* @see ByteArrayRef
*/
- public abstract void init(ByteArrayRef bytes, int start, int length);
+ void init(ByteArrayRef bytes, int start, int length);
/**
* If the LazyObjectBase is a primitive Object, then deserialize it and return the
* actual primitive Object. Otherwise (array, map, struct), return this.
*/
- public abstract Object getObject();
+ Object getObject();
}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
index 82c1263..ac180ca 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
@@ -171,6 +171,10 @@ public byte getEscapeChar() {
public boolean[] getNeedsEscape() {
return needsEscape;
}
+
+ public boolean isPrimitive(int index) {
+ return columnTypes.get(index).getCategory() == Category.PRIMITIVE;
+ }
}
SerDeParameters serdeParams = null;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
index 8a1ea46..a01cd66 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
@@ -23,10 +23,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStatsStruct;
+import org.apache.hadoop.hive.serde2.StructObject;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;
/**
@@ -36,8 +37,8 @@
* LazyStruct does not deal with the case of a NULL struct. That is handled by
* the parent LazyObject.
*/
-public class LazyStruct extends LazyNonPrimitive<LazySimpleStructObjectInspector> implements
- SerDeStatsStruct {
+public class LazyStruct extends LazyNonPrimitive<LazySimpleStructObjectInspector>
+ implements StructObject, SerDeStatsStruct {
private static Log LOG = LogFactory.getLog(LazyStruct.class.getName());
@@ -62,7 +63,7 @@
/**
* The fields of the struct.
*/
- LazyObject[] fields;
+ LazyObjectBase[] fields;
/**
* Whether init() has been called on the field or not.
*/
@@ -101,17 +102,7 @@ private void parse() {
byte escapeChar = oi.getEscapeChar();
if (fields == null) {
- List<? extends StructField> fieldRefs = ((StructObjectInspector) oi)
- .getAllStructFieldRefs();
- fields = new LazyObject[fieldRefs.size()];
- for (int i = 0; i < fields.length; i++) {
- fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i)
- .getFieldObjectInspector());
- }
- fieldInited = new boolean[fields.length];
- // Extra element to make sure we have the same formula to compute the
- // length of each element of the array.
- startPosition = new int[fields.length + 1];
+ initLazyFields(oi.getAllStructFieldRefs());
}
int structByteEnd = start + length;
@@ -172,6 +163,25 @@ private void parse() {
parsed = true;
}
+ protected final void initLazyFields(List<? extends StructField> fieldRefs) {
+ fields = new LazyObjectBase[fieldRefs.size()];
+ for (int i = 0; i < fields.length; i++) {
+ try {
+ fields[i] = createLazyField(i, fieldRefs.get(i));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ fieldInited = new boolean[fields.length];
+ // Extra element to make sure we have the same formula to compute the
+ // length of each element of the array.
+ startPosition = new int[fields.length + 1];
+ }
+
+ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException {
+ return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector());
+ }
+
/**
* Get one field out of the struct.
*
@@ -221,14 +231,14 @@ private Object uncheckedGetField(int fieldID) {
return fields[fieldID].getObject();
}
- ArrayList<Object> cachedList;
+ List<Object> cachedList;
/**
* Get the values of the fields as an ArrayList.
*
* @return The values of the fields as an ArrayList.
*/
- public ArrayList<Object> getFieldsAsList() {
+ public List<Object> getFieldsAsList() {
if (!parsed) {
parse();
}
@@ -256,7 +266,7 @@ protected void setParsed(boolean parsed) {
this.parsed = parsed;
}
- protected LazyObject[] getFields() {
+ protected LazyObjectBase[] getFields() {
return fields;
}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
index 8a5386a..0813700 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazySimpleStructObjectInspector.java
@@ -23,7 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.StructObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -184,7 +184,7 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
if (data == null) {
return null;
}
- LazyStruct struct = (LazyStruct) data;
+ StructObject struct = (StructObject) data;
MyField f = (MyField) fieldRef;
int fieldID = f.getFieldID();
@@ -198,15 +198,8 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
if (data == null) {
return null;
}
-
- // Iterate over all the fields picking up the nested structs within them
- List<Object> result = new ArrayList<Object>(fields.size());
-
- for (MyField myField : fields) {
- result.add(getStructFieldData(data, myField));
- }
-
- return result;
+ StructObject struct = (StructObject) data;
+ return struct.getFieldsAsList();
}
// For LazyStruct
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
index 598683f..b3625b3 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
@@ -27,7 +27,7 @@
* A LazyBinaryObject can represent any primitive object or hierarchical object
* like string, list, map or struct.
*/
-public abstract class LazyBinaryObject<OI extends ObjectInspector> extends LazyObjectBase {
+public abstract class LazyBinaryObject<OI extends ObjectInspector> implements LazyObjectBase {
OI oi;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java
index caf3517..98a35c7 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.serde2.SerDeStatsStruct;
+import org.apache.hadoop.hive.serde2.StructObject;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.RecordInfo;
import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
@@ -42,8 +43,8 @@
* Following B, there is another section A and B. This pattern repeats until the
* all struct fields are serialized.
*/
-public class LazyBinaryStruct extends
- LazyBinaryNonPrimitive<LazyBinaryStructObjectInspector> implements SerDeStatsStruct {
+public class LazyBinaryStruct extends LazyBinaryNonPrimitive<LazyBinaryStructObjectInspector>
+ implements StructObject, SerDeStatsStruct {
private static Log LOG = LogFactory.getLog(LazyBinaryStruct.class.getName());