diff --git hbase-handler/pom.xml hbase-handler/pom.xml
index 132af43..ad7c52a 100644
--- hbase-handler/pom.xml
+++ hbase-handler/pom.xml
@@ -222,6 +222,19 @@
 <sourceDirectory>${basedir}/src/java</sourceDirectory>
 <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-jar-plugin</artifactId>
+          <executions>
+            <execution>
+              <goals>
+                <goal>test-jar</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java
new file mode 100644
index 0000000..18fb5ea
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public abstract class AbstractHBaseKeyFactory implements HBaseKeyFactory {
+
+ protected HBaseSerDeParameters hbaseParams;
+ protected ColumnMappings.ColumnMapping keyMapping;
+ protected Properties properties;
+
+ @Override
+ public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException {
+ this.hbaseParams = hbaseParam;
+ this.keyMapping = hbaseParam.getKeyColumnMapping();
+ this.properties = properties;
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException {
+ TableMapReduceUtil.addDependencyJars(jobConf, getClass());
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) {
+ return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate);
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
new file mode 100644
index 0000000..9cae5d3
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class ColumnMappings implements Iterable<ColumnMapping> {
+
+ private final int keyIndex;
+ private final ColumnMapping[] columnsMapping;
+
+ public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex) {
+ this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]);
+ this.keyIndex = keyIndex;
+ }
+
+ @Override
+ public Iterator<ColumnMapping> iterator() {
+ return Iterators.forArray(columnsMapping);
+ }
+
+ public int size() {
+ return columnsMapping.length;
+ }
+
+ String toTypesString() {
+ StringBuilder sb = new StringBuilder();
+ for (ColumnMapping colMap : columnsMapping) {
+ if (sb.length() > 0) {
+ sb.append(":");
+ }
+ if (colMap.hbaseRowKey) {
+ // the row key column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ } else if (colMap.qualifierName == null) {
+ // a column family become a MAP
+ sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
+ + serdeConstants.STRING_TYPE_NAME + ">");
+ } else {
+ // an individual column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ }
+ }
+ return sb.toString();
+ }
+
+ void setHiveColumnDescription(String serdeName,
+ List<String> columnNames, List<TypeInfo> columnTypes) throws SerDeException {
+ if (columnsMapping.length != columnNames.size()) {
+ throw new SerDeException(serdeName + ": columns has " + columnNames.size() +
+ " elements while hbase.columns.mapping has " + columnsMapping.length + " elements" +
+ " (counting the key if implicit)");
+ }
+
+ // check that the mapping schema is right;
+ // check that the "column-family:" is mapped to Map
+ // where key extends LazyPrimitive, ?> and thus has type Category.PRIMITIVE
+ for (int i = 0; i < columnNames.size(); i++) {
+ ColumnMapping colMap = columnsMapping[i];
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ TypeInfo typeInfo = columnTypes.get(i);
+ if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) ||
+ (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
+ != ObjectInspector.Category.PRIMITIVE)) {
+
+ throw new SerDeException(
+ serdeName + ": hbase column family '" + colMap.familyName
+ + "' should be mapped to Map extends LazyPrimitive, ?>,?>, that is "
+ + "the Key for the map should be of primitive type, but is mapped to "
+ + typeInfo.getTypeName());
+ }
+ }
+ colMap.columnName = columnNames.get(i);
+ colMap.columnType = columnTypes.get(i);
+ }
+ }
+
+ /**
+ * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
+ * whether to use a binary or a UTF string format to serialize and de-serialize primitive
+ * data types like boolean, byte, short, int, long, float, and double. This applies to
+ * regular columns and also to map column types which are associated with an HBase column
+ * family. For the map types, we apply the specification to the key or the value, provided it
+ * is one of the above primitive types. The specifier is a colon-separated value of the form
+ * -:s or b:b, with 's', 'b', or '-' on either side of the colon: 's' selects string
+ * format storage, 'b' selects native fixed-width byte-oriented storage, and '-' uses the
+ * table-level default.
+ *
+ * @param hbaseTableDefaultStorageType - the specification associated with the table property
+ * hbase.table.default.storage.type
+ * @throws SerDeException on parse error.
+ */
+ void parseColumnStorageTypes(String hbaseTableDefaultStorageType) throws SerDeException {
+
+ boolean tableBinaryStorage = false;
+
+ if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+ if (hbaseTableDefaultStorageType.equals("binary")) {
+ tableBinaryStorage = true;
+ } else if (!hbaseTableDefaultStorageType.equals("string")) {
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+ " parameter must be specified as" +
+ " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+ "' is not a valid specification for this table/serde property.");
+ }
+ }
+
+ // parse the string to determine column level storage type for primitive types
+ // 's' is for variable length string format storage
+ // 'b' is for fixed width binary storage of bytes
+ // '-' is for table storage type, which defaults to UTF8 string
+ // string data is always stored in the default escaped storage format; the data types
+ // byte, short, int, long, float, and double have a binary byte oriented storage option
+ for (ColumnMapping colMap : columnsMapping) {
+ TypeInfo colType = colMap.columnType;
+ String mappingSpec = colMap.mappingSpec;
+ String[] mapInfo = mappingSpec.split("#");
+ String[] storageInfo = null;
+
+ if (mapInfo.length == 2) {
+ storageInfo = mapInfo[1].split(":");
+ }
+
+ if (storageInfo == null) {
+
+ // use the table default storage specification
+ if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else if (colType.getCategory() == ObjectInspector.Category.MAP) {
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 1) {
+ // we have a storage specification for a primitive column type
+ String storageOption = storageInfo[0];
+
+ if ((colType.getCategory() == ObjectInspector.Category.MAP) ||
+ !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+ "binary".startsWith(storageOption))) {
+ throw new SerDeException("Error: A column storage specification is one of the following:"
+ + " '-', a prefix of 'string', or a prefix of 'binary'. "
+ + storageOption + " is not a valid storage option specification for "
+ + colMap.columnName);
+ }
+
+ if (colType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if ("-".equals(storageOption)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(storageOption)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 2) {
+ // we have a storage specification for a map column type
+
+ String keyStorage = storageInfo[0];
+ String valStorage = storageInfo[1];
+
+ if ((colType.getCategory() != ObjectInspector.Category.MAP) ||
+ !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+ "binary".startsWith(keyStorage)) ||
+ !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+ "binary".startsWith(valStorage))) {
+ throw new SerDeException("Error: To specify a valid column storage type for a Map"
+ + " column, use any two specifiers from '-', a prefix of 'string', "
+ + " and a prefix of 'binary' separated by a ':'."
+ + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+ + " key and value parts of the Map,?> respectively."
+ + " Invalid storage specification for column "
+ + colMap.columnName
+ + "; " + storageInfo[0] + ":" + storageInfo[1]);
+ }
+
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if (keyStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(keyStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ if (valStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(valStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (colMap.binaryStorage.size() != 2) {
+ throw new SerDeException("Error: In parsing the storage specification for column "
+ + colMap.columnName);
+ }
+
+ } else {
+ // error in storage specification
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification "
+ + mappingSpec + " is not valid for column: "
+ + colMap.columnName);
+ }
+ }
+ }
+
+ public ColumnMapping getKeyMapping() {
+ return columnsMapping[keyIndex];
+ }
+
+ public int getKeyIndex() {
+ return keyIndex;
+ }
+
+ public ColumnMapping[] getColumnsMapping() {
+ return columnsMapping;
+ }
+
+ // todo use final fields
+ static class ColumnMapping {
+
+ ColumnMapping() {
+ binaryStorage = new ArrayList<Boolean>(2);
+ }
+
+ String columnName;
+ TypeInfo columnType;
+
+ String familyName;
+ String qualifierName;
+ byte[] familyNameBytes;
+ byte[] qualifierNameBytes;
+ List<Boolean> binaryStorage;
+ boolean hbaseRowKey;
+ String mappingSpec;
+ String qualifierPrefix;
+ byte[] qualifierPrefixBytes;
+
+ public boolean isCategory(ObjectInspector.Category category) {
+ return columnType.getCategory() == category;
+ }
+ }
+}
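+// Illustrative example: with "hbase.columns.mapping" = ":key,cf:q,cf2:", the three
+// columns are exposed as string (row key), string (single qualifier) and
+// map<string,string> (whole column family), matching what toTypesString() produces
+// when no explicit column types are supplied.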
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java
new file mode 100644
index 0000000..a04bd1b
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class CompositeHBaseKeyFactory<T extends HBaseCompositeKey>
+ extends DefaultHBaseKeyFactory implements Configurable {
+
+ public static final Log LOG = LogFactory.getLog(CompositeHBaseKeyFactory.class);
+
+ private final Class<T> keyClass;
+ private final Constructor<T> constructor;
+
+ private Configuration conf;
+
+ public CompositeHBaseKeyFactory(Class<T> keyClass) throws Exception {
+ // see javadoc of HBaseCompositeKey
+ this.keyClass = keyClass;
+ this.constructor = keyClass.getDeclaredConstructor(
+ LazySimpleStructObjectInspector.class, Properties.class, Configuration.class);
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException {
+ super.configureJobConf(tableDesc, jobConf);
+ TableMapReduceUtil.addDependencyJars(jobConf, keyClass);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public T createKey(ObjectInspector inspector) throws SerDeException {
+ try {
+ return (T) constructor.newInstance(inspector, properties, conf);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
+ ExprNodeDesc predicate) {
+ String keyColName = hbaseParams.getKeyColumnMapping().columnName;
+
+ IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true);
+ analyzer.allowColumnName(keyColName);
+ analyzer.setAcceptsFields(true);
+ analyzer.setFieldValidator(new Validator());
+
+ DecomposedPredicate decomposed = new DecomposedPredicate();
+
+ List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
+ decomposed.residualPredicate =
+ (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions);
+ if (!conditions.isEmpty()) {
+ decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions);
+ try {
+ decomposed.pushedPredicateObject = setupFilter(keyColName, conditions);
+ } catch (Exception e) {
+ LOG.warn("Failed to decompose predicates", e);
+ return null;
+ }
+ }
+ return decomposed;
+ }
+
+ /**
+ * Validates the field in the {@link ExprNodeFieldDesc}: checks that the given field
+ * is the first field in the given struct. This is especially important for structs,
+ * because the order of struct fields is significant for any filter applied down the line.
+ **/
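+ // e.g. for a key of type struct<a:string,b:string>, a condition on field "a"
+ // (the first struct field) passes validation, while one on "b" alone does not.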
+ private Serializable setupFilter(String keyColName, List<IndexSearchCondition> conditions)
+ throws Exception {
+ HBaseScanRange scanRange = new HBaseScanRange();
+ for (IndexSearchCondition condition : conditions) {
+ if (condition.getFields() == null) {
+ continue;
+ }
+ String field = condition.getFields()[0];
+ Object value = condition.getConstantDesc().getValue();
+ scanRange.addFilter(new FamilyFilter(
+ CompareFilter.CompareOp.EQUAL, new BinaryComparator(field.getBytes())));
+ }
+ return scanRange;
+ }
+
+ private static class Validator implements IndexPredicateAnalyzer.FieldValidator {
+
+ public boolean validate(ExprNodeFieldDesc fieldDesc) {
+ String fieldName = fieldDesc.getFieldName();
+
+ ExprNodeDesc nodeDesc = fieldDesc.getDesc();
+
+ TypeInfo typeInfo = nodeDesc.getTypeInfo();
+
+ if (!(typeInfo instanceof StructTypeInfo)) {
+ // since we are working off an ExprNodeFieldDesc, which represents a field within a
+ // struct, this should never happen
+ throw new AssertionError("Expected StructTypeInfo. Found:" + typeInfo.getTypeName());
+ }
+
+ List<String> allFieldNames = ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+
+ if (allFieldNames == null || allFieldNames.size() == 0) {
+ return false;
+ }
+
+ String firstElement = allFieldNames.get(0);
+
+ return firstElement.equals(fieldName);
+ }
+ }
+}
\ No newline at end of file
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
new file mode 100644
index 0000000..5731e45
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory {
+
+ protected LazySimpleSerDe.SerDeParameters serdeParams;
+ protected HBaseRowSerializer serializer;
+
+ @Override
+ public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException {
+ super.init(hbaseParam, properties);
+ this.serdeParams = hbaseParam.getSerdeParams();
+ this.serializer = new HBaseRowSerializer(hbaseParam);
+ }
+
+ @Override
+ public ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException {
+ return LazyFactory.createLazyObjectInspector(type, serdeParams.getSeparators(), 1,
+ serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar());
+ }
+
+ @Override
+ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException {
+ return LazyFactory.createLazyObject(inspector, keyMapping.binaryStorage.get(0));
+ }
+
+ @Override
+ public byte[] serializeKey(Object object, StructField field) throws IOException {
+ return serializer.serializeKeyField(object, field, keyMapping);
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
index 5008f15..d184216 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -20,9 +20,7 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
new file mode 100644
index 0000000..251d22e
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Provides a custom implementation of the object and object inspector for the HBase key.
+ * The key object should implement LazyObjectBase.
+ *
+ * Users can optionally implement HiveStoragePredicateHandler for handling filter predicates.
+ */
+public interface HBaseKeyFactory extends HiveStoragePredicateHandler {
+
+ /**
+ * initialize factory with properties
+ */
+ void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException;
+
+ /**
+ * create custom object inspector for hbase key
+ * @param type type information
+ */
+ ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeException;
+
+ /**
+ * create custom object for hbase key
+ * @param inspector OI created by {@link HBaseKeyFactory#createKeyObjectInspector}
+ */
+ LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException;
+
+ /**
+ * serialize hive object in internal format of custom key
+ *
+ * @param object hive object to serialize
+ * @param field struct field corresponding to the key
+ *
+ * @return the serialized key as bytes
+ * @throws java.io.IOException
+ */
+ byte[] serializeKey(Object object, StructField field) throws IOException;
+
+ /**
+ * configure jobConf for this factory
+ *
+ * @param tableDesc
+ * @param jobConf
+ */
+ void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOException;
+}
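+
+// Example wiring (illustrative; com.example.MyKeyFactory is hypothetical):
+//   CREATE TABLE t (key struct<a:string,b:string>, value string)
+//   STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+//   WITH SERDEPROPERTIES (
+//     "hbase.columns.mapping" = ":key,cf:val",
+//     "hbase.composite.key.factory" = "com.example.MyKeyFactory")
+// where MyKeyFactory implements HBaseKeyFactory, typically by extending
+// AbstractHBaseKeyFactory or CompositeHBaseKeyFactory.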
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
new file mode 100644
index 0000000..5c26456
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Does the same thing as LazyFactory#createLazyObjectInspector, except that it replaces
+// the original key OI with the one created by the HBaseKeyFactory configured through the
+// serde properties for HBase.
+public class HBaseLazyObjectFactory {
+
+ public static ObjectInspector createLazyHBaseStructInspector(
+ SerDeParameters serdeParams, int index, HBaseKeyFactory factory) throws SerDeException {
+ List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+ ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+ columnTypes.size());
+ for (int i = 0; i < columnTypes.size(); i++) {
+ if (i == index) {
+ columnObjectInspectors.add(factory.createKeyObjectInspector(columnTypes.get(i)));
+ } else {
+ columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
+ columnTypes.get(i), serdeParams.getSeparators(), 1, serdeParams.getNullSequence(),
+ serdeParams.isEscaped(), serdeParams.getEscapeChar()));
+ }
+ }
+ return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+ serdeParams.getColumnNames(), columnObjectInspectors, serdeParams.getSeparators()[0],
+ serdeParams.getNullSequence(), serdeParams.isLastColumnTakesRest(),
+ serdeParams.isEscaped(), serdeParams.getEscapeChar());
+ }
+}
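+// Usage sketch (illustrative): HBaseSerDe initialization would call
+//   HBaseLazyObjectFactory.createLazyHBaseStructInspector(serdeParams, keyIndex, keyFactory)
+// so that the row's struct inspector uses the factory-supplied OI for the key column
+// and plain lazy OIs for all other columns.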
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
new file mode 100644
index 0000000..fe6081e
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class HBaseRowSerializer {
+
+ private final HBaseKeyFactory keyFactory;
+ private final HBaseSerDeParameters hbaseParam;
+ private final LazySimpleSerDe.SerDeParameters serdeParam;
+
+ private final int keyIndex;
+ private final ColumnMapping keyMapping;
+ private final ColumnMapping[] columnMappings;
+ private final byte[] separators; // the separators array
+ private final boolean escaped; // whether we need to escape the data when writing out
+ private final byte escapeChar; // which char to use as the escape char, e.g. '\\'
+ private final boolean[] needsEscape; // which chars need to be escaped. This array should have size
+ // of 128. Negative byte values (or byte values >= 128) are
+ // never escaped.
+
+ private final long putTimestamp;
+ private final ByteStream.Output output = new ByteStream.Output();
+
+ public HBaseRowSerializer(HBaseSerDeParameters hbaseParam) {
+ this.hbaseParam = hbaseParam;
+ this.keyFactory = hbaseParam.getKeyFactory();
+ this.serdeParam = hbaseParam.getSerdeParams();
+ this.separators = serdeParam.getSeparators();
+ this.escaped = serdeParam.isEscaped();
+ this.escapeChar = serdeParam.getEscapeChar();
+ this.needsEscape = serdeParam.getNeedsEscape();
+ this.keyIndex = hbaseParam.getKeyIndex();
+ this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping();
+ this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping();
+ this.putTimestamp = hbaseParam.getPutTimestamp();
+ }
+
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws Exception {
+ if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> values = soi.getStructFieldsDataAsList(obj);