diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 4ef683a..64b972d 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -43,7 +44,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -276,6 +277,16 @@ public class HBaseStorageHandler extends DefaultStorageHandler
       }
     }
     jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
+
+    try {
+      // Create dummy job to pass to HBase
+      Job job = new Job(getConf());
+      TableMapReduceUtil.initCredentials(job);
+
+      tableDesc.setJobCredentials(job.getCredentials());
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
   }
 
   @Override
diff --git ivy/libraries.properties ivy/libraries.properties
index ac54762..ab793df 100644
--- ivy/libraries.properties
+++ ivy/libraries.properties
@@ -38,8 +38,8 @@ commons-logging-api.version=1.0.4
 commons-pool.version=1.5.4
 derby.version=10.4.2.0
 guava.version=r06
-hbase.version=0.92.0-SNAPSHOT
-hbase-test.version=0.92.0-SNAPSHOT
+hbase.version=0.92.0
+hbase-test.version=0.92.0
 javaewah.version=0.3.2
 jdo-api.version=2.3-ec
 jdom.version=1.1
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index 0db2a71..dfbce15 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -188,7 +188,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     return false;
   }
 
-  protected void createTmpDirs() throws IOException {
+  /** Creates tmp directories, and configures the job for output. */
+  protected void prepareJobOutput(JobConf job) throws IOException {
     // fix up outputs
     Map<String, ArrayList<String>> pa = work.getPathToAliases();
     if (pa != null) {
@@ -215,6 +216,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
               FileSystem fs = tempPath.getFileSystem(job);
               fs.mkdirs(tempPath);
             }
+
+            Utilities.copyTableJobPropertiesToConf(fdesc.getTableInfo(), job);
           }
 
           if (op.getChildOperators() != null) {
@@ -226,6 +229,26 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     }
   }
 
+  protected void copyTableJobProperties(JobConf job) {
+    TableDesc keyDesc = work.getKeyDesc();
+    if (keyDesc != null) {
+      Utilities.copyTableJobPropertiesToConf(keyDesc, job);
+    }
+    if (work.getTagToValueDesc() != null) {
+      for (TableDesc tDesc : work.getTagToValueDesc()) {
+        Utilities.copyTableJobPropertiesToConf(tDesc, job);
+      }
+    }
+    if (work.getPathToPartitionInfo() != null) {
+      for (PartitionDesc part : work.getPathToPartitionInfo().values()) {
+        TableDesc tDesc = part.getTableDesc();
+        if (tDesc != null) {
+          Utilities.copyTableJobPropertiesToConf(tDesc, job);
+        }
+      }
+    }
+  }
+
   /**
    * Execute a query plan using Hadoop.
    */
@@ -327,6 +350,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
 
+    // add any additional job configuration from Storage Handlers
+    copyTableJobProperties(job);
+
     // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
     // it
     String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
@@ -426,7 +452,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       }
     }
 
-    this.createTmpDirs();
+    this.prepareJobOutput(job);
 
     // Finally SUBMIT the JOB!
     rj = jc.submitJob(job);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 2ae6bd0..621fe47 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -1638,6 +1639,11 @@ public final class Utilities {
     for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
       job.set(entry.getKey(), entry.getValue());
     }
+
+    Credentials credentials = tbl.getJobCredentials();
+    if (credentials != null) {
+      job.getCredentials().addAll(credentials);
+    }
   }
 
   public static Object getInputSummaryLock = new Object();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index f66f437..24e1446 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -20,14 +20,15 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.Enumeration;
-import java.util.Properties;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * TableDesc.
@@ -41,6 +42,7 @@ public class TableDesc implements Serializable, Cloneable {
   private java.util.Properties properties;
   private String serdeClassName;
   private Map<String, String> jobProperties;
+  private Credentials jobCredentials;
 
   public TableDesc() {
   }
@@ -107,6 +109,14 @@ public class TableDesc implements Serializable, Cloneable {
     this.jobProperties = jobProperties;
   }
 
+  public Credentials getJobCredentials() {
+    return jobCredentials;
+  }
+
+  public void setJobCredentials(Credentials jobCredentials) {
+    this.jobCredentials = jobCredentials;
+  }
+
   @Explain(displayName = "jobProperties", normalExplain = false)
   public Map<String, String> getJobProperties() {
     return jobProperties;
@@ -149,7 +159,7 @@ public class TableDesc implements Serializable, Cloneable {
         org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE) != null);
   }
 
-  
+
   @Override
   public Object clone() {
     TableDesc ret = new TableDesc();
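
For reference, the credential handoff this patch threads through TableDesc can be read as the flow below. This is a minimal sketch, assuming a secure (Kerberos-enabled) HBase 0.92 cluster and the Hadoop 1.x security APIs; the CredentialFlowSketch class and its method names are illustrative only and do not appear in the patch, while the Job, TableMapReduceUtil, JobConf, and Credentials calls are the same ones the patch uses.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;

public class CredentialFlowSketch {

  // Plan time (HBaseStorageHandler.configureTableJobProperties): obtain an HBase
  // delegation token via a throwaway Job and return its Credentials, which the
  // storage handler stashes on the TableDesc with setJobCredentials().
  static Credentials obtainHBaseCredentials(Configuration conf) throws IOException {
    Job dummy = new Job(conf);                  // dummy job, used only as a token container
    TableMapReduceUtil.initCredentials(dummy);  // asks HBase for a delegation token
    return dummy.getCredentials();
  }

  // Submit time (ExecDriver / Utilities.copyTableJobPropertiesToConf): merge the
  // stashed Credentials into the JobConf that is actually submitted, so map/reduce
  // tasks can authenticate to HBase without a Kerberos ticket of their own.
  static void addToJobConf(Credentials stashed, JobConf job) {
    if (stashed != null) {
      job.getCredentials().addAll(stashed);
    }
  }
}

The dummy Job exists only because TableMapReduceUtil.initCredentials takes a Job rather than a bare Configuration; its Credentials are copied onto the real JobConf just before ExecDriver submits it.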