diff --git hbase-handler/build.xml hbase-handler/build.xml
new file mode 100644
index 0000000..0980db2
--- /dev/null
+++ hbase-handler/build.xml
@@ -0,0 +1,82 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
index 9cae5d3..2468978 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
@@ -305,8 +305,9 @@ public int getKeyIndex() {
     String qualifierPrefix;
     byte[] qualifierPrefixBytes;
 
-    public boolean isCategory(ObjectInspector.Category category) {
-      return columnType.getCategory() == category;
+    public boolean convertToJson(ObjectInspector input) {
+      return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+          input.getCategory() != ObjectInspector.Category.PRIMITIVE;
     }
   }
 }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
index 5731e45..5f4761a 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
@@ -54,6 +54,6 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException
 
   @Override
   public byte[] serializeKey(Object object, StructField field) throws IOException {
-    return serializer.serializeKeyField(object, field, keyMapping);
+    return serializer.serializeKeyField(object, field);
   }
 }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java
new file mode 100644
index 0000000..160a7e9
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseExportSerDe.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+public class HBaseExportSerDe extends AbstractSerDe {
+
+  private HBaseSerDe serde;
+  private HBaseSerDeParameters serdeParam;
+  private HBaseUnionWritable dummy;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    serde = new HBaseSerDe();
+    serde.initialize(conf, tbl);
+    serdeParam = serde.getHBaseSerdeParam();
+    dummy = new HBaseUnionWritable(serde.getSerializer(), serdeParam.getKeyIndex());
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return HBaseUnionWritable.class;
+  }
+
+  StructObjectInspector inputOI;
+  StructField[] fields;
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    if (inputOI == null) {
+      inputOI = (StructObjectInspector) objInspector;
+      fields = inputOI.getAllStructFieldRefs().toArray(new StructField[0]);
+      dummy.rowKeyField = fields[0];
+      dummy.unionOI = (UnionObjectInspector) fields[1].getFieldObjectInspector();
+    }
+    dummy.rowKey = inputOI.getStructFieldData(obj, fields[0]);
+    dummy.union = inputOI.getStructFieldData(obj, fields[1]);
+    return dummy;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return serde.getSerDeStats();
+  }
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    return serde.deserialize(blob);
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    int iKey = serdeParam.getKeyIndex();
+    StructObjectInspector output = (StructObjectInspector) serde.getObjectInspector();
+    List<? extends StructField> fields = output.getAllStructFieldRefs();
+    List<ObjectInspector> unionOIs = new ArrayList<ObjectInspector>();
+    for (int i = 0; i < fields.size(); i++) {
+      if (i != iKey) {
+        unionOIs.add(fields.get(i).getFieldObjectInspector());
+      }
+    }
+    ObjectInspector unionOI = ObjectInspectorFactory.getStandardUnionObjectInspector(unionOIs);
+    return ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList("key", "values"),
+        Arrays.asList(fields.get(iKey).getFieldObjectInspector(), unionOI));
+
+  }
+
+  public static class HBaseUnionWritable implements Writable {
+
+    private Object rowKey;
+    private StructField rowKeyField;
+
+    private Object union;
+    private UnionObjectInspector unionOI;
+
+    private final HBaseRowSerializer serializer;
+    private final int iKey;
+    private final byte[][] holder = new byte[3][];   // family | qualifier | data
+
+    public HBaseUnionWritable(HBaseRowSerializer serializer, int iKey) {
+      this.serializer = serializer;
+      this.iKey = iKey;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      throw new UnsupportedOperationException("write");
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      throw new UnsupportedOperationException("readFields");
+    }
+
+    public Object getField() {
+      return unionOI.getField(union);
+    }
+
+    public byte getTag() {
+      return unionOI.getTag(union);
+    }
+
+    private int getColIndex() {
+      byte tag = getTag();
+      return tag < iKey ? tag : tag + 1;
+    }
+
+    public KeyValue toKeyValue() throws IOException {
+      byte[] key = serializer.serializeKey(rowKey, rowKeyField);
+      ObjectInspector fieldOI = unionOI.getObjectInspectors().get(getTag());
+      serializer.serializeField(getColIndex(), getField(), fieldOI, holder);
+      return new KeyValue(key, holder[0], holder[1], holder[2]);
+    }
+  }
+}
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
index fe6081e..270b648 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
@@ -86,7 +86,7 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
     StructField field = fields.get(keyIndex);
     Object value = list.get(keyIndex);
 
-    byte[] key = keyFactory.serializeKey(value, field);
+    byte[] key = serializeKey(value, field);
     if (key == null) {
       throw new SerDeException("HBase row key cannot be NULL");
     }
@@ -106,15 +106,17 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
     return new PutWritable(put);
   }
 
-  byte[] serializeKeyField(Object keyValue, StructField keyField, ColumnMapping keyMapping)
-      throws IOException {
+  byte[] serializeKey(Object keyValue, StructField keyField) throws IOException {
+    return keyFactory.serializeKey(keyValue, keyField);
+  }
+
+  byte[] serializeKeyField(Object keyValue, StructField keyField) throws IOException {
     if (keyValue == null) {
       throw new IOException("HBase row key cannot be NULL");
     }
     ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
 
-    if (!keyFieldOI.getCategory().equals(ObjectInspector.Category.PRIMITIVE) &&
-        keyMapping.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+    if (keyMapping.convertToJson(keyFieldOI)) {
       // we always serialize the String type using the escaped algorithm for LazyString
       return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
           PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
@@ -125,6 +127,21 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep
     return serialize(keyValue, keyFieldOI, 1, writeBinary);
   }
 
+  byte[][] serializeField(int index, Object o, ObjectInspector oi, byte[][] holder)
+      throws IOException {
+    ColumnMapping colMap = columnMappings[index];
+    boolean writeBinary = colMap.binaryStorage.get(0);
+    if (colMap.convertToJson(oi)) {
+      o = SerDeUtils.getJSONString(o, oi);
+      oi = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+      writeBinary = false;
+    }
+    holder[0] = colMap.familyNameBytes;
+    holder[1] = colMap.qualifierNameBytes;
+    holder[2] = serialize(o, oi, 1, writeBinary);
+    return holder;
+  }
+
   private void serializeField(
       Object value, StructField field, ColumnMapping colMap, Put put) throws IOException {
     if (value == null) {
@@ -168,8 +185,7 @@ private void serializeField(
       // the field is declared as a primitive in initialization, serialize
       // the data to JSON string. Otherwise serialize the data in the
       // delimited way.
-      if (!foi.getCategory().equals(ObjectInspector.Category.PRIMITIVE)
-          && colMap.isCategory(ObjectInspector.Category.PRIMITIVE)) {
+      if (colMap.convertToJson(foi)) {
         // we always serialize the String type using the escaped algorithm for LazyString
         bytes = serialize(SerDeUtils.getJSONString(value, foi),
             PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false);
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
index 6c1ce5c..5afae78 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
@@ -205,6 +205,10 @@ public HBaseSerDeParameters getHBaseSerdeParam() {
     return serdeParams;
   }
 
+  public HBaseRowSerializer getSerializer() {
+    return serializer;
+  }
+
   /**
    * Deserialize a row from the HBase Result writable to a LazyObject
    * @param result the HBase Result Writable containing the row
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java
new file mode 100644
index 0000000..360fbac
--- /dev/null
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileExporter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputCommitter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.util.Progressable;
+
+public class HiveHFileExporter extends HFileOutputFormat implements
+    HiveOutputFormat<ImmutableBytesWritable, KeyValue>, HiveOutputCommitter {
+
+  static final Log LOG = LogFactory.getLog(HiveHFileExporter.class.getName());
+
+  private org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
+      getFileWriter(org.apache.hadoop.mapreduce.TaskAttemptContext tac)
+      throws IOException {
+    try {
+      return super.getRecordWriter(tac);
+    } catch (InterruptedException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public RecordWriter getHiveRecordWriter(
+      final JobConf jc,
+      final Path finalOutPath,
+      Class<? extends Writable> valueClass,
+      boolean isCompressed,
+      Properties tableProperties,
+      final Progressable progressable) throws IOException {
+
+    final Job job = new Job(jc);
+
+    setCompressOutput(job, isCompressed);
+    setOutputPath(job, finalOutPath);
+
+    // Create the HFile writer
+    final org.apache.hadoop.mapreduce.TaskAttemptContext tac =
+        ShimLoader.getHadoopShims().newTaskAttemptContext(
+            job.getConfiguration(), progressable);
+
+    final JobContext jctx = ShimLoader.getHadoopShims().newJobContext(job);
+    jctx.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    final org.apache.hadoop.mapreduce.RecordWriter<
+        ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac);
+
+    return new RecordWriter() {
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        try {
+          fileWriter.close(null);
+          if (!abort) {
+            FileOutputCommitter committer = new FileOutputCommitter(finalOutPath, tac);
+            committer.commitTask(tac);
+            committer.commitJob(jctx);
+          }
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        // Decompose the incoming text row into fields.
+        HBaseExportSerDe.HBaseUnionWritable handover = (HBaseExportSerDe.HBaseUnionWritable) w;
+        KeyValue kv = handover.toKeyValue();
+        try {
+          fileWriter.write(null, kv);
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
+      }
+    };
+  }
+
+  private transient boolean prev;
+
+  public void commit(HiveConf conf, Path path) throws HiveException {
+    prev = HiveConf.getBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES);
+    Set<Path> created = new HashSet<Path>();
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      for (FileStatus status : fs.listStatus(path)) {
+        if (!status.isDir()) {
+          continue;
+        }
+        for (FileStatus family : fs.listStatus(status.getPath())) {
+          Path source = family.getPath();
+          Path target = new Path(path, source.getName());
+          if (created.add(target)) {
+            fs.mkdirs(target);
+          }
+          for (FileStatus region : fs.listStatus(source)) {
+            Path regionPath = region.getPath();
+            fs.rename(regionPath, new Path(target, regionPath.getName()));
+          }
+        }
+        fs.delete(status.getPath(), true);
+      }
+      HiveConf.setBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void completed(HiveConf conf) {
+    HiveConf.setBoolVar(conf, HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, prev);
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf jc) throws IOException {
+    Job job = new Job(jc);
+    checkOutputSpecs(ShimLoader.getHadoopShims().newJobContext(job));
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordWriter getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+    throw new NotImplementedException("This will not be invoked");
+  }
+}
diff --git hbase-handler/src/test/queries/positive/hbase_bulk2.m hbase-handler/src/test/queries/positive/hbase_bulk2.m
new file mode 100644
index 0000000..4c52831
--- /dev/null
+++ hbase-handler/src/test/queries/positive/hbase_bulk2.m
@@ -0,0 +1,33 @@
+DROP table hbase_export;
+
+CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+  OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+LOCATION '/tmp/export';
+
+set mapred.reduce.tasks=4;
+set hive.optimize.sampling.orderby=true;
+
+INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values;
+
+dfs -count /tmp/export;
+dfs -lsr /tmp/export;
+
+-- QTestUtil masks something like this (validated by HFile.Writer itself)
+--            3            8              40392 hdfs://localhost.localdomain:50049/tmp/export
+-- drwxr-xr-x   - navis supergroup          0 2013-06-24 11:08 /tmp/export/cf1
+-- -rw-r--r--   1 navis supergroup       4274 2013-06-24 11:08 /tmp/export/cf1/62839fa5a60d40aab210b4159fc94428
+-- -rw-r--r--   1 navis supergroup       6324 2013-06-24 11:08 /tmp/export/cf1/961670eae78649caa777d1adce381802
+-- -rw-r--r--   1 navis supergroup       4977 2013-06-24 11:08 /tmp/export/cf1/d618fd415c4a42d8b980e55ad0accdf1
+-- -rw-r--r--   1 navis supergroup       4113 2013-06-24 11:08 /tmp/export/cf1/f90bbc7d0b3741689c0385692516c94d
+-- drwxr-xr-x   - navis supergroup          0 2013-06-24 11:08 /tmp/export/cf2
+-- -rw-r--r--   1 navis supergroup       4323 2013-06-24 11:08 /tmp/export/cf2/0b85ebc7f5974d81a2b321ac2a87569f
+-- -rw-r--r--   1 navis supergroup       6732 2013-06-24 11:08 /tmp/export/cf2/38765a24c54c49e38a6033f4f2c951c1
+-- -rw-r--r--   1 navis supergroup       5200 2013-06-24 11:08 /tmp/export/cf2/57a12a447137484b828165842c3da0f8
+-- -rw-r--r--   1 navis supergroup       4449 2013-06-24 11:08 /tmp/export/cf2/c5471ba48d3d400e82f022c893e62c80
+
+desc formatted hbase_export;
diff --git hbase-handler/src/test/results/positive/hbase_bulk2.m.out hbase-handler/src/test/results/positive/hbase_bulk2.m.out
new file mode 100644
index 0000000..f97f7fe
--- /dev/null
+++ hbase-handler/src/test/results/positive/hbase_bulk2.m.out
@@ -0,0 +1,82 @@
+PREHOOK: query: DROP table hbase_export
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP table hbase_export
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+  OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE EXTERNAL TABLE hbase_export(rowkey STRING, col1 STRING, col2 STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseExportSerDe'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf2:value,:key,cf1:key")
+STORED AS
+  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+  OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileExporter'
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_export
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_export
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_export
+SELECT * from (SELECT hfile_kv(key,key,value,"cf2:value,:key,cf1:key") as (key,values) FROM src) A ORDER BY key,values
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_export
+POSTHOOK: Lineage: hbase_export.key SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbase_export.values SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+#### A masked pattern was here ####
+PREHOOK: query: -- QTestUtil masks something like this (validated by HFile.Writer itself)
+#### A masked pattern was here ####
+
+desc formatted hbase_export
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@hbase_export
+POSTHOOK: query: -- QTestUtil masks something like this (validated by HFile.Writer itself)
+#### A masked pattern was here ####
+
+desc formatted hbase_export
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@hbase_export
+# col_name            	data_type           	comment
+
+key                 	string              	from deserializer
+values              	uniontype<string,string>	from deserializer
+
+# Detailed Table Information
+Database:           	default
+#### A masked pattern was here ####
+Protect Mode:       	None
+Retention:          	0
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE
+Table Parameters:
+	COLUMN_STATS_ACCURATE	true
+	EXTERNAL            	TRUE
+	numFiles            	0
+	numRows             	0
+	rawDataSize         	0
+	totalSize           	0
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseExportSerDe
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:       	org.apache.hadoop.hive.hbase.HiveHFileExporter
+Compressed:         	No
+Num Buckets:        	-1
+Bucket Columns:     	[]
+Sort Columns:       	[]
+Storage Desc Params:
+	hbase.columns.mapping	cf2:value,:key,cf1:key
+	serialization.format	1
diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 249956f..0d646c2 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -552,7 +552,7 @@
                   outputDirectory="${project.build.directory}/generated-test-sources/java/org/apache/hadoop/hive/cli/"
                   templatePath="${basedir}/${hive.path.to.root}/hbase-handler/src/test/templates/" template="TestHBaseCliDriver.vm"
                   queryDirectory="${basedir}/${hive.path.to.root}/hbase-handler/src/test/queries/positive/"
-                  queryFile="hbase_bulk.m"
+                  queryFile="hbase_bulk.m,hbase_bulk2.m"
                   runDisabled="${run_disabled}" clusterMode="miniMR"
                   resultsDirectory="${basedir}/${hive.path.to.root}/hbase-handler/src/test/results/positive/" className="TestHBaseMinimrCliDriver"
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index a80feb9..69d5233 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -450,6 +450,8 @@
     registerTableFunction(NOOP_STREAMING_MAP_TABLE_FUNCTION, NoopWithMapStreamingResolver.class);
     registerTableFunction(WINDOWING_TABLE_FUNCTION, WindowingTableFunctionResolver.class);
     registerTableFunction("matchpath", MatchPathResolver.class);
+
+    registerGenericUDTF("hfile_kv", HFileKeyValue.class);
   }
 
   public static void registerTemporaryUDF(String functionName,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java
new file mode 100644
index 0000000..c563302
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputCommitter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface HiveOutputCommitter {
+
+  void commit(HiveConf conf, Path path) throws HiveException;
+
+  void completed(HiveConf conf);
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fea1e47..cb3510e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,7 +58,6 @@
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -94,6 +93,8 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.io.HiveOutputCommitter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -1253,6 +1254,11 @@ public void loadPartition(Path loadPath, String tableName,
       newPartPath = oldPartPath;
     }
 
+    HiveOutputCommitter committer = getCommitter(tbl.getOutputFormatClass());
+    if (committer != null) {
+      committer.commit(conf, loadPath);
+    }
+
     if (replace) {
       Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf(),
           isSrcLocal);
@@ -1279,6 +1285,9 @@ public void loadPartition(Path loadPath, String tableName,
           newCreatedTpart = newTPart.getTPartition();
         }
       }
+      if (committer != null) {
+        committer.completed(conf);
+      }
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
@@ -1476,6 +1485,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
   public void loadTable(Path loadPath, String tableName, boolean replace, boolean holdDDLTime,
       boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
     Table tbl = getTable(tableName);
+    HiveOutputCommitter committer = getCommitter(tbl.getOutputFormatClass());
+    if (committer != null) {
+      committer.commit(conf, loadPath);
+    }
+
     if (replace) {
       tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
@@ -1503,6 +1517,21 @@ public void loadTable(Path loadPath, String tableName, boolean replace,
       throw new HiveException(e);
     }
   }
+    if (committer != null) {
+      committer.completed(conf);
+    }
+  }
+
+  private HiveOutputCommitter getCommitter(Class output)
+      throws HiveException {
+    if (HiveOutputCommitter.class.isAssignableFrom(output)) {
+      try {
+        return (HiveOutputCommitter) output.newInstance();
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+    return null;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java
new file mode 100644
index 0000000..a9af34a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/HFileKeyValue.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+
+@Description(name = "hfile_kv",
+    value = "_FUNC_(a,b,c..,'hbase column mapping') - makes a key-value pair for each value")
+public class HFileKeyValue extends GenericUDTF {
+
+  private int keyIndex;
+  private byte[] tags;
+  private final StandardUnion union = new StandardUnion();
+  private final Object[] forward = new Object[] { null, union };
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+    if (argOIs.length < 3) {
+      throw new UDFArgumentException("hfile_kv should have at least three arguments");
+    }
+    ObjectInspector mapOI = argOIs[argOIs.length - 1];
+    if (!(mapOI instanceof ConstantObjectInspector && mapOI instanceof StringObjectInspector)) {
+      throw new UDFArgumentException("The last argument should be a constant string");
+    }
+    String mapping = String.valueOf(((ConstantObjectInspector) mapOI).getWritableConstantValue());
+    String[] splits = mapping.split(",");
+    if (splits.length != argOIs.length - 1) {
+      throw new UDFArgumentException("The number of columns is not matching " +
+          "with column mapping information");
+    }
+    int numValues = splits.length - 1;
+
+    ColumnIndex[] valCols = new ColumnIndex[numValues];
+    ObjectInspector[] valOIs = new ObjectInspector[numValues];
+
+    byte tag = 0;
+    for (int i = 0; i < argOIs.length - 1; i++) {
+      int sharp = splits[i].indexOf('#');
+      if (sharp > 0) {
+        splits[i] = splits[i].substring(0, sharp);
+      }
+      if (splits[i].equals(":key")) {
+        keyIndex = i;
+        continue;
+      }
+      valOIs[tag] = argOIs[i];
+      valCols[tag] = new ColumnIndex(splits[i], tag++);
+    }
+    Arrays.sort(valCols);
+    byte[] tags = new byte[valCols.length];
+    for (int i = 0; i < tags.length; i++) {
+      tags[i] = valCols[i].tag;
+    }
+    this.tags = tags;
+
+    ObjectInspector unionOI = ObjectInspectorFactory.getStandardUnionObjectInspector(
+        Arrays.asList(valOIs));
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList("key", "values"), Arrays.asList(argOIs[keyIndex], unionOI));
+  }
+
+  @Override
+  public void process(Object[] args) throws HiveException {
+    forward[0] = args[keyIndex];
+    for (int i = 0; i < tags.length; i++) {
+      union.setObject(args[i]);
+      union.setTag(tags[i]);
+      forward(forward);
+    }
+  }
+
+  @Override
+  public void close() throws HiveException {
+  }
+
+  private static class ColumnIndex implements Comparable<ColumnIndex> {
+
+    private final String columnName;
+    private final byte tag;
+
+    public ColumnIndex(String columnName, byte tag) {
+      this.columnName = columnName;
+      this.tag = tag;
+    }
+
+    public int compareTo(ColumnIndex o) {
+      return columnName.compareTo(o.columnName);
+    }
+  }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
index cae4faa..5df1aad 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
@@ -104,6 +104,8 @@ public static LazyBinaryObject createLazyBinaryObject(ObjectInspector oi) {
       return new LazyBinaryMap((LazyBinaryMapObjectInspector) oi);
     case LIST:
       return new LazyBinaryArray((LazyBinaryListObjectInspector) oi);
+    case UNION:
+      return new LazyBinaryUnion((LazyBinaryUnionObjectInspector) oi);
     case STRUCT:
       return new LazyBinaryStruct((LazyBinaryStructObjectInspector) oi);
     }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
index 1c8f795..b56912e 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
@@ -27,12 +27,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.Decimal128;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -44,10 +41,11 @@
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -55,7 +53,6 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
@@ -544,6 +541,31 @@ public static void serialize(RandomAccessOutput byteStream, Object obj,
       }
       return;
     }
+    case UNION: {
+      int byteSizeStart = 0;
+      int unionStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve spaces for the byte size of the union
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        unionStart = byteStream.getLength();
+      }
+      UnionObjectInspector uoi = (UnionObjectInspector) objInspector;
+      byte tag = uoi.getTag(obj);
+      ObjectInspector foi = uoi.getObjectInspectors().get(tag);
+
+      byteStream.write(tag);
+      serialize(byteStream, uoi.getField(obj), foi, false, warnedOnceNullMapKey);
+
+      if (!skipLengthPrefix) {
+        // 5/ update the byte size of the union
+        int unionEnd = byteStream.getLength();
+        int unionSize = unionEnd - unionStart;
+        writeSizeAtOffset(byteStream, byteSizeStart, unionSize);
+      }
+      return;
+    }
     case STRUCT: {
       int byteSizeStart = 0;
       int structStart = 0;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java
new file mode 100644
index 0000000..704a802
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnion.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazybinary;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public class LazyBinaryUnion extends LazyBinaryNonPrimitive<LazyBinaryUnionObjectInspector> {
+
+  private byte tag = -1;
+  private boolean initialized;
+  private LazyBinaryObject field;
+
+  public LazyBinaryUnion(LazyBinaryUnionObjectInspector oi) {
+    super(oi);
+  }
+
+  @Override
+  public void init(ByteArrayRef bytes, int start, int length) {
+    super.init(bytes, start, length);
+    initialized = false;
+    tag = -1;
+  }
+
+  public byte getTag() {
+    if (tag == -1) {
+      parse();
+    }
+    return tag;
+  }
+
+  public Object getField() {
+    if (tag == -1) {
+      parse();
+    }
+    if (!initialized && field != null) {
+      field.init(bytes, start + 1, length - 1);
+    }
+    initialized = true;
+    return field == null ? null : field.getObject();
+  }
+
+  private void parse() {
+    byte[] data = bytes.getData();
+    ObjectInspector fieldOI = oi.getObjectInspectors().get(tag = data[start]);
+    field = LazyBinaryFactory.createLazyBinaryObject(fieldOI);
+  }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java
new file mode 100644
index 0000000..d032823
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUnionObjectInspector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazybinary;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+
+public class LazyBinaryUnionObjectInspector extends StandardUnionObjectInspector {
+
+  public LazyBinaryUnionObjectInspector(List<ObjectInspector> ois) {
+    super(ois);
+  }
+
+  @Override
+  public byte getTag(Object o) {
+    return o == null ? -1 : ((LazyBinaryUnion) o).getTag();
+  }
+
+  @Override
+  public Object getField(Object o) {
+    return o == null ? null : ((LazyBinaryUnion) o).getField();
+  }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
index f7cfb36..fd6320a 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -234,6 +235,7 @@ public static void checkObjectByteInfo(ObjectInspector objectInspector,
     case LIST:
     case MAP:
     case STRUCT:
+    case UNION:
       recordInfo.elementOffset = 4;
       recordInfo.elementSize = LazyBinaryUtils.byteArrayToInt(bytes, offset);
       break;
@@ -465,6 +467,19 @@ public static ObjectInspector getLazyBinaryObjectInspectorFromTypeInfo(
           valueObjectInspector);
       break;
     }
+    case UNION: {
+      UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+      List<TypeInfo> types = unionTypeInfo.getAllUnionObjectTypeInfos();
+      List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(types.size());
+      for (int i = 0; i < types.size(); i++) {
+        fieldObjectInspectors
+            .add(getLazyBinaryObjectInspectorFromTypeInfo(types
+                .get(i)));
+      }
+      result = LazyBinaryObjectInspectorFactory.
+          getLazyBinaryUnionObjectInspector(fieldObjectInspectors);
+      break;
+    }
     case STRUCT: {
       StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
       List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
index b3ec24d..802ef2c 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryObjectInspectorFactory.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 /**
@@ -101,6 +102,22 @@ public static LazyBinaryMapObjectInspector getLazyBinaryMapObjectInspector(
     return result;
   }
 
+  static ConcurrentHashMap<ArrayList<Object>, LazyBinaryUnionObjectInspector>
+      cachedLazyBinaryUnionObjectInspector =
+      new ConcurrentHashMap<ArrayList<Object>, LazyBinaryUnionObjectInspector>();
+
+  public static LazyBinaryUnionObjectInspector getLazyBinaryUnionObjectInspector(
+      List<ObjectInspector> elementsOI) {
+    ArrayList<Object> signature = new ArrayList<Object>();
+    signature.add(elementsOI);
+    LazyBinaryUnionObjectInspector result = cachedLazyBinaryUnionObjectInspector.get(signature);
+    if (result == null) {
+      result = new LazyBinaryUnionObjectInspector(elementsOI);
+      cachedLazyBinaryUnionObjectInspector.put(signature, result);
+    }
+    return result;
+  }
+
   private LazyBinaryObjectInspectorFactory() {
     // prevent instantiation
   }
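For reference, the byte layout written by the new UNION branch in LazyBinarySerDe.serialize() and read back through LazyBinaryUtils.checkObjectByteInfo() and LazyBinaryUnion is: a 4-byte size prefix (unless the length prefix is skipped), then a 1-byte union tag, then the tagged field serialized with its own object inspector; the size counts the tag byte plus the field bytes. A minimal sketch of walking that layout follows; the class and method names below are illustrative only and are not part of this patch.

// Illustrative sketch, not part of the patch: mirrors the layout produced by the
// UNION case in LazyBinarySerDe -- [4-byte size][1-byte tag][field bytes].
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;

public final class UnionLayoutSketch {

  // The union tag sits immediately after the 4-byte size prefix.
  public static byte readTag(byte[] bytes, int offset) {
    return bytes[offset + 4];
  }

  // Length of the serialized field alone; the size prefix also counts the tag byte.
  public static int fieldLength(byte[] bytes, int offset) {
    return LazyBinaryUtils.byteArrayToInt(bytes, offset) - 1;
  }
}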