diff --git hbase-handler/build.xml hbase-handler/build.xml
new file mode 100644
index 0000000..0980db2
--- /dev/null
+++ hbase-handler/build.xml
@@ -0,0 +1,82 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
index 9cae5d3..2468978 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
@@ -305,8 +305,9 @@ public int getKeyIndex() {
String qualifierPrefix;
byte[] qualifierPrefixBytes;
- public boolean isCategory(ObjectInspector.Category category) {
- return columnType.getCategory() == category;
+ public boolean convertToJson(ObjectInspector input) {
+ return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+ input.getCategory() != ObjectInspector.Category.PRIMITIVE;
}
}
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
index 5731e45..5f4761a 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
@@ -54,6 +54,6 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException
@Override
public byte[] serializeKey(Object object, StructField field) throws IOException {
- return serializer.serializeKeyField(object, field, keyMapping);
+ return serializer.serializeKeyField(object, field);
}
}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java
new file mode 100644
index 0000000..160a7e9
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+public class HBaseExportSerDe extends AbstractSerDe {
+
+ private HBaseSerDe serde;
+ private HBaseSerDeParameters serdeParam;
+ private HBaseUnionWritable dummy;
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ serde = new HBaseSerDe();
+ serde.initialize(conf, tbl);
+ serdeParam = serde.getHBaseSerdeParam();
+ dummy = new HBaseUnionWritable(serde.getSerializer(), serdeParam.getKeyIndex());
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return HBaseUnionWritable.class;
+ }
+
+ StructObjectInspector inputOI;
+ StructField[] fields;
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ if (inputOI == null) {
+ inputOI = (StructObjectInspector) objInspector;
+ fields = inputOI.getAllStructFieldRefs().toArray(new StructField[0]);
+ dummy.rowKeyField = fields[0];
+ dummy.unionOI = (UnionObjectInspector) fields[1].getFieldObjectInspector();
+ }
+ dummy.rowKey = inputOI.getStructFieldData(obj, fields[0]);
+ dummy.union = inputOI.getStructFieldData(obj, fields[1]);
+ return dummy;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return serde.getSerDeStats();
+ }
+
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+ return serde.deserialize(blob);
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ int iKey = serdeParam.getKeyIndex();
+ StructObjectInspector output = (StructObjectInspector) serde.getObjectInspector();
+ List<? extends StructField> fields = output.getAllStructFieldRefs();
+ List<ObjectInspector> unionOIs = new ArrayList<ObjectInspector>();
+ for (int i = 0; i < fields.size(); i++) {
+ if (i != iKey) {
+ unionOIs.add(fields.get(i).getFieldObjectInspector());
+ }
+ }
+ ObjectInspector unionOI = ObjectInspectorFactory.getStandardUnionObjectInspector(unionOIs);
+ return ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList("key", "values"),
+ Arrays.asList(fields.get(iKey).getFieldObjectInspector(), unionOI));
+
+ }
+
+ public static class HBaseUnionWritable implements Writable {
+
+ private Object rowKey;
+ private StructField rowKeyField;
+
+ private Object union;
+ private UnionObjectInspector unionOI;
+
+ private final HBaseRowSerializer serializer;
+ private final int iKey;
+ private final byte[][] holder = new byte[3][]; // family | qualifier | data
+
+ public HBaseUnionWritable(HBaseRowSerializer serializer, int iKey) {
+ this.serializer = serializer;
+ this.iKey = iKey;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("write");
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("readFields");
+ }
+
+ public Object getField() {
+ return unionOI.getField(union);
+ }
+
+ public byte getTag() {
+ return unionOI.getTag(union);
+ }
+
+ private int getColIndex() {
+ byte tag = getTag();
+ return tag < iKey ? tag : tag + 1;
+ }
+
+ public KeyValue toKeyValue() throws IOException {
+ byte[] key = serializer.serializeKey(rowKey, rowKeyField);
+ ObjectInspector fieldOI = unionOI.getObjectInspectors().get(getTag());
+ serializer.serializeField(getColIndex(), getField(), fieldOI, holder);
+ return new KeyValue(key, holder[0], holder[1], holder[2]);
+ }
+ }
+}
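
Note on the hand-off above: HBaseExportSerDe does not emit bytes itself; serialize() wraps the row key and the tagged union into an HBaseUnionWritable, and HiveHFileExporter.write() later turns it into a KeyValue. Because the row key is not part of the union, getColIndex() shifts tags that fall at or beyond the key position. A minimal stand-alone sketch of that mapping, using the key position from the hbase_bulk2.m mapping (class name and printed values are illustrative only):

    // Illustrative sketch only: mirrors HBaseUnionWritable.getColIndex().
    public class TagToColumnIndexDemo {
      public static void main(String[] args) {
        int iKey = 1; // ":key" is the second entry in "cf2:value,:key,cf1:key"
        for (byte tag = 0; tag < 2; tag++) {
          int colIndex = tag < iKey ? tag : tag + 1; // skip over the row-key column
          System.out.println("union tag " + tag + " -> mapping index " + colIndex);
        }
        // prints: union tag 0 -> mapping index 0
        //         union tag 1 -> mapping index 2
      }
    }
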
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
index fe6081e..270b648 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
@@ -86,7 +86,7 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
StructField field = fields.get(keyIndex);
Object value = list.get(keyIndex);
- byte[] key = keyFactory.serializeKey(value, field);
+ byte[] key = serializeKey(value, field);
if (key == null) {
throw new SerDeException("HBase row key cannot be NULL");
}
@@ -106,15 +106,17 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
return new PutWritable(put);
}
- byte[] serializeKeyField(Object keyValue, StructField keyField, ColumnMapping keyMapping)
- throws IOException {
+ byte[] serializeKey(Object keyValue, StructField keyField) throws IOException {
+ return keyFactory.serializeKey(keyValue, keyField);
+ }
+
+ byte[] serializeKeyField(Object keyValue, StructField keyField) throws IOException {
if (keyValue == null) {
throw new IOException("HBase row key cannot be NULL");
}
ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
- if (!keyFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE) &&
- keyMapping.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+ if (keyMapping.convertToJson(keyFieldOI)) {
// we always serialize the String type using the escaped algorithm for LazyString
return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
@@ -125,6 +127,21 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
return serialize(keyValue, keyFieldOI, 1, writeBinary);
}
+ byte[][] serializeField(int index, Object o, ObjectInspector oi, byte[][] holder)
+ throws IOException {
+ ColumnMapping colMap = columnMappings[index];
+ boolean writeBinary = colMap.binaryStorage.get(0);
+ if (colMap.convertToJson(oi)) {
+ o = SerDeUtils.getJSONString(o, oi);
+ oi = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+ writeBinary = false;
+ }
+ holder[0] = colMap.familyNameBytes;
+ holder[1] = colMap.qualifierNameBytes;
+ holder[2] = serialize(o, oi, 1, writeBinary);
+ return holder;
+ }
+
private void serializeField(
Object value, StructField field, ColumnMapping colMap, Put put) throws IOException {
if (value == null) {
@@ -168,8 +185,7 @@ private void serializeField(
// the field is declared as a primitive in initialization, serialize
// the data to JSON string. Otherwise serialize the data in the
// delimited way.
- if (!foi.getCategory().equals(ObjectInspector.Category.PRIMITIVE)
- && colMap.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+ if (colMap.convertToJson(foi)) {
// we always serialize the String type using the escaped algorithm for LazyString
bytes = serialize(SerDeUtils.getJSONString(value, foi),
PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 6c1ce5c..5afae78 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -205,6 +205,10 @@ public HBaseSerDeParameters getHBaseSerdeParam() {
return serdeParams;
}
+ public HBaseRowSerializer getSerializer() {
+ return serializer;
+ }
+
/**
* Deserialize a row from the HBase Result writable to a LazyObject
* @param result the HBase Result Writable containing the row
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java
new file mode 100644
index 0000000..360fbac
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputCommitter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.util.Progressable;
+
+public class HiveHFileExporter extends HFileOutputFormat implements
+ HiveOutputFormat<ImmutableBytesWritable, KeyValue>, HiveOutputCommitter {
+
+ static final Log LOG = LogFactory.getLog(HiveHFileExporter.class.getName());
+
+ private org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
+ getFileWriter(org.apache.hadoop.mapreduce.TaskAttemptContext tac)
+ throws IOException {
+ try {
+ return super.getRecordWriter(tac);
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public RecordWriter getHiveRecordWriter(
+ final JobConf jc,
+ final Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ final Progressable progressable) throws IOException {
+
+ final Job job = new Job(jc);
+
+ setCompressOutput(job, isCompressed);
+ setOutputPath(job, finalOutPath);
+
+ // Create the HFile writer
+ final org.apache.hadoop.mapreduce.TaskAttemptContext tac =
+ ShimLoader.getHadoopShims().newTaskAttemptContext(
+ job.getConfiguration(), progressable);
+
+ final JobContext jctx = ShimLoader.getHadoopShims().newJobContext(job);
+ jctx.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+ final org.apache.hadoop.mapreduce.RecordWriter<
+ ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac);
+
+ return new RecordWriter() {
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ try {
+ fileWriter.close(null);
+ if (!abort) {
+ FileOutputCommitter committer = new FileOutputCommitter(finalOutPath, tac);
+ committer.commitTask(tac);
+ committer.commitJob(jctx);
+ }
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ // Convert the handed-over union writable into an HFile KeyValue.
+ HBaseExportSerDe.HBaseUnionWritable handover = (HBaseExportSerDe.HBaseUnionWritable) w;
+ KeyValue kv = handover.toKeyValue();
+ try {
+ fileWriter.write(null, kv);
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+ };
+ }
+
+ private transient boolean prev;
+
+ public void commit(HiveConf conf, Path path) throws HiveException {
+ prev = HiveConf.getBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES);
+ Set<Path> created = new HashSet<Path>();
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ for (FileStatus status : fs.listStatus(path)) {
+ if (!status.isDir()) {
+ continue;
+ }
+ for (FileStatus family : fs.listStatus(status.getPath())) {
+ Path source = family.getPath();
+ Path target = new Path(path, source.getName());
+ if (created.add(target)) {
+ fs.mkdirs(target);
+ }
+ for (FileStatus region : fs.listStatus(source)) {
+ Path regionPath = region.getPath();
+ fs.rename(regionPath, new Path(target, regionPath.getName()));
+ }
+ }
+ fs.delete(status.getPath(), true);
+ }
+ HiveConf.setBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void completed(HiveConf conf) {
+ HiveConf.setBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, prev);
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf jc) throws IOException {
+ Job job = new Job(jc);
+ checkOutputSpecs(ShimLoader.getHadoopShims().newJobContext(job));
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordWriter getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+ throw new NotImplementedException("This will not be invoked");
+ }
+}
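
Note on commit() above: HFileOutputFormat writes one directory per task, each containing one subdirectory per column family, while HBase's bulk-load tooling expects the family directories directly under the table location. commit() hoists each family directory up one level and deletes the per-task directories. The layout below is a sketch with hypothetical directory names, not actual output of this patch:

    before commit():  /tmp/export/<per-task dir>/cf1/<hfile>
                      /tmp/export/<per-task dir>/cf2/<hfile>
    after commit():   /tmp/export/cf1/<hfile>
                      /tmp/export/cf2/<hfile>

commit() also enables HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, presumably so the subsequent move of the load path tolerates the family subdirectories, and completed() restores the previous value.
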
diff --git hbase-handler/src/test/queries/positive/hbase_bulk2.m hbase-handler/src/test/queries/positive/hbase_bulk2.m
new file mode 100644
index 0000000..4c52831
--- /dev/null
+++ hbase-handler/src/test/queries/positive/hbase_bulk2.m
@@ -0,0 +1,33 @@
+DROP table hbase_export;
+
+CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+ INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+LOCATION '/tmp/export';
+
+set mapred.reduce.tasks=4;
+set hive.optimize.sampling.orderby=true;
+
+INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values;
+
+dfs -count /tmp/export;
+dfs -lsr /tmp/export;
+
+-- QTestUtil masks something like this (validated by HFile.Writer itself)
+-- 3 8 40392 hdfs://localhost.localdomain:50049/tmp/export
+-- drwxr-xr-x - navis supergroup 0 2013-06-24 11:08 /tmp/export/cf1
+-- -rw-r--r-- 1 navis supergroup 4274 2013-06-24 11:08 /tmp/export/cf1/62839fa5a60d40aab210b4159fc94428
+-- -rw-r--r-- 1 navis supergroup 6324 2013-06-24 11:08 /tmp/export/cf1/961670eae78649caa777d1adce381802
+-- -rw-r--r-- 1 navis supergroup 4977 2013-06-24 11:08 /tmp/export/cf1/d618fd415c4a42d8b980e55ad0accdf1
+-- -rw-r--r-- 1 navis supergroup 4113 2013-06-24 11:08 /tmp/export/cf1/f90bbc7d0b3741689c0385692516c94d
+-- drwxr-xr-x - navis supergroup 0 2013-06-24 11:08 /tmp/export/cf2
+-- -rw-r--r-- 1 navis supergroup 4323 2013-06-24 11:08 /tmp/export/cf2/0b85ebc7f5974d81a2b321ac2a87569f
+-- -rw-r--r-- 1 navis supergroup 6732 2013-06-24 11:08 /tmp/export/cf2/38765a24c54c49e38a6033f4f2c951c1
+-- -rw-r--r-- 1 navis supergroup 5200 2013-06-24 11:08 /tmp/export/cf2/57a12a447137484b828165842c3da0f8
+-- -rw-r--r-- 1 navis supergroup 4449 2013-06-24 11:08 /tmp/export/cf2/c5471ba48d3d400e82f022c893e62c80
+
+desc formatted hbase_export;
diff --git hbase-handler/src/test/results/positive/hbase_bulk2.m.out hbase-handler/src/test/results/positive/hbase_bulk2.m.out
new file mode 100644
index 0000000..f97f7fe
--- /dev/null
+++ hbase-handler/src/test/results/positive/hbase_bulk2.m.out
@@ -0,0 +1,82 @@
+PREHOOK: query: DROP table hbase_export
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP table hbase_export
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+ INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+ INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_export
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_export
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_export
+POSTHOOK: Lineage: hbase_export.key SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbase_export.values SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+#### A masked pattern was here ####
+PREHOOK: query: -- QTestUtil masks something like this (validated by HFile.Writer itself)
+#### A masked pattern was here ####
+
+desc formatted hbase_export
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@hbase_export
+POSTHOOK: query: -- QTestUtil masks something like this (validated by HFile.Writer itself)
+#### A masked pattern was here ####
+
+desc formatted hbase_export
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@hbase_export
+# col_name data_type comment
+
+key string from deserializer
+values uniontype<string,string> from deserializer
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 0
+#### A masked pattern was here ####
+Table Type: EXTERNAL_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE true
+ EXTERNAL TRUE
+ numFiles 0
+ numRows 0
+ rawDataSize 0
+ totalSize 0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.hbase.HBaseExportSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.hbase.HiveHFileExporter
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ hbase.columns.mapping cf2:value,:key,cf1:key
+ serialization.format 1
diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 249956f..0d646c2 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -552,7 +552,7 @@
outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
templatePath="${basedir}/${hive.path.to.root}/hbase-handler/src/test/templates/" template="TestHBaseCliDriver.vm"
queryDirectory="${basedir}/${hive.path.to.root}/hbase-handler/src/test/queries/positive/"
- queryFile="hbase_bulk.m"
+ queryFile="hbase_bulk.m,hbase_bulk2.m"
runDisabled="${run_disabled}"
clusterMode="miniMR"
resultsDirectory="${basedir}/${hive.path.to.root}/hbase-handler/src/test/results/positive/" className="TestHBaseMinimrCliDriver"
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index a80feb9..69d5233 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -450,6 +450,8 @@
registerTableFunction(NOOP_STREAMING_MAP_TABLE_FUNCTION, NoopWithMapStreamingResolver.class);
registerTableFunction(WINDOWING_TABLE_FUNCTION, WindowingTableFunctionResolver.class);
registerTableFunction("matchpath", MatchPathResolver.class);
+
+ registerGenericUDTF("hfile_kv", HFileKeyValue.class);
}
public static void registerTemporaryUDF(String functionName,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java
new file mode 100644
index 0000000..c563302
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface HiveOutputCommitter {
+
+ void commit(HiveConf conf, Path path) throws HiveException;
+
+ void completed(HiveConf conf);
+}
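
The interface above is intentionally small: Hive.loadTable() and Hive.loadPartition() (changed later in this patch) instantiate the table's output format reflectively, call commit(conf, loadPath) before the written files are moved into the table location, and call completed(conf) once the load finishes. In practice the implementor is the table's HiveOutputFormat itself, as HiveHFileExporter shows above; the class below is only a minimal sketch of the contract, assuming nothing beyond the interface as defined here (the class name is hypothetical):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.HiveOutputCommitter;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class NoopOutputCommitter implements HiveOutputCommitter {
      @Override
      public void commit(HiveConf conf, Path path) throws HiveException {
        // rearrange or validate files under 'path' before Hive moves them
      }
      @Override
      public void completed(HiveConf conf) {
        // undo any configuration changes made in commit()
      }
    }
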
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fea1e47..cb3510e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,7 +58,6 @@
import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -94,6 +93,8 @@
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.io.HiveOutputCommitter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -1253,6 +1254,11 @@ public void loadPartition(Path loadPath, String tableName,
newPartPath = oldPartPath;
}
+ HiveOutputCommitter committer = getCommitter(tbl.getOutputFormatClass());
+ if (committer != null) {
+ committer.commit(conf, loadPath);
+ }
+
if (replace) {
Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf(),
isSrcLocal);
@@ -1279,6 +1285,9 @@ public void loadPartition(Path loadPath, String tableName,
newCreatedTpart = newTPart.getTPartition();
}
}
+ if (committer != null) {
+ committer.completed(conf);
+ }
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
@@ -1476,6 +1485,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
public void loadTable(Path loadPath, String tableName, boolean replace,
boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
Table tbl = getTable(tableName);
+ HiveOutputCommitter committer = getCommitter(tbl.getOutputFormatClass());
+ if (committer != null) {
+ committer.commit(conf, loadPath);
+ }
+
if (replace) {
tbl.replaceFiles(loadPath, isSrcLocal);
} else {
@@ -1503,6 +1517,21 @@ public void loadTable(Path loadPath, String tableName, boolean replace,
throw new HiveException(e);
}
}
+ if (committer != null) {
+ committer.completed(conf);
+ }
+ }
+
+ private HiveOutputCommitter getCommitter(Class<? extends HiveOutputFormat> output)
+ throws HiveException {
+ if (HiveOutputCommitter.class.isAssignableFrom(output)) {
+ try {
+ return (HiveOutputCommitter) output.newInstance();
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ return null;
}
/**
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java
new file mode 100644
index 0000000..a9af34a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+
+@Description(name = "hfile_kv",
+ value = "_FUNC_(a,b,c..,'hbase column mapping') - makes a key-value pair for each value")
+ public class HFileKeyValue extends GenericUDTF {
+
+ private int keyIndex;
+ private byte[] tags;
+ private final StandardUnion union = new StandardUnion();
+ private final Object[] forward = new Object[] { null, union };
+
+ @Override
+ public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+ if (argOIs.length < 3) {
+ throw new UDFArgumentException("hfile_union should have at least three arguments");
+ }
+ ObjectInspector mapOI = argOIs[argOIs.length - 1];
+ if (!(mapOI instanceof ConstantObjectInspector && mapOI instanceof StringObjectInspector)) {
+ throw new UDFArgumentException("The last argument should be a constant string");
+ }
+ String mapping = String.valueOf(((ConstantObjectInspector) mapOI).getWritableConstantValue());
+ String[] splits = mapping.split(",");
+ if (splits.length != argOIs.length - 1) {
+ throw new UDFArgumentException("The number of columns is not matching " +
+ "with column mapping information");
+ }
+ int numValues = splits.length - 1;
+
+ ColumnIndex[] valCols = new ColumnIndex[numValues];
+ ObjectInspector[] valOIs = new ObjectInspector[numValues];
+
+ byte tag = 0;
+ for (int i = 0; i < argOIs.length - 1; i++) {
+ int sharp = splits[i].indexOf('#');
+ if (sharp > 0) {
+ splits[i] = splits[i].substring(0, sharp);
+ }
+ if (splits[i].equals(":key")) {
+ keyIndex = i;
+ continue;
+ }
+ valOIs[tag] = argOIs[i];
+ valCols[tag] = new ColumnIndex(splits[i], tag++);
+ }
+ Arrays.sort(valCols);
+ byte[] tags = new byte[valCols.length];
+ for (int i = 0; i < tags.length; i++) {
+ tags[i] = valCols[i].tag;
+ }
+ this.tags = tags;
+
+ ObjectInspector unionOI = ObjectInspectorFactory.getStandardUnionObjectInspector(
+ Arrays.asList(valOIs));
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList("key", "values"), Arrays.asList(argOIs[keyIndex], unionOI));
+ }
+
+ @Override
+ public void process(Object[] args) throws HiveException {
+ forward[0] = args[keyIndex];
+ for (int i = 0; i < tags.length; i++) {
+ union.setObject(args[i]);
+ union.setTag(tags[i]);
+ forward(forward);
+ }
+ }
+
+ @Override
+ public void close() throws HiveException {
+ }
+
+ private static class ColumnIndex implements Comparable<ColumnIndex> {
+
+ private final String columnName;
+ private final byte tag;
+
+ public ColumnIndex(String columnName, byte tag) {
+ this.columnName = columnName;
+ this.tag = tag;
+ }
+
+ public int compareTo(ColumnIndex o) {
+ return columnName.compareTo(o.columnName);
+ }
+ }
+}
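
For reference, a stand-alone sketch of the tag ordering that initialize() sets up for the mapping used in hbase_bulk2.m: non-key columns receive union tags in argument order, and the ColumnIndex sort then fixes the order in which process() forwards one (key, union) row per value column. Class and variable names below are illustrative only:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class HFileKeyValueTagOrderDemo {
      public static void main(String[] args) {
        String[] splits = "cf2:value,:key,cf1:key".split(",");
        List<String> nonKey = new ArrayList<String>();
        for (String s : splits) {
          if (!s.equals(":key")) {
            nonKey.add(s); // tag = position here: cf2:value -> 0, cf1:key -> 1
          }
        }
        String[] emitOrder = nonKey.toArray(new String[0]);
        Arrays.sort(emitOrder); // rows are forwarded in this column order
        System.out.println(Arrays.toString(emitOrder)); // [cf1:key, cf2:value]
      }
    }
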
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
index cae4faa..5df1aad 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
@@ -104,6 +104,8 @@ public static LazyBinaryObject createLazyBinaryObject(ObjectInspector oi) {
return new LazyBinaryMap((LazyBinaryMapObjectInspector) oi);
case LIST:
return new LazyBinaryArray((LazyBinaryListObjectInspector) oi);
+ case UNION:
+ return new LazyBinaryUnion((LazyBinaryUnionObjectInspector) oi);
case STRUCT:
return new LazyBinaryStruct((LazyBinaryStructObjectInspector) oi);
}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
index 1c8f795..b56912e 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
@@ -27,12 +27,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.Decimal128;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -44,10 +41,11 @@
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -55,7 +53,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
@@ -544,6 +541,31 @@ public static void serialize(RandomAccessOutput byteStream, Object obj,
}
return;
}
+ case UNION: {
+ int byteSizeStart = 0;
+ int unionStart = 0;
+ if (!skipLengthPrefix) {
+ // 1/ reserve spaces for the byte size of the union,
+ // which is an integer and takes four bytes
+ byteSizeStart = byteStream.getLength();
+ byteStream.reserve(4);
+ unionStart = byteStream.getLength();
+ }
+ UnionObjectInspector uoi = (UnionObjectInspector) objInspector;
+ byte tag = uoi.getTag(obj);
+ ObjectInspector foi = uoi.getObjectInspectors().get(tag);
+
+ byteStream.write(tag);
+ serialize(byteStream, uoi.getField(obj), foi, false, warnedOnceNullMapKey);
+
+ if (!skipLengthPrefix) {
+ // 2/ update the byte size of the union
+ int unionEnd = byteStream.getLength();
+ int unionSize = unionEnd - unionStart;
+ writeSizeAtOffset(byteStream, byteSizeStart, unionSize);
+ }
+ return;
+ }
case STRUCT: {
int byteSizeStart = 0;
int structStart = 0;
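
The UNION case above mirrors the STRUCT case: an optional 4-byte size prefix, then a single tag byte, then the field serialized with the inspector selected by that tag. A hypothetical helper (not part of the patch) that reads the tag of a length-prefixed union illustrates the layout LazyBinaryUnion relies on below:

    // Layout when skipLengthPrefix is false: [4-byte size][1-byte tag][field bytes]
    public final class LazyBinaryUnionLayoutDemo {
      // 'offset' points at the 4-byte size prefix of the serialized union.
      public static byte unionTagOf(byte[] data, int offset) {
        return data[offset + 4]; // the tag byte immediately follows the size
      }
      private LazyBinaryUnionLayoutDemo() {}
    }
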
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java
new file mode 100644
index 0000000..704a802
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazybinary;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public class LazyBinaryUnion extends LazyBinaryNonPrimitive<LazyBinaryUnionObjectInspector> {
+
+ private byte tag = -1;
+ private boolean initialized;
+ private LazyBinaryObject field;
+
+ public LazyBinaryUnion(LazyBinaryUnionObjectInspector oi) {
+ super(oi);
+ }
+
+ @Override
+ public void init(ByteArrayRef bytes, int start, int length) {
+ super.init(bytes, start, length);
+ initialized = false;
+ tag = -1;
+ }
+
+ public byte getTag() {
+ if (tag == -1) {
+ parse();
+ }
+ return tag;
+ }
+
+ public Object getField() {
+ if (tag == -1) {
+ parse();
+ }
+ if (!initialized && field != null) {
+ field.init(bytes, start + 1, length - 1);
+ }
+ initialized = true;
+ return field == null ? null : field.getObject();
+ }
+
+ private void parse() {
+ byte[] data = bytes.getData();
+ ObjectInspector fieldOI = oi.getObjectInspectors().get(tag = data[start]);
+ field = LazyBinaryFactory.createLazyBinaryObject(fieldOI);
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java
new file mode 100644
index 0000000..d032823
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazybinary;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+
+public class LazyBinaryUnionObjectInspector extends StandardUnionObjectInspector {
+ public LazyBinaryUnionObjectInspector(List<ObjectInspector> ois) {
+ super(ois);
+ }
+
+ @Override
+ public byte getTag(Object o) {
+ return o == null ? -1 : ((LazyBinaryUnion) o).getTag();
+ }
+
+ @Override
+ public Object getField(Object o) {
+ return o == null ? null : ((LazyBinaryUnion) o).getField();
+ }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
index f7cfb36..fd6320a 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -234,6 +235,7 @@ public static void checkObjectByteInfo(ObjectInspector objectInspector,
case LIST:
case MAP:
case STRUCT:
+ case UNION:
recordInfo.elementOffset = 4;
recordInfo.elementSize = LazyBinaryUtils.byteArrayToInt(bytes, offset);
break;
@@ -465,6 +467,19 @@ public static ObjectInspector getLazyBinaryObjectInspectorFromTypeInfo(
valueObjectInspector);
break;
}
+ case UNION: {
+ UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ List<TypeInfo> types = unionTypeInfo.getAllUnionObjectTypeInfos();
+ List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(types.size());
+ for (int i = 0; i < types.size(); i++) {
+ fieldObjectInspectors
+ .add(getLazyBinaryObjectInspectorFromTypeInfo(types
+ .get(i)));
+ }
+ result = LazyBinaryObjectInspectorFactory.
+ getLazyBinaryUnionObjectInspector(fieldObjectInspectors);
+ break;
+ }
case STRUCT: {
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List fieldNames = structTypeInfo.getAllStructFieldNames();
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
index b3ec24d..802ef2c 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
@@ -101,6 +102,22 @@ public static LazyBinaryMapObjectInspector getLazyBinaryMapObjectInspector(
return result;
}
+ static ConcurrentHashMap<List<ObjectInspector>, LazyBinaryUnionObjectInspector>
+ cachedLazyBinaryUnionObjectInspector =
+ new ConcurrentHashMap<List<ObjectInspector>, LazyBinaryUnionObjectInspector>();
+
+ public static LazyBinaryUnionObjectInspector getLazyBinaryUnionObjectInspector(
+ List<ObjectInspector> elementsOI) {
+ ArrayList