diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 255ffa2..46a5e3e 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -29,7 +29,10 @@
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -38,6 +41,7 @@
 import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
@@ -68,6 +72,8 @@
 public class HBaseStorageHandler extends DefaultStorageHandler
   implements HiveMetaHook, HiveStoragePredicateHandler {
 
+  private static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class);
+
   final static public String DEFAULT_PREFIX = "default.";
 
   //Check if the configure job properties is called from input
@@ -251,6 +257,11 @@ public void setConf(Configuration conf) {
 
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
+    if (getInputSnapshotName(jobConf) != null) {
+      LOG.debug("Using TableSnapshotInputFormat");
+      return HiveHBaseTableSnapshotInputFormat.class;
+    }
+    LOG.debug("Using HiveHBaseTableInputFormat");
     return HiveHBaseTableInputFormat.class;
   }
 
@@ -269,6 +280,17 @@ public HiveMetaHook getMetaHook() {
     return this;
   }
 
+  /**
+   * Return the snapshot name to use as job input, null otherwise.
+   */
+  static String getInputSnapshotName(Configuration conf) {
+    return conf.get("hive.hbase.snapshot");
+  }
+
+  static String getInputSnapshotRestoreDir(Configuration conf) {
+    return conf.get("hive.hbase.snapshot.restoredir");
+  }
+
   @Override
   public void configureInputJobProperties(
     TableDesc tableDesc,
@@ -332,6 +354,28 @@ public void configureTableJobProperties(
     // do this for reconciling HBaseStorageHandler for use in HCatalog
     // check to see if this an input job or an outputjob
     if (this.configureInputJobProps) {
+      String snapshotName = getInputSnapshotName(jobConf);
+      if (snapshotName != null) {
+        try {
+          // TODO: automatically provide a reasonable restore path when none provided.
+          String restoreDir = getInputSnapshotRestoreDir(jobConf);
+          if (restoreDir == null) {
+            throw new IllegalArgumentException(
+              "Cannot process HBase snapshot without specifying hive.hbase.snapshot.restoredir");
+          }
+          LOG.debug("Restoring snapshot '" + snapshotName + "' into path under '" + restoreDir + "'");
+          TableSnapshotInputFormatImpl.setInput(hbaseConf, snapshotName, new Path(restoreDir));
+
+          // copy over additional added configs.
+          jobProperties.put("hbase.TableSnapshotInputFormat.snapshot.name", hbaseConf.get("hbase.TableSnapshotInputFormat.snapshot.name"));
+          jobConf.set("hbase.TableSnapshotInputFormat.snapshot.name", hbaseConf.get("hbase.TableSnapshotInputFormat.snapshot.name"));
+          jobProperties.put("hbase.TableSnapshotInputFormat.table.dir", hbaseConf.get("hbase.TableSnapshotInputFormat.table.dir"));
+          jobConf.set("hbase.TableSnapshotInputFormat.table.dir", hbaseConf.get("hbase.TableSnapshotInputFormat.table.dir"));
+        } catch (IOException e) {
+          throw new IllegalArgumentException(e);
+        }
+      }
+
       for (String k : jobProperties.keySet()) {
         jobConf.set(k, jobProperties.get(k));
       }
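The two job properties introduced above ("hive.hbase.snapshot" and "hive.hbase.snapshot.restoredir") are what switch the handler onto the snapshot path. The following sketch is illustrative only and is not part of the patch; the class, snapshot, and directory names are hypothetical, and it assumes a Configuration wired the way Hive wires one at query time:

    // Hedged sketch: shows how the new keys are expected to drive input format selection.
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hive.hbase.HBaseStorageHandler;
    import org.apache.hadoop.hive.hbase.HiveHBaseTableSnapshotInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class SnapshotSelectionSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf(HBaseConfiguration.create());
        conf.set("hive.hbase.snapshot", "my_snapshot");                  // read by getInputSnapshotName()
        conf.set("hive.hbase.snapshot.restoredir", "/tmp/snap_restore"); // read by getInputSnapshotRestoreDir()

        HBaseStorageHandler handler = new HBaseStorageHandler();
        handler.setConf(conf);

        // With the snapshot key present the handler should pick the snapshot-backed format;
        // without the key it falls back to HiveHBaseTableInputFormat, as before this patch.
        System.out.println(handler.getInputFormatClass() == HiveHBaseTableSnapshotInputFormat.class);
      }
    }
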
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
new file mode 100644
index 0000000..5aa1d79
--- /dev/null
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat.
+ */
+class HiveHBaseInputFormatUtil {
+
+  /**
+   * Parse {@code jobConf} to create the target {@link HTable} instance.
+   */
+  public static HTable getTable(JobConf jobConf) throws IOException {
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName));
+  }
+
+  /**
+   * Parse {@code jobConf} to create a {@link Scan} instance.
+   */
+  public static Scan getScan(JobConf jobConf) throws IOException {
+    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
+    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+    ColumnMappings columnMappings;
+
+    try {
+      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+
+    if (columnMappings.size() < readColIDs.size()) {
+      throw new IOException("Cannot read more columns than the given table contains.");
+    }
+
+    boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
+    Scan scan = new Scan();
+    boolean empty = true;
+
+    // The list of families that have been added to the scan
+    List<String> addedFamilies = new ArrayList<String>();
+
+    if (!readAllColumns) {
+      ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
+      for (int i : readColIDs) {
+        ColumnMapping colMap = columnsMapping[i];
+        if (colMap.hbaseRowKey) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+          addedFamilies.add(colMap.familyName);
+        } else {
+          if(!addedFamilies.contains(colMap.familyName)){
+            // add only if the corresponding family has not already been added
+            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+          }
+        }
+
+        empty = false;
+      }
+    }
+
+    // The HBase table's row key maps to a Hive table column. In the corner case when only the
+    // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
+    // column qualifier will have been added to the scan. We arbitrarily add at least one column
+    // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
+    // tables column projection.
+    if (empty) {
+      for (ColumnMapping colMap : columnMappings) {
+        if (colMap.hbaseRowKey) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+        } else {
+          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+        }
+
+        if (!readAllColumns) {
+          break;
+        }
+      }
+    }
+
+    String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
+    if (scanCache != null) {
+      scan.setCaching(Integer.valueOf(scanCache));
+    }
+    String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
+    if (scanCacheBlocks != null) {
+      scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
+    }
+    String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
+    if (scanBatch != null) {
+      scan.setBatch(Integer.valueOf(scanBatch));
+    }
+    return scan;
+  }
+
+  public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
+
+    String[] mapInfo = spec.split("#");
+    boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat);
+
+    switch (mapInfo.length) {
+    case 1:
+      return tblLevelDefault;
+
+    case 2:
+      String storageType = mapInfo[1];
+      if(storageType.equals("-")) {
+        return tblLevelDefault;
+      } else if ("string".startsWith(storageType)){
+        return false;
+      } else if ("binary".startsWith(storageType)){
+        return true;
+      }
+
+    default:
+      throw new IOException("Malformed string: " + spec);
+    }
+  }
+}
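Since both input formats now build their Scan through the helper above, the scan-tuning keys travel with the job configuration. A minimal sketch, using hypothetical mapping and tuning values that are not taken from the patch, of how those keys surface on the resulting Scan:

    // Hedged sketch: lives in the same package because HiveHBaseInputFormatUtil is package-private.
    package org.apache.hadoop.hive.hbase;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.mapred.JobConf;

    public class GetScanSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.set(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cf:val"); // hypothetical column mapping
        job.set(HBaseSerDe.HBASE_SCAN_CACHE, "500");
        job.set(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS, "false");
        job.set(HBaseSerDe.HBASE_SCAN_BATCH, "10");

        // No cluster access is needed here; getScan only parses configuration.
        Scan scan = HiveHBaseInputFormatUtil.getScan(job);
        System.out.println(scan.getCaching());     // 500
        System.out.println(scan.getBatch());       // 10
        System.out.println(scan.getCacheBlocks()); // false
      }
    }
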
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 1032cc9..a252906 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -46,7 +46,6 @@
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -89,89 +88,10 @@
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     TableSplit tableSplit = hbaseSplit.getSplit();
-    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
-    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
-    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
-    ColumnMappings columnMappings;
-
-    try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-    } catch (SerDeException e) {
-      throw new IOException(e);
-    }
-
-    if (columnMappings.size() < readColIDs.size()) {
-      throw new IOException("Cannot read more columns than the given table contains.");
-    }
-
-    boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
-    Scan scan = new Scan();
-    boolean empty = true;
-
-    // The list of families that have been added to the scan
-    List<String> addedFamilies = new ArrayList<String>();
-
-    if (!readAllColumns) {
-      ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
-      for (int i : readColIDs) {
-        ColumnMapping colMap = columnsMapping[i];
-        if (colMap.hbaseRowKey) {
-          continue;
-        }
-
-        if (colMap.qualifierName == null) {
-          scan.addFamily(colMap.familyNameBytes);
-          addedFamilies.add(colMap.familyName);
-        } else {
-          if(!addedFamilies.contains(colMap.familyName)){
-            // add only if the corresponding family has not already been added
-            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
-          }
-        }
-
-        empty = false;
-      }
-    }
-
-    // The HBase table's row key maps to a Hive table column. In the corner case when only the
-    // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
-    // column qualifier will have been added to the scan. We arbitrarily add at least one column
-    // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
-    // tables column projection.
-    if (empty) {
-      for (ColumnMapping colMap: columnMappings) {
-        if (colMap.hbaseRowKey) {
-          continue;
-        }
-
-        if (colMap.qualifierName == null) {
-          scan.addFamily(colMap.familyNameBytes);
-        } else {
-          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
-        }
-        if (!readAllColumns) {
-          break;
-        }
-      }
-    }
-
-    String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
-    if (scanCache != null) {
-      scan.setCaching(Integer.valueOf(scanCache));
-    }
-    String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
-    if (scanCacheBlocks != null) {
-      scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
-    }
-    String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
-    if (scanBatch != null) {
-      scan.setBatch(Integer.valueOf(scanBatch));
-    }
+    setHTable(HiveHBaseInputFormatUtil.getTable(jobConf));
+    setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
-    setScan(scan);
     Job job = new Job(jobConf);
     TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
         job.getConfiguration(), reporter);
@@ -443,12 +363,12 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
     boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
 
     if (hbaseColumnsMapping == null) {
-      throw new IOException("hbase.columns.mapping required for HBase Table.");
+      throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
     }
 
     ColumnMappings columnMappings = null;
     try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
+      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }
@@ -463,10 +383,9 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
     // definition into account and excludes regions which don't satisfy
     // the start/stop row conditions (HBASE-1829).
     Scan scan = createFilterScan(jobConf, iKey,
-        getStorageFormatOfKey(keyMapping.mappingSpec,
+        HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
            jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
-
     // The list of families that have been added to the scan
     List<String> addedFamilies = new ArrayList<String>();
@@ -503,28 +422,4 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
 
     return results;
   }
-
-  private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
-
-    String[] mapInfo = spec.split("#");
-    boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ?
-        true : false;
-
-    switch (mapInfo.length) {
-    case 1:
-      return tblLevelDefault;
-
-    case 2:
-      String storageType = mapInfo[1];
-      if(storageType.equals("-")) {
-        return tblLevelDefault;
-      } else if ("string".startsWith(storageType)){
-        return false;
-      } else if ("binary".startsWith(storageType)){
-        return true;
-      }
-
-    default:
-      throw new IOException("Malformed string: " + spec);
-    }
-  }
 }
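The row-key storage-format parsing removed here keeps its behavior in the shared helper. A short illustration (the spec strings are hypothetical; the expected results follow directly from the code shown above):

    // Hedged sketch of getStorageFormatOfKey(): true means binary key storage, false means string.
    package org.apache.hadoop.hive.hbase;

    public class KeyFormatSketch {
      public static void main(String[] args) throws Exception {
        // explicit binary suffix wins over the table default
        System.out.println(HiveHBaseInputFormatUtil.getStorageFormatOfKey(":key#b", "string")); // true
        // explicit string suffix
        System.out.println(HiveHBaseInputFormatUtil.getStorageFormatOfKey(":key#s", "binary")); // false
        // "-" defers to the table-level default
        System.out.println(HiveHBaseInputFormatUtil.getStorageFormatOfKey(":key#-", "binary")); // true
        // no suffix at all also defers to the table-level default
        System.out.println(HiveHBaseInputFormatUtil.getStorageFormatOfKey(":key", "string"));   // false
      }
    }
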
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
new file mode 100644
index 0000000..3232c00
--- /dev/null
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+public class HiveHBaseTableSnapshotInputFormat
+    implements InputFormat<ImmutableBytesWritable, ResultWritable> {
+
+  TableSnapshotInputFormat delegate = new TableSnapshotInputFormat();
+
+  private static void setColumns(JobConf job) throws IOException {
+    // hbase mapred API doesn't support scan at the moment.
+    Scan scan = HiveHBaseInputFormatUtil.getScan(job);
+    byte[][] families = scan.getFamilies();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < families.length; i++) {
+      if (i > 0) sb.append(" ");
+      sb.append(Bytes.toString(families[i]));
+    }
+    job.set(TableInputFormat.COLUMN_LIST, sb.toString());
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    setColumns(job);
+    return delegate.getSplits(job, numSplits);
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    setColumns(job);
+    final RecordReader<ImmutableBytesWritable, Result> rr =
+        delegate.getRecordReader(split, job, reporter);
+
+    return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
+      @Override
+      public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
+        return rr.next(key, value.getResult());
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return rr.createKey();
+      }
+
+      @Override
+      public ResultWritable createValue() {
+        return new ResultWritable(rr.createValue());
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return rr.getPos();
+      }
+
+      @Override
+      public void close() throws IOException {
+        rr.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return rr.getProgress();
+      }
+    };
+  }
+}
diff --git a/pom.xml b/pom.xml
index 7b54fe5..fc0a291 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,8 +108,8 @@
     1.2.1
     2.4.0
     ${basedir}/${hive.path.to.root}/testutils/hadoop
-    0.96.0-hadoop1
-    0.96.0-hadoop2
+    0.98.3-hadoop1
+    0.98.3-hadoop2
     4.2.5
     4.2.5
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 7c175aa..80eb3c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4210,10 +4210,14 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException {
     tbl.setInputFormatClass(crtTbl.getInputFormat());
     tbl.setOutputFormatClass(crtTbl.getOutputFormat());
 
-    tbl.getTTable().getSd().setInputFormat(
-        tbl.getInputFormatClass().getName());
-    tbl.getTTable().getSd().setOutputFormat(
-        tbl.getOutputFormatClass().getName());
+    // only persist input/output format to metadata when it is explicitly specified.
+    // Otherwise, load lazily via StorageHandler at query time.
+    if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) {
+      tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
+    }
+    if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) {
+      tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName());
+    }
 
     if (!Utilities.isDefaultNameNode(conf)) {
       // If location is specified - ensure that it is a full qualified name
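For orientation, the pieces are meant to compose roughly as below. This is an illustrative sketch only: the table, snapshot, and restore-directory names are invented, and it assumes an HBase cluster on which that snapshot already exists. The DDLTask change above is what makes the per-query format swap possible, since the metastore no longer pins an input format for storage-handler tables unless one was given explicitly.

    // Hedged end-to-end sketch, not a test shipped with the patch.
    package org.apache.hadoop.hive.hbase;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class SnapshotReadSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(HBaseConfiguration.create());
        job.set(HBaseSerDe.HBASE_TABLE_NAME, "t");                // hypothetical Hive-mapped HBase table
        job.set(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cf:val"); // hypothetical mapping
        job.set("hive.hbase.snapshot", "t_snapshot");             // keys introduced by this patch
        job.set("hive.hbase.snapshot.restoredir", "/tmp/t_restore");

        // HBaseStorageHandler.configureTableJobProperties() performs this restore step at plan time;
        // it is repeated here so the sketch stands alone.
        TableSnapshotInputFormatImpl.setInput(job, "t_snapshot", new Path("/tmp/t_restore"));

        // The snapshot-backed format derives COLUMN_LIST from the Scan and delegates to
        // HBase's mapred TableSnapshotInputFormat for splits and record readers.
        InputSplit[] splits = new HiveHBaseTableSnapshotInputFormat().getSplits(job, 1);
        System.out.println(splits.length + " snapshot split(s)");
      }
    }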