diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
index 0524572..4cbfa2e 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.hbase;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -38,6 +39,13 @@
  * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat.
  */
 class HiveHBaseInputFormatUtil {
+  /**
+   * Parse {@code jobConf} to create the target {@link TableName} instance.
+   */
+  public static TableName getTableName(JobConf jobConf) throws IOException {
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    return TableName.valueOf(Bytes.toBytes(hbaseTableName));
+  }
 
   /**
    * Parse {@code jobConf} to create the target {@link HTable} instance.
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 8e72759..9a70da9 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -75,6 +75,10 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+
+
 /**
  * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
  * tables, decorating an underlying HBase TableInputFormat with extra Hive logic
@@ -95,7 +99,9 @@
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     TableSplit tableSplit = hbaseSplit.getTableSplit();
 
-    setHTable(HiveHBaseInputFormatUtil.getTable(jobConf));
+    final Connection conn = ConnectionFactory.createConnection(jobConf);
+
+    initializeTable(conn, HiveHBaseInputFormatUtil.getTableName(jobConf));
     setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
 
     Job job = new Job(jobConf);
@@ -115,6 +121,7 @@
       @Override
       public void close() throws IOException {
         recordReader.close();
+        conn.close();
       }
 
       @Override
@@ -429,19 +436,25 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
   @Override
   public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
     synchronized (hbaseTableMonitor) {
-      return getSplitsInternal(jobConf, numSplits);
+      final Connection conn = ConnectionFactory.createConnection(jobConf);
+      try {
+        return getSplitsInternal(conn, jobConf, numSplits);
+      } finally {
+        conn.close();
+      }
     }
   }
 
-  private InputSplit[] getSplitsInternal(JobConf jobConf, int numSplits) throws IOException {
+  private InputSplit[] getSplitsInternal(Connection conn, JobConf jobConf, int numSplits) throws IOException {
     //obtain delegation tokens for the job
     if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
       TableMapReduceUtil.initCredentials(jobConf);
     }
 
+    initializeTable(conn, HiveHBaseInputFormatUtil.getTableName(jobConf));
+
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
     boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
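
Note (not part of the patch): a minimal sketch of the caller-owned Connection lifecycle this change moves to, replacing the deprecated new HTable(Configuration, byte[]) / setHTable path. The HBase client calls used here (ConnectionFactory.createConnection, Connection.getTable) are the real API; the class name and "example_table" are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class ConnectionLifecycleSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("example_table"); // hypothetical table
    // Connections are heavyweight and must be closed by whoever created them,
    // which is why getSplits() and the record reader's close() above both own
    // a conn.close(). Table instances are lightweight views on the Connection.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tableName)) {
      System.out.println("connected to " + table.getName());
    }
  }
}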