From 85020ea8e7e5fb06f6301f9c5bef806d0f87688b Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 15 Jul 2014 16:57:29 -0700 Subject: [PATCH] HIVE-6584 Add HiveHBaseTableSnapshotInputFormat --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 + conf/hive-default.xml.template | 10 + .../hadoop/hive/hbase/HBaseStorageHandler.java | 44 +++ .../hive/hbase/HiveHBaseInputFormatUtil.java | 156 +++++++++ .../hive/hbase/HiveHBaseTableInputFormat.java | 115 +------ .../hbase/HiveHBaseTableSnapshotInputFormat.java | 97 ++++++ .../test/queries/positive/hbase_handler_snapshot.q | 15 + .../test/results/positive/external_table_ppd.q.out | 4 +- .../positive/hbase_binary_map_queries.q.out | 20 +- .../positive/hbase_binary_map_queries_prefix.q.out | 20 +- .../positive/hbase_binary_storage_queries.q.out | 8 +- .../results/positive/hbase_handler_snapshot.q.out | 373 +++++++++++++++++++++ .../hbase_single_sourced_multi_insert.q.out | 18 +- .../positive/hbase_stats_empty_partition.q.out | 4 +- .../src/test/templates/TestHBaseCliDriver.vm | 1 - .../apache/hadoop/hive/hbase/HBaseQTestUtil.java | 88 ++++- .../apache/hadoop/hive/hbase/HBaseTestSetup.java | 82 +++-- .../java/org/apache/hadoop/hive/ql/QTestUtil.java | 12 +- pom.xml | 4 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 12 +- 20 files changed, 900 insertions(+), 186 deletions(-) create mode 100644 hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java create mode 100644 hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java create mode 100644 hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q create mode 100644 hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index eb3160e..85e165e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1241,6 +1241,9 @@ "Disabling this improves HBase write performance at the risk of lost writes in case of a crash."), HIVE_HBASE_GENERATE_HFILES("hive.hbase.generatehfiles", false, "True when HBaseStorageHandler should generate hfiles instead of operate against the online table."), + HIVE_HBASE_SNAPSHOT_NAME("hive.hbase.snapshot.name", null, "The HBase table snapshot name to use."), + HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", null, "The directory in which to " + + "restore the HBase table snapshot."), // For har files HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"), diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index ba5b8a9..cc2d4bb 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -2200,6 +2200,16 @@ True when HBaseStorageHandler should generate hfiles instead of operate against the online table. + hive.hbase.snapshot.name + + The HBase table snapshot name to use. + + + hive.hbase.snapshot.restoredir + + The directory in which to restore the HBase table snapshot. 
+ + hive.archive.enabled false Whether archiving operations are permitted diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index dbf5e51..8d90d20 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -29,7 +29,10 @@ import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.mapred.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; @@ -69,6 +73,8 @@ public class HBaseStorageHandler extends DefaultStorageHandler implements HiveMetaHook, HiveStoragePredicateHandler { + private static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class); + final static public String DEFAULT_PREFIX = "default."; //Check if the configure job properties is called from input @@ -258,6 +264,11 @@ public void setConf(Configuration conf) { @Override public Class getInputFormatClass() { + if (getInputSnapshotName(jobConf) != null) { + LOG.debug("Using TableSnapshotInputFormat"); + return HiveHBaseTableSnapshotInputFormat.class; + } + LOG.debug("Using HiveHBaseTableInputFormat"); return HiveHBaseTableInputFormat.class; } @@ -279,6 +290,17 @@ public HiveMetaHook getMetaHook() { return this; } + /** + * Return the snapshot name to use as job input, null otherwise. + */ + static String getInputSnapshotName(Configuration conf) { + return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME); + } + + static String getInputSnapshotRestoreDir(Configuration conf) { + return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_RESTORE_DIR); + } + @Override public void configureInputJobProperties( TableDesc tableDesc, @@ -342,6 +364,28 @@ public void configureTableJobProperties( // do this for reconciling HBaseStorageHandler for use in HCatalog // check to see if this an input job or an outputjob if (this.configureInputJobProps) { + String snapshotName = getInputSnapshotName(jobConf); + if (snapshotName != null) { + try { + // TODO: automatically provide a reasonable restore path when none provided. + String restoreDir = getInputSnapshotRestoreDir(jobConf); + if (restoreDir == null) { + throw new IllegalArgumentException( + "Cannot process HBase snapshot without specifying hive.hbase.snapshot.restoredir"); + } + LOG.debug("Restoring snapshot '" + snapshotName + "' into path under '" + restoreDir + "'"); + TableSnapshotInputFormatImpl.setInput(hbaseConf, snapshotName, new Path(restoreDir)); + + // copy over additional added configs. 
+ jobProperties.put("hbase.TableSnapshotInputFormat.snapshot.name", hbaseConf.get("hbase.TableSnapshotInputFormat.snapshot.name")); + jobConf.set("hbase.TableSnapshotInputFormat.snapshot.name", hbaseConf.get("hbase.TableSnapshotInputFormat.snapshot.name")); + jobProperties.put("hbase.TableSnapshotInputFormat.table.dir", hbaseConf.get("hbase.TableSnapshotInputFormat.table.dir")); + jobConf.set("hbase.TableSnapshotInputFormat.table.dir", hbaseConf.get("hbase.TableSnapshotInputFormat.table.dir")); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + for (String k : jobProperties.keySet()) { jobConf.set(k, jobProperties.get(k)); } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java new file mode 100644 index 0000000..5aa1d79 --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat. + */ +class HiveHBaseInputFormatUtil { + + /** + * Parse {@code jobConf} to create the target {@link HTable} instance. + */ + public static HTable getTable(JobConf jobConf) throws IOException { + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)); + } + + /** + * Parse {@code jobConf} to create a {@link Scan} instance. 
+ */ + public static Scan getScan(JobConf jobConf) throws IOException { + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + ColumnMappings columnMappings; + + try { + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); + } catch (SerDeException e) { + throw new IOException(e); + } + + if (columnMappings.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } + + boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); + Scan scan = new Scan(); + boolean empty = true; + + // The list of families that have been added to the scan + List addedFamilies = new ArrayList(); + + if (!readAllColumns) { + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); + for (int i : readColIDs) { + ColumnMapping colMap = columnsMapping[i]; + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + addedFamilies.add(colMap.familyName); + } else { + if(!addedFamilies.contains(colMap.familyName)){ + // add only if the corresponding family has not already been added + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + } + + empty = false; + } + } + + // The HBase table's row key maps to a Hive table column. In the corner case when only the + // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ + // column qualifier will have been added to the scan. We arbitrarily add at least one column + // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive + // tables column projection. 
+ if (empty) { + for (ColumnMapping colMap: columnMappings) { + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + } else { + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + + if (!readAllColumns) { + break; + } + } + } + + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + return scan; + } + + public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ + + String[] mapInfo = spec.split("#"); + boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat); + + switch (mapInfo.length) { + case 1: + return tblLevelDefault; + + case 2: + String storageType = mapInfo[1]; + if(storageType.equals("-")) { + return tblLevelDefault; + } else if ("string".startsWith(storageType)){ + return false; + } else if ("binary".startsWith(storageType)){ + return true; + } + + default: + throw new IOException("Malformed string: " + spec); + } + } +} diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 1032cc9..a252906 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -89,89 +88,10 @@ HBaseSplit hbaseSplit = (HBaseSplit) split; TableSplit tableSplit = hbaseSplit.getSplit(); - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); - String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); - ColumnMappings columnMappings; - - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } - - if (columnMappings.size() < readColIDs.size()) { - throw new IOException("Cannot read more columns than the given table contains."); - } - - boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); - Scan scan = new Scan(); - boolean empty = true; - - // The list of families that have been added to the scan - List addedFamilies = new ArrayList(); - - if (!readAllColumns) { - ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); - for (int i : readColIDs) { - ColumnMapping colMap = columnsMapping[i]; - if (colMap.hbaseRowKey) { - continue; - } - - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); 
- addedFamilies.add(colMap.familyName); - } else { - if(!addedFamilies.contains(colMap.familyName)){ - // add only if the corresponding family has not already been added - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - } - - empty = false; - } - } - - // The HBase table's row key maps to a Hive table column. In the corner case when only the - // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ - // column qualifier will have been added to the scan. We arbitrarily add at least one column - // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive - // tables column projection. - if (empty) { - for (ColumnMapping colMap: columnMappings) { - if (colMap.hbaseRowKey) { - continue; - } - - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - } else { - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - if (!readAllColumns) { - break; - } - } - } - - String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); - if (scanCache != null) { - scan.setCaching(Integer.valueOf(scanCache)); - } - String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); - if (scanCacheBlocks != null) { - scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); - } - String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); - if (scanBatch != null) { - scan.setBatch(Integer.valueOf(scanBatch)); - } + setHTable(HiveHBaseInputFormatUtil.getTable(jobConf)); + setScan(HiveHBaseInputFormatUtil.getScan(jobConf)); - setScan(scan); Job job = new Job(jobConf); TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext( job.getConfiguration(), reporter); @@ -443,12 +363,12 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); if (hbaseColumnsMapping == null) { - throw new IOException("hbase.columns.mapping required for HBase Table."); + throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); } ColumnMappings columnMappings = null; try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } @@ -463,10 +383,9 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). Scan scan = createFilterScan(jobConf, iKey, - getStorageFormatOfKey(keyMapping.mappingSpec, + HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); - // The list of families that have been added to the scan List addedFamilies = new ArrayList(); @@ -503,28 +422,4 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( return results; } - - private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ - - String[] mapInfo = spec.split("#"); - boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ? 
true : false; - - switch (mapInfo.length) { - case 1: - return tblLevelDefault; - - case 2: - String storageType = mapInfo[1]; - if(storageType.equals("-")) { - return tblLevelDefault; - } else if ("string".startsWith(storageType)){ - return false; - } else if ("binary".startsWith(storageType)){ - return true; - } - - default: - throw new IOException("Malformed string: " + spec); - } - } } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java new file mode 100644 index 0000000..3232c00 --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableInputFormat; +import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; + +public class HiveHBaseTableSnapshotInputFormat + implements InputFormat { + + TableSnapshotInputFormat delegate = new TableSnapshotInputFormat(); + + private static void setColumns(JobConf job) throws IOException { + // hbase mapred API doesn't support scan at the moment. 
+ Scan scan = HiveHBaseInputFormatUtil.getScan(job); + byte[][] families = scan.getFamilies(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < families.length; i++) { + if (i > 0) sb.append(" "); + sb.append(Bytes.toString(families[i])); + } + job.set(TableInputFormat.COLUMN_LIST, sb.toString()); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + setColumns(job); + return delegate.getSplits(job, numSplits); + } + + @Override + public RecordReader getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + setColumns(job); + final RecordReader rr = + delegate.getRecordReader(split, job, reporter); + + return new RecordReader() { + @Override + public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException { + return rr.next(key, value.getResult()); + } + + @Override + public ImmutableBytesWritable createKey() { + return rr.createKey(); + } + + @Override + public ResultWritable createValue() { + return new ResultWritable(rr.createValue()); + } + + @Override + public long getPos() throws IOException { + return rr.getPos(); + } + + @Override + public void close() throws IOException { + rr.close(); + } + + @Override + public float getProgress() throws IOException { + return rr.getProgress(); + } + }; + } +} diff --git a/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q b/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q new file mode 100644 index 0000000..b972057 --- /dev/null +++ b/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q @@ -0,0 +1,15 @@ +-- -*- mode:sql -*- +-- use table and snapshot created in o.a.h.h.h.HBaseQTestUtil +-- explain to ensure HiveHBaseTableInputFormat +-- enable snapshot for the following query +-- explain to ensure HiveHBaseTableSnapshotInputFormat +-- run the query + +explain select * from hbase_src; + +set hive.hbase.snapshot.name=hbase_src_snapshot; +set hive.hbase.snapshot.restoredir=/tmp; + +explain select * from hbase_src; + +select * from hbase_src; diff --git a/hbase-handler/src/test/results/positive/external_table_ppd.q.out b/hbase-handler/src/test/results/positive/external_table_ppd.q.out index 6f1adf4..3519279 100644 --- a/hbase-handler/src/test/results/positive/external_table_ppd.q.out +++ b/hbase-handler/src/test/results/positive/external_table_ppd.q.out @@ -63,8 +63,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] diff --git a/hbase-handler/src/test/results/positive/hbase_binary_map_queries.q.out b/hbase-handler/src/test/results/positive/hbase_binary_map_queries.q.out index 5981b8d..fd6a51e 100644 --- a/hbase-handler/src/test/results/positive/hbase_binary_map_queries.q.out +++ b/hbase-handler/src/test/results/positive/hbase_binary_map_queries.q.out @@ -1,7 +1,11 @@ PREHOOK: query: DROP TABLE hbase_src PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_src +PREHOOK: Output: default@hbase_src POSTHOOK: query: DROP TABLE hbase_src POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_src +POSTHOOK: Output: default@hbase_src PREHOOK: query: CREATE TABLE hbase_src(key STRING, tinyint_col TINYINT, smallint_col SMALLINT, @@ -37,14 +41,14 @@ POSTHOOK: query: INSERT OVERWRITE TABLE hbase_src POSTHOOK: type: QUERY 
POSTHOOK: Input: default@src POSTHOOK: Output: default@hbase_src -POSTHOOK: Lineage: hbase_src.bigint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.double_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.float_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.int_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.smallint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.string_col SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.tinyint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: hbase_src.bigint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.double_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.float_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.int_col SIMPLE [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.key SIMPLE [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.smallint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.string_col SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: hbase_src.tinyint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] PREHOOK: query: DROP TABLE t_hbase_maps PREHOOK: type: DROPTABLE POSTHOOK: query: DROP TABLE t_hbase_maps diff --git a/hbase-handler/src/test/results/positive/hbase_binary_map_queries_prefix.q.out b/hbase-handler/src/test/results/positive/hbase_binary_map_queries_prefix.q.out index bd59719..73a1ad0 100644 --- a/hbase-handler/src/test/results/positive/hbase_binary_map_queries_prefix.q.out +++ b/hbase-handler/src/test/results/positive/hbase_binary_map_queries_prefix.q.out @@ -1,7 +1,11 @@ PREHOOK: query: DROP TABLE hbase_src PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_src +PREHOOK: Output: default@hbase_src POSTHOOK: query: DROP TABLE hbase_src POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_src +POSTHOOK: Output: default@hbase_src PREHOOK: query: CREATE TABLE hbase_src(key STRING, tinyint_col TINYINT, smallint_col SMALLINT, @@ -37,14 +41,14 @@ POSTHOOK: query: INSERT OVERWRITE TABLE hbase_src POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Output: default@hbase_src -POSTHOOK: Lineage: hbase_src.bigint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.double_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.float_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.int_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.smallint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, 
comment:default), ] -POSTHOOK: Lineage: hbase_src.string_col SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] -POSTHOOK: Lineage: hbase_src.tinyint_col EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: hbase_src.bigint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.double_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.float_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.int_col SIMPLE [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.key SIMPLE [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.smallint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: hbase_src.string_col SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: hbase_src.tinyint_col EXPRESSION [(src)src.FieldSchema(name:key, type:int, comment:null), ] PREHOOK: query: DROP TABLE t_hbase_maps PREHOOK: type: DROPTABLE POSTHOOK: query: DROP TABLE t_hbase_maps diff --git a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out index b92db11..edaa88d 100644 --- a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out +++ b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out @@ -63,8 +63,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] @@ -238,8 +238,8 @@ Table Parameters: # Storage Information SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe -InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat +InputFormat: null +OutputFormat: null Compressed: No Num Buckets: -1 Bucket Columns: [] diff --git a/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out b/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out new file mode 100644 index 0000000..38d536b --- /dev/null +++ b/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out @@ -0,0 +1,373 @@ +PREHOOK: query: -- -*- mode:sql -*- +-- use table and snapshot created in o.a.h.h.h.HBaseQTestUtil +-- explain to ensure HiveHBaseTableInputFormat +-- enable snapshot for the following query +-- explain to ensure HiveHBaseTableSnapshotInputFormat +-- run the query + +explain select * from hbase_src +PREHOOK: type: QUERY +POSTHOOK: query: -- -*- mode:sql -*- +-- use table and snapshot created in o.a.h.h.h.HBaseQTestUtil +-- explain to ensure HiveHBaseTableInputFormat +-- enable snapshot for the following query +-- explain to ensure HiveHBaseTableSnapshotInputFormat +-- run the query + +explain select * from hbase_src +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_src + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: 
_col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: explain select * from hbase_src +PREHOOK: type: QUERY +POSTHOOK: query: explain select * from hbase_src +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_src + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_src +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_src +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_src +#### A masked pattern was here #### +0 val_0 +10 val_10 +100 val_100 +103 val_103 +104 val_104 +105 val_105 +11 val_11 +111 val_111 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +119 val_119 +12 val_12 +120 val_120 +125 val_125 +126 val_126 +128 val_128 +129 val_129 +131 val_131 +133 val_133 +134 val_134 +136 val_136 +137 val_137 +138 val_138 +143 val_143 +145 val_145 +146 val_146 +149 val_149 +15 val_15 +150 val_150 +152 val_152 +153 val_153 +155 val_155 +156 val_156 +157 val_157 +158 val_158 +160 val_160 +162 val_162 +163 val_163 +164 val_164 +165 val_165 +166 val_166 +167 val_167 +168 val_168 +169 val_169 +17 val_17 +170 val_170 +172 val_172 +174 val_174 +175 val_175 +176 val_176 +177 val_177 +178 val_178 +179 val_179 +18 val_18 +180 val_180 +181 val_181 +183 val_183 +186 val_186 +187 val_187 +189 val_189 +19 val_19 +190 val_190 +191 val_191 +192 val_192 +193 val_193 +194 val_194 +195 val_195 +196 val_196 +197 val_197 +199 val_199 +2 val_2 +20 val_20 +200 val_200 +201 val_201 +202 val_202 +203 val_203 +205 val_205 +207 val_207 +208 val_208 +209 val_209 +213 val_213 +214 val_214 +216 val_216 +217 val_217 +218 val_218 +219 val_219 +221 val_221 +222 val_222 +223 val_223 +224 val_224 +226 val_226 +228 val_228 +229 val_229 +230 val_230 +233 val_233 +235 val_235 +237 val_237 +238 val_238 +239 val_239 +24 val_24 +241 val_241 +242 val_242 +244 val_244 +247 val_247 +248 val_248 +249 val_249 +252 val_252 +255 val_255 +256 val_256 +257 val_257 +258 val_258 +26 val_26 +260 val_260 +262 val_262 +263 val_263 +265 val_265 +266 val_266 +27 val_27 +272 val_272 +273 val_273 +274 val_274 +275 val_275 +277 val_277 +278 val_278 +28 val_28 +280 val_280 +281 val_281 +282 val_282 +283 val_283 +284 val_284 +285 val_285 +286 val_286 +287 val_287 +288 val_288 +289 val_289 +291 val_291 +292 val_292 +296 val_296 +298 val_298 +30 val_30 +302 val_302 +305 val_305 +306 val_306 +307 val_307 +308 val_308 +309 val_309 +310 val_310 +311 val_311 +315 val_315 +316 val_316 +317 val_317 +318 val_318 +321 val_321 +322 val_322 +323 val_323 +325 val_325 +327 val_327 +33 val_33 +331 val_331 +332 val_332 +333 val_333 +335 val_335 +336 val_336 +338 val_338 +339 val_339 +34 val_34 +341 val_341 +342 val_342 +344 val_344 +345 val_345 +348 val_348 +35 val_35 +351 val_351 +353 val_353 +356 val_356 +360 val_360 +362 val_362 +364 val_364 +365 val_365 +366 val_366 +367 val_367 +368 val_368 +369 val_369 +37 val_37 +373 val_373 +374 val_374 +375 val_375 +377 val_377 +378 val_378 +379 val_379 +382 val_382 +384 val_384 +386 val_386 +389 val_389 +392 val_392 +393 val_393 +394 val_394 +395 val_395 +396 val_396 +397 val_397 +399 val_399 +4 val_4 
+400 val_400 +401 val_401 +402 val_402 +403 val_403 +404 val_404 +406 val_406 +407 val_407 +409 val_409 +41 val_41 +411 val_411 +413 val_413 +414 val_414 +417 val_417 +418 val_418 +419 val_419 +42 val_42 +421 val_421 +424 val_424 +427 val_427 +429 val_429 +43 val_43 +430 val_430 +431 val_431 +432 val_432 +435 val_435 +436 val_436 +437 val_437 +438 val_438 +439 val_439 +44 val_44 +443 val_443 +444 val_444 +446 val_446 +448 val_448 +449 val_449 +452 val_452 +453 val_453 +454 val_454 +455 val_455 +457 val_457 +458 val_458 +459 val_459 +460 val_460 +462 val_462 +463 val_463 +466 val_466 +467 val_467 +468 val_468 +469 val_469 +47 val_47 +470 val_470 +472 val_472 +475 val_475 +477 val_477 +478 val_478 +479 val_479 +480 val_480 +481 val_481 +482 val_482 +483 val_483 +484 val_484 +485 val_485 +487 val_487 +489 val_489 +490 val_490 +491 val_491 +492 val_492 +493 val_493 +494 val_494 +495 val_495 +496 val_496 +497 val_497 +498 val_498 +5 val_5 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +69 val_69 +70 val_70 +72 val_72 +74 val_74 +76 val_76 +77 val_77 +78 val_78 +8 val_8 +80 val_80 +82 val_82 +83 val_83 +84 val_84 +85 val_85 +86 val_86 +87 val_87 +9 val_9 +90 val_90 +92 val_92 +95 val_95 +96 val_96 +97 val_97 +98 val_98 diff --git a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out index 744f5fc..89a89f9 100644 --- a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out +++ b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out @@ -48,17 +48,17 @@ STAGE PLANS: Map Operator Tree: TableScan alias: a - Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 55 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: ((key > 0) and (key < 50)) (type: boolean) - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE Select Operator - expressions: key (type: string), '' (type: string) + expressions: key (type: int), '' (type: string) outputColumnNames: _col0, _col1 - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -66,14 +66,14 @@ STAGE PLANS: name: default.src_x1 Filter Operator predicate: ((key > 50) and (key < 100)) (type: boolean) - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: value (type: string), '' (type: string) outputColumnNames: _col0, _col1 - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 3 Data size: 601 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 634 Basic stats: COMPLETE Column stats: NONE table: 
input format: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat output format: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat @@ -150,7 +150,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Output: default@src_x1 POSTHOOK: Output: default@src_x2 -POSTHOOK: Lineage: src_x1.key SIMPLE [(src)a.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_x1.key SIMPLE [(src)a.FieldSchema(name:key, type:int, comment:null), ] POSTHOOK: Lineage: src_x1.value SIMPLE [] PREHOOK: query: select * from src_x1 order by key PREHOOK: type: QUERY diff --git a/hbase-handler/src/test/results/positive/hbase_stats_empty_partition.q.out b/hbase-handler/src/test/results/positive/hbase_stats_empty_partition.q.out index f201772..a9cf9d2 100644 --- a/hbase-handler/src/test/results/positive/hbase_stats_empty_partition.q.out +++ b/hbase-handler/src/test/results/positive/hbase_stats_empty_partition.q.out @@ -19,8 +19,8 @@ POSTHOOK: query: insert overwrite table tmptable partition (part = '1') select * POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Output: default@tmptable@part=1 -POSTHOOK: Lineage: tmptable PARTITION(part=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] -POSTHOOK: Lineage: tmptable PARTITION(part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable PARTITION(part=1).key SIMPLE [(src)src.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmptable PARTITION(part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:null), ] PREHOOK: query: describe formatted tmptable partition (part = '1') PREHOOK: type: DESCTABLE PREHOOK: Input: default@tmptable diff --git a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm index 01d596a..4c05e5b 100644 --- a/hbase-handler/src/test/templates/TestHBaseCliDriver.vm +++ b/hbase-handler/src/test/templates/TestHBaseCliDriver.vm @@ -27,7 +27,6 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; import org.apache.hadoop.hive.hbase.HBaseQTestUtil; import org.apache.hadoop.hive.hbase.HBaseTestSetup; -import org.apache.hadoop.hive.ql.session.SessionState; public class $className extends TestCase { diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java index 96a0de2..cb81288 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java @@ -17,19 +17,41 @@ */ package org.apache.hadoop.hive.hbase; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; + +import java.util.List; /** * HBaseQTestUtil initializes HBase-specific test fixtures. */ public class HBaseQTestUtil extends QTestUtil { + + /** Name of the HBase table, in both Hive and HBase. */ + public static String HBASE_SRC_NAME = "hbase_src"; + + /** Name of the table snapshot. */ + public static String HBASE_SRC_SNAPSHOT_NAME = "hbase_src_snapshot"; + + static { + // add our hbase table to the list managed by the parent class. 
+ srcTables.add(HBASE_SRC_NAME); + } + + /** A handle to this harness's cluster */ + private final HConnection conn; + public HBaseQTestUtil( String outDir, String logDir, MiniClusterType miniMr, HBaseTestSetup setup) throws Exception { super(outDir, logDir, miniMr, null); setup.preTest(conf); + this.conn = setup.getConnection(); super.init(); } @@ -37,4 +59,68 @@ public HBaseQTestUtil( public void init() throws Exception { // defer } + + @Override + public void createSources() throws Exception { + // don't care about the full suite of super.createSources(), so cherry-pick tables for now + + startSessionState(); + conf.setBoolean("hive.test.init.phase", true); + + // create and load the input data into the src table + runCreateTableCmd("CREATE TABLE src(key int, value string)"); + Path fpath = new Path(testFiles, "kv1.txt"); + runLoadCmd("LOAD DATA LOCAL INPATH '" + fpath.toUri().getPath() + "' INTO TABLE src"); + + // create and load the input data into the hbase table. + runCreateTableCmd( + "CREATE TABLE " + HBASE_SRC_NAME + "(key INT, value STRING)" + + " STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'" + + " WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:val')" + + " TBLPROPERTIES ('hbase.table.name' = '" + HBASE_SRC_NAME + "')" + ); + runCmd("INSERT OVERWRITE TABLE " + HBASE_SRC_NAME + " SELECT * FROM src"); + + // create a snapshot + HBaseAdmin admin = null; + try { + admin = new HBaseAdmin(conn.getConfiguration()); + admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, HBASE_SRC_NAME); + } finally { + if (admin != null) admin.close(); + } + + conf.setBoolean("hive.test.init.phase", false); + } + + @Override + public void cleanUp() throws Exception { + // don't care about the full suite of super.cleanUp(), so cherry-pick tables for now + + if (db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "src", false) != null) { + db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "src"); + } + + HBaseAdmin admin = null; + try { + admin = new HBaseAdmin(conn.getConfiguration()); + List snapshots = + admin.listSnapshots(".*" + HBASE_SRC_SNAPSHOT_NAME + ".*"); + for (HBaseProtos.SnapshotDescription sn : snapshots) { + if (sn.getName().equals(HBASE_SRC_SNAPSHOT_NAME)) { + admin.deleteSnapshot(HBASE_SRC_SNAPSHOT_NAME); + break; + } + } + + if (admin.tableExists(HBASE_SRC_NAME)) { + if (admin.isTableEnabled(HBASE_SRC_NAME)) { + admin.disableTable(HBASE_SRC_NAME); + } + admin.deleteTable(HBASE_SRC_NAME); + } + } finally { + if (admin != null) admin.close(); + } + } } diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java index cdc0a65..300f1cf 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.hbase; -import java.io.File; import java.io.IOException; import java.net.ServerSocket; import java.util.Arrays; @@ -29,12 +28,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; @@ -50,6 +50,7 @@ private MiniHBaseCluster hbaseCluster; private int zooKeeperPort; private String hbaseRoot; + private HConnection hbaseConn; private static final int NUM_REGIONSERVERS = 1; @@ -57,6 +58,10 @@ public HBaseTestSetup(Test test) { super(test); } + public HConnection getConnection() { + return this.hbaseConn; + } + void preTest(HiveConf conf) throws Exception { setUpFixtures(conf); @@ -97,27 +102,23 @@ private void setUpFixtures(HiveConf conf) throws Exception { hbaseConf.setInt("hbase.regionserver.info.port", -1); hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); conf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); + hbaseConn = HConnectionManager.createConnection(hbaseConf); + // opening the META table ensures that cluster is running - new HTable(hbaseConf, HConstants.META_TABLE_NAME); - createHBaseTable(hbaseConf); + HTableInterface meta = null; + try { + meta = hbaseConn.getTable(TableName.META_TABLE_NAME); + } finally { + if (meta != null) meta.close(); + } + createHBaseTable(); } - private void createHBaseTable(Configuration hbaseConf) throws IOException { + private void createHBaseTable() throws IOException { final String HBASE_TABLE_NAME = "HiveExternalTable"; HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes()); HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes()); htableDesc.addFamily(hcolDesc); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf); - if(Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)){ - // if table is already in there, don't recreate. - return; - } - hbaseAdmin.createTable(htableDesc); - HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME); - - // data - Put [] puts = new Put [] { - new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) }; boolean [] booleans = new boolean [] { true, false, true }; byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE }; @@ -128,18 +129,37 @@ private void createHBaseTable(Configuration hbaseConf) throws IOException { float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE }; double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE }; - // store data - for (int i = 0; i < puts.length; i++) { - puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); - puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] }); - puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); - puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); - puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); - puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); - puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); - puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); - - htable.put(puts[i]); + HBaseAdmin hbaseAdmin = null; + HTableInterface htable = null; + try { + hbaseAdmin = new HBaseAdmin(hbaseConn.getConfiguration()); + if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) { + // if table is already in there, don't recreate. 
+ return; + } + hbaseAdmin.createTable(htableDesc); + htable = hbaseConn.getTable(HBASE_TABLE_NAME); + + // data + Put[] puts = new Put[]{ + new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes())}; + + // store data + for (int i = 0; i < puts.length; i++) { + puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); + puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]}); + puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); + puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); + puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); + puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); + puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); + puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); + + htable.put(puts[i]); + } + } finally { + if (htable != null) htable.close(); + if (hbaseAdmin != null) hbaseAdmin.close(); } } @@ -152,6 +172,10 @@ private static int findFreePort() throws IOException { @Override protected void tearDown() throws Exception { + if (hbaseConn != null) { + hbaseConn.close(); + hbaseConn = null; + } if (hbaseCluster != null) { HConnectionManager.deleteAllConnections(true); hbaseCluster.shutdown(); diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 2fefa06..fd9af9e 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -117,7 +117,7 @@ private static final Log LOG = LogFactory.getLog("QTestUtil"); private String testWarehouse; - private final String testFiles; + protected final String testFiles; protected final String outDir; protected final String logDir; private final TreeMap qMap; @@ -130,7 +130,7 @@ public static final HashSet srcTables = new HashSet(); private static MiniClusterType clusterType = MiniClusterType.none; private ParseDriver pd; - private Hive db; + protected Hive db; protected HiveConf conf; private Driver drv; private BaseSemanticAnalyzer sem; @@ -619,7 +619,7 @@ public void cleanUp() throws Exception { FunctionRegistry.unregisterTemporaryUDF("test_error"); } - private void runLoadCmd(String loadCmd) throws Exception { + protected void runLoadCmd(String loadCmd) throws Exception { int ecode = 0; ecode = drv.run(loadCmd).getResponseCode(); drv.close(); @@ -630,7 +630,7 @@ private void runLoadCmd(String loadCmd) throws Exception { return; } - private void runCreateTableCmd(String createTableCmd) throws Exception { + protected void runCreateTableCmd(String createTableCmd) throws Exception { int ecode = 0; ecode = drv.run(createTableCmd).getResponseCode(); if (ecode != 0) { @@ -641,7 +641,7 @@ private void runCreateTableCmd(String createTableCmd) throws Exception { return; } - private void runCmd(String cmd) throws Exception { + protected void runCmd(String cmd) throws Exception { int ecode = 0; ecode = drv.run(cmd).getResponseCode(); drv.close(); @@ -873,7 +873,7 @@ public String cliInit(String tname, boolean recreate) throws Exception { return outf.getAbsolutePath(); } - private CliSessionState startSessionState() + protected CliSessionState startSessionState() throws FileNotFoundException, UnsupportedEncodingException { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, diff --git a/pom.xml b/pom.xml index 
b5a5697..4706c5b 100644 --- a/pom.xml +++ b/pom.xml @@ -111,8 +111,8 @@ 1.2.1 2.4.0 ${basedir}/${hive.path.to.root}/testutils/hadoop - 0.96.0-hadoop1 - 0.96.0-hadoop2 + 0.98.3-hadoop1 + 0.98.3-hadoop2 4.2.5 4.2.5 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index ee074ea..44848af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4300,10 +4300,14 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { tbl.setInputFormatClass(crtTbl.getInputFormat()); tbl.setOutputFormatClass(crtTbl.getOutputFormat()); - tbl.getTTable().getSd().setInputFormat( - tbl.getInputFormatClass().getName()); - tbl.getTTable().getSd().setOutputFormat( - tbl.getOutputFormatClass().getName()); + // only persist input/output format to metadata when it is explicitly specified. + // Otherwise, load lazily via StorageHandler at query time. + if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) { + tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName()); + } + if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) { + tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName()); + } if (!Utilities.isDefaultNameNode(conf)) { // If location is specified - ensure that it is a full qualified name -- 1.9.0
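
The q.out changes above (InputFormat and OutputFormat reported as null) follow from the DDLTask change: for storage-handler tables no input/output format is persisted to the metastore, and the handler supplies the classes at query time. A minimal HiveQL sketch of that pattern, using the fixture DDL from HBaseQTestUtil.createSources (the table name, column family, and mapping are the test fixture's values, not requirements):

-- HBase-backed table; no INPUTFORMAT/OUTPUTFORMAT clause is given.
CREATE TABLE hbase_src(key INT, value STRING)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:val')
  TBLPROPERTIES ('hbase.table.name' = 'hbase_src');

-- With this patch, DESCRIBE FORMATTED shows InputFormat/OutputFormat as null;
-- HBaseStorageHandler.getInputFormatClass() resolves the actual class per query.
DESCRIBE FORMATTED hbase_src;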
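
Reading over a snapshot is then driven entirely by the two new settings, and they must be set together: the storage handler rejects a snapshot name without a restore directory. The sketch below mirrors hbase_handler_snapshot.q; the snapshot is assumed to already exist (the test harness takes it via HBaseAdmin.snapshot), and hbase_src_snapshot and /tmp are the fixture's illustrative values.

-- Point the handler at a snapshot of the backing HBase table.
SET hive.hbase.snapshot.name=hbase_src_snapshot;
-- Directory on the cluster filesystem under which the snapshot is restored.
SET hive.hbase.snapshot.restoredir=/tmp;

-- The scan is now planned with HiveHBaseTableSnapshotInputFormat instead of
-- HiveHBaseTableInputFormat (see HBaseStorageHandler.getInputFormatClass()).
EXPLAIN SELECT * FROM hbase_src;
SELECT * FROM hbase_src;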