*
- * */
+ */
public class HBaseCompositeKey extends LazyStruct {
+ protected static final Log LOG = LogFactory.getLog(HBaseCompositeKey.class);
+
public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
super(oi);
}
@@ -96,4 +111,101 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
return lazyObject;
}
+
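+ /**
+ * Hook for composite-key implementations that want to take part in predicate pushdown.
+ * Returning {@code null} (the default) means the predicate is not decomposed and nothing
+ * is pushed down for the key.
+ */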
+ protected DecomposedPredicate decomposePredicate(JobConf jobConf,
+ Deserializer deserializer, ExprNodeDesc predicate) throws Exception {
+ return null;
+ }
+
+ /**
+ * Returns the optional {@link Filter row filter} to be applied to the
+ * {@link org.apache.hadoop.hbase.client.Scan scans} to filter out unwanted rows.
+ *
+ * <p>Note 1: When the keys are salted, it is highly recommended that a custom filter be
+ * provided, because scanning salted keys correctly requires multiple scans and the current
+ * HBase integration does not support that.
+ *
+ * <p>Note 2: Hive currently supports predicate pushdown only for the first field of the
+ * struct, so the field passed in always corresponds to that first field. Support for
+ * additional fields may be added in the future.
+ *
+ * @param fieldName
+ * name of the key field whose pushed-down values are supplied
+ * @param values
+ * values that form part of the row key
+ * @return {@link Filter filter} that can be applied to the {@link Scan scans}. Returns
+ * {@code null} by default; can be overridden to provide a custom {@link Filter filter}.
+ */
+ protected Filter getFilter(String fieldName, Object... values) {
+ return null;
+ }
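+
+ // Illustrative sketch (not part of this patch): a subclass could override getFilter()
+ // to prune the scan on the first key field, for example with an HBase PrefixFilter
+ // (org.apache.hadoop.hbase.filter.PrefixFilter). The SampleCompositeKey name below is
+ // purely hypothetical.
+ //
+ //   public static class SampleCompositeKey extends HBaseCompositeKey {
+ //     public SampleCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) {
+ //       super(oi);
+ //     }
+ //     @Override
+ //     protected Filter getFilter(String fieldName, Object... values) {
+ //       // keep only rows whose key starts with the pushed-down value of the first field
+ //       return new PrefixFilter(Bytes.toBytes(String.valueOf(values[0])));
+ //     }
+ //   }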
+
+ /**
+ * {@link HBaseKeyFactory} implementation that provides an {@link HBaseCompositeKey} as the row key
+ */
+ public static class HBaseCompositeKeyFactory<T extends HBaseCompositeKey>
+ implements HBaseKeyFactory, HiveStoragePredicateHandler, Configurable {
+
+ private final Class<T> keyClass;
+ private final Constructor<T> constructor;
+
+ private Configuration conf;
+ private Properties properties;
+ private SerDeParameters parameters;
+
+ public HBaseCompositeKeyFactory(Class<T> keyClass) throws Exception {
+ // see javadoc of HBaseCompositeKey
+ this.keyClass = keyClass;
+ this.constructor = keyClass.getDeclaredConstructor(
+ LazySimpleStructObjectInspector.class, Properties.class, Configuration.class);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void init(SerDeParameters parameters, Properties properties) {
+ this.parameters = parameters;
+ this.properties = properties;
+ }
+
+ @Override
+ public T createObject(ObjectInspector inspector) throws SerDeException {
+ try {
+ return (T) constructor.newInstance(inspector, properties, conf);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ @Override
+ public ObjectInspector createObjectInspector(TypeInfo type)
+ throws SerDeException {
+ return LazyFactory.createLazyObjectInspector(type,
+ parameters.getSeparators(), 1, parameters.getNullSequence(),
+ parameters.isEscaped(), parameters.getEscapeChar());
+ }
+
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
+ ExprNodeDesc predicate) {
+ try {
+ return createObject(null).decomposePredicate(jobConf, deserializer, predicate);
+ } catch (Exception e) {
+ LOG.warn("Failed to decompose predicate; row key pushdown is skipped", e);
+ return null;
+ }
+ }
+ }
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
new file mode 100644
index 0000000..80834eb
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.Properties;
+
+/**
+ * Provides a custom implementation of the object and the object inspector for the HBase
+ * row key. The key object must implement {@link LazyObjectBase}.
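+ *
+ * <p>A factory implementation is selected through the table property
+ * {@code hbase.composite.key.factory} and instantiated reflectively; when only
+ * {@code hbase.composite.key.class} is set, the key class is wrapped in the default
+ * {@link HBaseCompositeKey.HBaseCompositeKeyFactory}.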
+ */
+public interface HBaseKeyFactory {
+
+ /**
+ * Initializes the factory with the SerDe parameters and table properties.
+ */
+ void init(SerDeParameters parameters, Properties properties) throws SerDeException;
+
+ /**
+ * Creates a custom object inspector for the HBase key.
+ * @param type type information of the key column
+ */
+ ObjectInspector createObjectInspector(TypeInfo type) throws SerDeException;
+
+ /**
+ * Creates a custom lazy object for the HBase key.
+ * @param inspector OI created by {@link HBaseKeyFactory#createObjectInspector}
+ */
+ LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException;
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
new file mode 100644
index 0000000..d466ff6
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Text;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Does the same thing as LazyFactory#createLazyStructInspector, except that the original
+// key OI is replaced with the OI created by the HBaseKeyFactory configured through the
+// SerDe properties for HBase.
+public class HBaseLazyObjectFactory {
+
+ public static ObjectInspector createLazyHBaseStructInspector(
+ List<String> columnNames, List<TypeInfo> typeInfos, byte[] separators,
+ Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
+ byte escapeChar, int index, HBaseKeyFactory factory) throws SerDeException {
+ ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+ typeInfos.size());
+ for (int i = 0; i < typeInfos.size(); i++) {
+ if (i == index && factory != null) {
+ columnObjectInspectors.add(factory.createObjectInspector(typeInfos.get(i)));
+ } else {
+ columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
+ typeInfos.get(i), separators, 1, nullSequence, escaped, escapeChar));
+ }
+ }
+ return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+ columnNames, columnObjectInspectors, separators[0], nullSequence,
+ lastColumnTakesRest, escaped, escapeChar);
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
new file mode 100644
index 0000000..f5d1c27
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
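+/**
+ * Serializable description of an HBase scan: an optional start/stop row plus a list of
+ * filters. It is intended to be shipped through the job configuration (see
+ * {@code TableScanDesc.FILTER_OBJECT_CONF_STR}) so that {@code HiveHBaseTableInputFormat}
+ * can rebuild the {@link Scan} on the task side via {@link #setup}.
+ */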
+public class HBaseScanRange implements Serializable {
+
+ private byte[] startRow;
+ private byte[] stopRow;
+
+ private List<FilterDesc> filterDescs = new ArrayList<FilterDesc>();
+
+ public byte[] getStartRow() {
+ return startRow;
+ }
+
+ public void setStartRow(byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ public byte[] getStopRow() {
+ return stopRow;
+ }
+
+ public void setStopRow(byte[] stopRow) {
+ this.stopRow = stopRow;
+ }
+
+ public void addFilter(Filter filter) throws IOException {
+ filterDescs.add(new FilterDesc(filter.getClass().getName(), filter.toByteArray()));
+ }
+
+ public void setup(Scan scan) throws DeserializationException {
+ if (startRow != null) {
+ scan.setStartRow(startRow);
+ }
+ if (stopRow != null) {
+ scan.setStopRow(stopRow);
+ }
+ if (filterDescs.isEmpty()) {
+ return;
+ }
+ if (filterDescs.size() == 1) {
+ scan.setFilter(filterDescs.get(0).toFilter());
+ return;
+ }
+ List<Filter> filters = new ArrayList<Filter>();
+ for (FilterDesc filter : filterDescs) {
+ filters.add(filter.toFilter());
+ }
+ scan.setFilter(new FilterList(filters));
+ }
+
+ public String toString() {
+ return (startRow == null ? "" : new BytesWritable(startRow).toString()) + " ~ " +
+ (stopRow == null ? "" : new BytesWritable(stopRow).toString());
+ }
+
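+ // Holds a filter's class name and its protobuf-serialized bytes so the filter stays
+ // serializable along with the scan range and can be rebuilt with toFilter().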
+ private static class FilterDesc implements Serializable {
+
+ private String className;
+ private byte[] binary;
+
+ public FilterDesc(String className, byte[] binary) {
+ this.className = className;
+ this.binary = binary;
+ }
+
+ public Filter toFilter() throws DeserializationException {
+ if (className.endsWith("ValueFilter")) {
+ return ValueFilter.parseFrom(binary);
+ } else if (className.endsWith("TimestampsFilter")) {
+ return TimestampsFilter.parseFrom(binary);
+ } else if (className.endsWith("ColumnRangeFilter")) {
+ return ColumnRangeFilter.parseFrom(binary);
+ } else if (className.endsWith("FamilyFilter")) {
+ return FamilyFilter.parseFrom(binary);
+ }
+ throw new IllegalArgumentException("Invalid type " + className);
+ }
+ }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 2cd65cb..bfad694 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -19,20 +19,19 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
@@ -54,6 +53,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* HBaseSerDe can be used to serialize object into an HBase table and
@@ -67,6 +67,7 @@
public static final String HBASE_KEY_COL = ":key";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
+ public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
@@ -87,8 +88,7 @@
private final ByteStream.Output serializeStream = new ByteStream.Output();
private int iKey;
private long putTimestamp;
- private Class<?> compositeKeyClass;
- private Object compositeKeyObj;
+ private HBaseKeyFactory keyFactory;
// used for serializing a field
private byte [] separators; // the separators array
@@ -123,22 +123,19 @@ public void initialize(Configuration conf, Properties tbl)
initHBaseSerDeParameters(conf, tbl, getClass().getName());
- cachedObjectInspector = LazyFactory.createLazyStructInspector(
+ cachedObjectInspector = HBaseLazyObjectFactory.createLazyHBaseStructInspector(
serdeParams.getColumnNames(),
serdeParams.getColumnTypes(),
serdeParams.getSeparators(),
serdeParams.getNullSequence(),
serdeParams.isLastColumnTakesRest(),
serdeParams.isEscaped(),
- serdeParams.getEscapeChar());
+ serdeParams.getEscapeChar(),
+ iKey, keyFactory
+ );
cachedHBaseRow = new LazyHBaseRow(
- (LazySimpleStructObjectInspector) cachedObjectInspector);
-
- if (compositeKeyClass != null) {
- // initialize the constructor of the composite key class with its object inspector
- initCompositeKeyClass(conf,tbl);
- }
+ (LazySimpleStructObjectInspector) cachedObjectInspector, iKey, keyFactory);
if (LOG.isDebugEnabled()) {
LOG.debug("HBaseSerDe initialized with : columnNames = "
@@ -440,8 +437,7 @@ public static boolean isRowKeyColumn(String hbaseColumnName) {
return hbaseColumnName.equals(HBASE_KEY_COL);
}
-
- static class ColumnMapping {
+ public static class ColumnMapping {
ColumnMapping() {
binaryStorage = new ArrayList<Boolean>(2);
@@ -469,16 +465,6 @@ private void initHBaseSerDeParameters(
doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
- String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
-
- if (compKeyClass != null) {
- try {
- compositeKeyClass = job.getClassByName(compKeyClass);
- } catch (ClassNotFoundException e) {
- throw new SerDeException(e);
- }
- }
-
// Parse and initialize the HBase columns mapping
columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
@@ -547,6 +533,29 @@ private void initHBaseSerDeParameters(
String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
parseColumnStorageTypes(hbaseTableStorageType);
setKeyColumnOffset();
+
+ try {
+ keyFactory = createKeyFactory(job, tbl);
+ if (keyFactory != null) {
+ keyFactory.init(serdeParams, tbl);
+ }
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
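+ // A factory named by hbase.composite.key.factory takes precedence; otherwise a key class
+ // named by hbase.composite.key.class is wrapped in the default HBaseCompositeKeyFactory.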
+ private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl) throws Exception {
+ String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY);
+ if (factoryClassName != null) {
+ Class<?> factoryClazz = Class.forName(factoryClassName);
+ return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job);
+ }
+ String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+ if (keyClassName != null) {
+ Class<?> keyClass = Class.forName(keyClassName);
+ return new HBaseCompositeKey.HBaseCompositeKeyFactory(keyClass);
+ }
+ return null;
}
/**
@@ -562,7 +571,7 @@ public Object deserialize(Writable result) throws SerDeException {
throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
}
- cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj);
+ cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping);
return cachedHBaseRow;
}
@@ -692,8 +701,11 @@ public Writable serialize(Object obj, ObjectInspector objInspector)
// the data to JSON string. Otherwise serialize the data in the
// delimited way.
serializeStream.reset();
+
boolean isNotNull;
- if (!foi.getCategory().equals(Category.PRIMITIVE)
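+ // a writable key factory serializes the row-key field in its own binary format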
+ if (i == iKey && usesCustomWritableKey()) {
+ isNotNull = ((HBaseWritableKeyFactory) keyFactory).serialize(f, foi, serializeStream);
+ } else if (!foi.getCategory().equals(Category.PRIMITIVE)
&& (declaredFields == null ||
declaredFields.get(i).getFieldObjectInspector().getCategory()
.equals(Category.PRIMITIVE) || useJSONSerialize)) {
@@ -722,6 +734,22 @@ public Writable serialize(Object obj, ObjectInspector objInspector)
return null;
}
+ public boolean usesCustomWritableKey() {
+ return keyFactory instanceof HBaseWritableKeyFactory;
+ }
+
+ public boolean usesCustomPredicateAnalyzer() {
+ return keyFactory instanceof HiveStoragePredicateHandler;
+ }
+
+ public HBaseKeyFactory getKeyFactory() {
+ return keyFactory;
+ }
+
+ public SerDeParameters getSerdeParams() {
+ return serdeParams;
+ }
+
/*
* Serialize the row into a ByteStream.
*
@@ -828,48 +856,6 @@ private boolean serialize(
}
/**
- * Initialize the composite key class with the objectinspector for the key
- *
- * @throws SerDeException
- * */
- private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException {
-
- int i = 0;
-
- // find the hbase row key
- for (ColumnMapping colMap : columnsMapping) {
- if (colMap.hbaseRowKey) {
- break;
- }
- i++;
- }
-
- ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector)
- .getAllStructFieldRefs().get(i).getFieldObjectInspector();
-
- try {
- compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
- LazySimpleStructObjectInspector.class, Properties.class, Configuration.class)
- .newInstance(
- ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf);
- } catch (IllegalArgumentException e) {
- throw new SerDeException(e);
- } catch (SecurityException e) {
- throw new SerDeException(e);
- } catch (InstantiationException e) {
- throw new SerDeException(e);
- } catch (IllegalAccessException e) {
- throw new SerDeException(e);
- } catch (InvocationTargetException e) {
- throw new SerDeException(e);
- } catch (NoSuchMethodException e) {
- // the constructor wasn't defined in the implementation class. Flag error
- throw new SerDeException("Constructor not defined in composite key class ["
- + compositeKeyClass.getName() + "]", e);
- }
- }
-
- /**
* @return the useJSONSerialize
*/
public boolean isUseJSONSerialize() {
@@ -918,4 +904,15 @@ public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
" row key column.");
}
+
+ public static TypeInfo getTypeForName(SerDeParameters parameters, String columnName) {
+ List<String> columnNames = parameters.getColumnNames();
+ for (int i = 0; i < columnNames.size(); i++) {
+ if (columnName.equals(columnNames.get(i))) {
+ return parameters.getColumnTypes().get(i);
+ }
+ }
+ throw new IllegalArgumentException("Invalid column name " + columnName);
+ }
+
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 29e5da5..3028348 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -403,12 +403,17 @@ public DecomposedPredicate decomposePredicate(
Deserializer deserializer,
ExprNodeDesc predicate)
{
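+ // a key factory that implements HiveStoragePredicateHandler analyzes the predicate itself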
+ HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
+ if (hbaseSerde.usesCustomPredicateAnalyzer()) {
+ HiveStoragePredicateHandler handler = (HiveStoragePredicateHandler) hbaseSerde.getKeyFactory();
+ return handler.decomposePredicate(jobConf, deserializer, predicate);
+ }
+
String columnNameProperty = jobConf.get(
org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
List<String> columnNames =
Arrays.asList(columnNameProperty.split(","));
- HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
int keyColPos = hbaseSerde.getKeyColumnOffset();
String keyColType = jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES).
split(",")[keyColPos];
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java
new file mode 100644
index 0000000..95b0d71
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseWritableKeyFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.io.IOException;
+
+public interface HBaseWritableKeyFactory extends HBaseKeyFactory {
+
+ /**
+ * Serializes a Hive object into the internal binary format of the custom key.
+ *
+ * @param object the Hive object holding the key value
+ * @param inspector object inspector for {@code object}
+ * @param output output stream the serialized key bytes are written to
+ *
+ * @return true if the value is not null
+ * @throws IOException
+ */
+ boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output) throws IOException;
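+
+ // Illustrative sketch (not part of this patch): an implementation might emit the first
+ // struct field of the key in its own binary layout. StructObjectInspector and Bytes are
+ // from the Hive serde2 and HBase util packages, respectively.
+ //
+ //   public boolean serialize(Object object, ObjectInspector inspector, ByteStream.Output output)
+ //       throws IOException {
+ //     StructObjectInspector soi = (StructObjectInspector) inspector;
+ //     StructField field = soi.getAllStructFieldRefs().get(0);
+ //     Object first = soi.getStructFieldData(object, field);
+ //     if (first == null) {
+ //       return false;              // null key field, nothing written
+ //     }
+ //     output.write(Bytes.toBytes(first.toString()));
+ //     return true;
+ //   }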
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 704fcb9..49dea7c 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
@@ -254,6 +255,18 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
throws IOException {
Scan scan = new Scan();
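+ // a pre-built scan range pushed down by a custom predicate handler takes precedence over
+ // the generic filter-expression handling below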
+ String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
+ if (filterObjectSerialized != null) {
+ HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
+ HBaseScanRange.class);
+ try {
+ range.setup(scan);
+ } catch (DeserializationException e) {
+ throw new IOException(e);
+ }
+ return scan;
+ }
+
String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (filterExprSerialized == null) {
return scan;
@@ -325,6 +338,10 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
}
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
+ }
return scan;
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
index fc40195..27b063e 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
@@ -24,15 +24,14 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
* LazyObject for storing an HBase row. The field of an HBase row can be
@@ -45,72 +44,47 @@
*/
private Result result;
private List<ColumnMapping> columnsMapping;
- private Object compositeKeyObj;
private ArrayList