diff --git hbase-handler/pom.xml hbase-handler/pom.xml
index 7c3524c..9aca819 100644
--- hbase-handler/pom.xml
+++ hbase-handler/pom.xml
@@ -222,6 +222,19 @@
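     <sourceDirectory>${basedir}/src/java</sourceDirectory>
     <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>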
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
index 5008f15..a92dc36 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -18,18 +18,24 @@
package org.apache.hadoop.hive.hbase;
+import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
* HBaseCompositeKey extension of LazyStruct. All complex composite keys should extend this class
@@ -96,4 +102,51 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
return lazyObject;
}
+
+  /**
+   * {@link HBaseKeyFactory} that produces row keys by reflectively instantiating a
+   * user-supplied composite key class.
+   *
+   * <p>The key class must declare a constructor taking
+   * {@code (LazySimpleStructObjectInspector, Properties, Configuration)}. The constructor is
+   * resolved once, when this factory is built, and invoked on every {@link #createObject} call.
+   * The {@link Configuration} handed to it is whatever was last passed to {@link #setConf};
+   * it is {@code null} if {@code setConf} was never called.
+   */
+  public static class HBaseCompositeKeyFactory implements HBaseKeyFactory, Configurable {
+
+    private final Constructor<?> constructor;
+
+    private Configuration conf;
+    private Properties properties;
+    private SerDeParameters parameters;
+
+    /**
+     * @param keyClass composite key implementation to instantiate for each key object
+     * @throws Exception if the constructor lookup fails; a missing constructor is reported
+     *         as a {@link SerDeException} naming the offending class
+     */
+    public HBaseCompositeKeyFactory(Class<?> keyClass) throws Exception {
+      try {
+        constructor = keyClass.getDeclaredConstructor(
+            LazySimpleStructObjectInspector.class, Properties.class, Configuration.class);
+      } catch (NoSuchMethodException e) {
+        // the constructor wasn't defined in the implementation class. Flag error
+        throw new SerDeException("Constructor not defined in composite key class ["
+            + keyClass.getName() + "]", e);
+      }
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /** Stores the serde parameters and table properties used by the create methods. */
+    @Override
+    public void init(SerDeParameters parameters, Properties properties) {
+      this.parameters = parameters;
+      this.properties = properties;
+    }
+
+    /**
+     * Instantiates the key class with the given inspector, the stored table properties and
+     * the current configuration.
+     *
+     * @throws SerDeException wrapping any failure raised by the reflective constructor call
+     */
+    @Override
+    public LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException {
+      try {
+        return (LazyObjectBase) constructor.newInstance(inspector, properties, conf);
+      } catch (Exception e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    /**
+     * Builds the lazy object inspector for the key type from the separators, null sequence
+     * and escaping settings of the parameters supplied to {@link #init}.
+     */
+    @Override
+    public ObjectInspector createObjectInspector(TypeInfo type)
+        throws SerDeException {
+      return LazyFactory.createLazyObjectInspector(type,
+          parameters.getSeparators(), 1, parameters.getNullSequence(),
+          parameters.isEscaped(), parameters.getEscapeChar());
+    }
+  }
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
new file mode 100644
index 0000000..3c2ba7d
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.Properties;
+
+/**
+ * Pluggable factory for the HBase row key column: it supplies the key's object inspector
+ * and creates a lazy object for a given inspector.
+ *
+ * HBaseSerDe instantiates the class named by the "hbase.composite.key.factory" table
+ * property through ReflectionUtils.newInstance and then calls init on it.
+ */
+public interface HBaseKeyFactory {
+
+ /**
+ * Called by HBaseSerDe right after the factory has been created.
+ *
+ * @param parameters serde parameters of the table
+ * @param properties table properties
+ * @throws SerDeException if the factory cannot be initialized
+ */
+ void init(SerDeParameters parameters, Properties properties) throws SerDeException;
+
+ /**
+ * Creates the object inspector used for the key column of the row struct.
+ *
+ * @param type declared type of the key column
+ * @throws SerDeException if no inspector can be created for the type
+ */
+ ObjectInspector createObjectInspector(TypeInfo type) throws SerDeException;
+
+ /**
+ * Creates a lazy object for the given inspector.
+ *
+ * @param inspector object inspector the created object is built for
+ * @throws SerDeException if the object cannot be created
+ */
+ LazyObjectBase createObject(ObjectInspector inspector) throws SerDeException;
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
new file mode 100644
index 0000000..62ce73d
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseLazyObjectFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Text;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds the struct object inspector for an HBase-backed row, letting an optional
+ * {@link HBaseKeyFactory} supply the inspector of one designated column.
+ */
+public class HBaseLazyObjectFactory {
+
+  /**
+   * Creates a lazy struct inspector with one field inspector per entry of {@code typeInfos}.
+   *
+   * <p>The column at position {@code index} gets its inspector from {@code factory} when a
+   * factory is given; every other column (and all columns when {@code factory} is
+   * {@code null}) gets the inspector produced by {@link LazyFactory}.
+   *
+   * @param columnNames names of the struct fields
+   * @param typeInfos types of the struct fields; one inspector is created per entry
+   * @param separators separator bytes; the whole array is passed to the per-column
+   *        inspectors and {@code separators[0]} to the struct inspector
+   * @param nullSequence forwarded to the column and struct inspectors
+   * @param lastColumnTakesRest forwarded to the struct inspector
+   * @param escaped forwarded to the column and struct inspectors
+   * @param escapeChar forwarded to the column and struct inspectors
+   * @param index position of the column whose inspector comes from {@code factory}
+   * @param factory source of the inspector for column {@code index}; may be {@code null}
+   * @return the struct object inspector over all columns
+   * @throws SerDeException if an inspector cannot be created
+   */
+  public static ObjectInspector createLazyHBaseStructInspector(
+      List<String> columnNames, List<TypeInfo> typeInfos, byte[] separators,
+      Text nullSequence, boolean lastColumnTakesRest, boolean escaped,
+      byte escapeChar, int index, HBaseKeyFactory factory) throws SerDeException {
+    int columnCount = typeInfos.size();
+    ArrayList<ObjectInspector> columnObjectInspectors =
+        new ArrayList<ObjectInspector>(columnCount);
+    for (int i = 0; i < columnCount; i++) {
+      TypeInfo typeInfo = typeInfos.get(i);
+      if (i == index && factory != null) {
+        columnObjectInspectors.add(factory.createObjectInspector(typeInfo));
+      } else {
+        columnObjectInspectors.add(LazyFactory.createLazyObjectInspector(
+            typeInfo, separators, 1, nullSequence, escaped, escapeChar));
+      }
+    }
+    return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+        columnNames, columnObjectInspectors, separators[0], nullSequence,
+        lastColumnTakesRest, escaped, escapeChar);
+  }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 2cd65cb..fcccd35 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -32,7 +31,6 @@
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
@@ -54,6 +52,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* HBaseSerDe can be used to serialize object into an HBase table and
@@ -67,6 +66,7 @@
public static final String HBASE_KEY_COL = ":key";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
+ public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
@@ -87,8 +87,7 @@
private final ByteStream.Output serializeStream = new ByteStream.Output();
private int iKey;
private long putTimestamp;
- private Class> compositeKeyClass;
- private Object compositeKeyObj;
+ private HBaseKeyFactory keyFactory;
// used for serializing a field
private byte [] separators; // the separators array
@@ -123,22 +122,19 @@ public void initialize(Configuration conf, Properties tbl)
initHBaseSerDeParameters(conf, tbl, getClass().getName());
- cachedObjectInspector = LazyFactory.createLazyStructInspector(
+ cachedObjectInspector = HBaseLazyObjectFactory.createLazyHBaseStructInspector(
serdeParams.getColumnNames(),
serdeParams.getColumnTypes(),
serdeParams.getSeparators(),
serdeParams.getNullSequence(),
serdeParams.isLastColumnTakesRest(),
serdeParams.isEscaped(),
- serdeParams.getEscapeChar());
+ serdeParams.getEscapeChar(),
+ iKey, keyFactory
+ );
cachedHBaseRow = new LazyHBaseRow(
- (LazySimpleStructObjectInspector) cachedObjectInspector);
-
- if (compositeKeyClass != null) {
- // initialize the constructor of the composite key class with its object inspector
- initCompositeKeyClass(conf,tbl);
- }
+ (LazySimpleStructObjectInspector) cachedObjectInspector, iKey, keyFactory);
if (LOG.isDebugEnabled()) {
LOG.debug("HBaseSerDe initialized with : columnNames = "
@@ -469,16 +465,6 @@ private void initHBaseSerDeParameters(
doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
- String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
-
- if (compKeyClass != null) {
- try {
- compositeKeyClass = job.getClassByName(compKeyClass);
- } catch (ClassNotFoundException e) {
- throw new SerDeException(e);
- }
- }
-
// Parse and initialize the HBase columns mapping
columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
@@ -547,6 +533,29 @@ private void initHBaseSerDeParameters(
String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
parseColumnStorageTypes(hbaseTableStorageType);
setKeyColumnOffset();
+
+ try {
+ keyFactory = createKeyFactory(job, tbl);
+ if (keyFactory != null) {
+ keyFactory.init(serdeParams, tbl);
+ }
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+  /**
+   * Creates the key factory configured for this table, if any.
+   *
+   * <p>The class named by {@code hbase.composite.key.factory} takes precedence. Otherwise,
+   * when {@code hbase.composite.key.class} is set, a factory wrapping that key class is
+   * returned.
+   *
+   * @param job configuration used to load the class and handed to the factory; may be
+   *        {@code null}
+   * @param tbl table properties holding the factory / key class names
+   * @return the key factory, or {@code null} when neither property is set
+   * @throws Exception if the named class cannot be loaded or instantiated
+   */
+  private HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl) throws Exception {
+    String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY);
+    if (factoryClassName != null) {
+      Class<?> factoryClazz = loadKeyClass(job, factoryClassName);
+      return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job);
+    }
+    String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+    if (keyClassName != null) {
+      Class<?> keyClass = loadKeyClass(job, keyClassName);
+      HBaseCompositeKey.HBaseCompositeKeyFactory compositeFactory =
+          new HBaseCompositeKey.HBaseCompositeKeyFactory(keyClass);
+      // This factory is built with "new", so nothing injects the configuration for us;
+      // without this call the key class constructor would be handed a null Configuration.
+      compositeFactory.setConf(job);
+      return compositeFactory;
+    }
+    return null;
+  }
+
+  /**
+   * Loads a class through the job configuration when one is available, so the same lookup
+   * as the previous {@code job.getClassByName} based code is used; falls back to
+   * {@link Class#forName(String)} when no configuration was supplied.
+   */
+  private static Class<?> loadKeyClass(Configuration job, String className)
+      throws ClassNotFoundException {
+    return job != null ? job.getClassByName(className) : Class.forName(className);
}
/**
@@ -562,7 +571,7 @@ public Object deserialize(Writable result) throws SerDeException {
throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
}
- cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj);
+ cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping);
return cachedHBaseRow;
}
@@ -828,48 +837,6 @@ private boolean serialize(
}
/**
- * Initialize the composite key class with the objectinspector for the key
- *
- * @throws SerDeException
- * */
- private void initCompositeKeyClass(Configuration conf,Properties tbl) throws SerDeException {
-
- int i = 0;
-
- // find the hbase row key
- for (ColumnMapping colMap : columnsMapping) {
- if (colMap.hbaseRowKey) {
- break;
- }
- i++;
- }
-
- ObjectInspector keyObjectInspector = ((LazySimpleStructObjectInspector) cachedObjectInspector)
- .getAllStructFieldRefs().get(i).getFieldObjectInspector();
-
- try {
- compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
- LazySimpleStructObjectInspector.class, Properties.class, Configuration.class)
- .newInstance(
- ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf);
- } catch (IllegalArgumentException e) {
- throw new SerDeException(e);
- } catch (SecurityException e) {
- throw new SerDeException(e);
- } catch (InstantiationException e) {
- throw new SerDeException(e);
- } catch (IllegalAccessException e) {
- throw new SerDeException(e);
- } catch (InvocationTargetException e) {
- throw new SerDeException(e);
- } catch (NoSuchMethodException e) {
- // the constructor wasn't defined in the implementation class. Flag error
- throw new SerDeException("Constructor not defined in composite key class ["
- + compositeKeyClass.getName() + "]", e);
- }
- }
-
- /**
* @return the useJSONSerialize
*/
public boolean isUseJSONSerialize() {
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 8cd594b..bf3b6e7 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -34,8 +34,9 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableInputFormatBase;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
@@ -376,10 +377,19 @@ private void addHBaseDelegationToken(Configuration conf) throws IOException {
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
try {
- TableMapReduceUtil.addDependencyJars(jobConf);
- org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
- HBaseStorageHandler.class, org.apache.hadoop.hbase.HBaseConfiguration.class);
- } catch (IOException e) {
+      // Register the jars that contain the listed classes as job dependencies. Each class
+      // below is only a marker for the jar it lives in.
+      TableMapReduceUtil.addDependencyJars(jobConf,
+          HBaseStorageHandler.class, TableInputFormatBase.class,
+          // explicitly pull a class from each module
+          org.apache.hadoop.hbase.HConstants.class, // hbase-common
+          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
+          org.apache.hadoop.hbase.client.Put.class, // hbase-client
+          org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
+          // pull necessary dependencies
+          org.apache.zookeeper.ZooKeeper.class,
+          org.jboss.netty.channel.ChannelFactory.class,
+          com.google.protobuf.Message.class,
+          com.google.common.collect.Lists.class,
+          org.cloudera.htrace.Trace.class);
+    } catch (IOException e) {
throw new RuntimeException(e);
}
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
index fc40195..27b063e 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
@@ -24,15 +24,14 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
* LazyObject for storing an HBase row. The field of an HBase row can be
@@ -45,72 +44,47 @@
*/
private Result result;
private List columnsMapping;
- private Object compositeKeyObj;
private ArrayList