diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index cc1dbb5..77ff824 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -268,6 +269,7 @@ public void configureOutputJobProperties(
     configureTableJobProperties(tableDesc, jobProperties);
   }
 
+  @Override
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties) {
@@ -293,6 +295,17 @@ public void configureTableJobProperties(
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    try {
+      TableMapReduceUtil.addDependencyJars(jobConf);
+      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
+        HBaseStorageHandler.class);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public DecomposedPredicate decomposePredicate(
     JobConf jobConf,
     Deserializer deserializer,
diff --git hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
index 7d71dd6..2f00ba2 100644
--- hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
+++ hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
@@ -22,8 +22,8 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -32,7 +32,7 @@
  * The abstract Class HCatStorageHandler would server as the base class for all
  * the storage handlers required for non-native tables in HCatalog.
  */
-public abstract class HCatStorageHandler implements HiveStorageHandler {
+public abstract class HCatStorageHandler extends DefaultStorageHandler {
 
     //TODO move this to HiveStorageHandler
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index 2ad6227..4da68a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -412,7 +412,7 @@ public int execute(DriverContext driverContext) {
         LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
       }
     }
-
+    work.configureJobConf(job);
     addInputPaths(job, work, emptyScratchDirStr, ctx);
     Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
new file mode 100644
index 0000000..00e4969
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+public class OperatorUtils {
+
+  public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
+    return findOperator(start, clazz, new HashSet<T>());
+  }
+
+  public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
+    Set<T> found = new HashSet<T>();
+    for (Operator<?> start : starts) {
+      findOperator(start, clazz, found);
+    }
+    return found;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Set<T> findOperator(Operator<?> start, Class<T> clazz, Set<T> found) {
+    if (clazz.isInstance(start)) {
+      found.add((T) start);
+    }
+    if (start.getChildOperators() != null) {
+      for (Operator<?> child : start.getChildOperators()) {
+        findOperator(child, clazz, found);
+      }
+    }
+    return found;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 7338631..100db8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -401,7 +401,7 @@ public static void setMapRedWork(Configuration job, MapredWork w, String hiveScr
 
   public static String getHiveJobID(Configuration job) {
     String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-    if (planPath != null) {
+    if (planPath != null && !planPath.isEmpty()) {
       return (new Path(planPath)).getName();
     }
     return null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
index 800d541..83772bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public void configureTableJobProperties(TableDesc tableDesc,
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    //do nothing by default
+  }
+
+  @Override
   public Configuration getConf() {
     return conf;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 5c021f3..1eec32c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
 /**
@@ -133,4 +134,12 @@ public abstract void configureOutputJobProperties(TableDesc tableDesc,
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties);
+
+  /**
+   * Called just before submitting the MapReduce job.
+   *
+   * @param tableDesc descriptor for the table being accessed
+   * @param jobConf JobConf for the MapReduce job
+   */
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
index d121f7e..c3e4f54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
@@ -20,6 +20,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -29,13 +30,16 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * MapredWork.
@@ -563,4 +567,19 @@ public boolean isFinalMapRed() {
   public void setFinalMapRed(boolean finalMapRed) {
     this.finalMapRed = finalMapRed;
   }
+
+  public void configureJobConf(JobConf jobConf) {
+    for (PartitionDesc partition : aliasToPartnInfo.values()) {
+      PlanUtils.configureJobConf(partition.getTableDesc(), jobConf);
+    }
+    Collection<Operator<?>> mappers = aliasToWork.values();
+    for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) {
+      PlanUtils.configureJobConf(fs.getConf().getTableInfo(), jobConf);
+    }
+    if (reducer != null) {
+      for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
+        PlanUtils.configureJobConf(fs.getConf().getTableInfo(), jobConf);
+      }
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 943d1ab..04921d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -352,8 +353,8 @@ public static TableDesc getDefaultTableDesc(String separatorCode) {
     return new TableDesc(MetadataTypedColumnsetSerDe.class,
         TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities
         .makeProperties(
-        org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,
-        separatorCode));
+            org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,
+            separatorCode));
   }
 
   /**
@@ -779,6 +780,19 @@ private static void configureJobPropertiesForStorageHandler(boolean input,
     }
   }
 
+  public static void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    String handlerClass = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+    try {
+      HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(jobConf, handlerClass);
+      if (storageHandler != null) {
+        storageHandler.configureJobConf(tableDesc, jobConf);
+      }
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public static String stripQuotes(String val) {
     if ((val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'')
         || (val.charAt(0) == '\"' && val.charAt(val.length() - 1) == '\"')) {
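
For illustration, here is a minimal sketch (not part of the patch; the package and class names are hypothetical) of how a third-party storage handler could implement the new configureJobConf() hook to ship its own jar to the cluster, analogous to what HBaseStorageHandler does above with TableMapReduceUtil.addDependencyJars():

package com.example.hive; // hypothetical package, for illustration only

import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.JobConf;

public class ExampleStorageHandler extends DefaultStorageHandler {

  @Override
  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
    // Runs on the client just before job submission -- the last chance to
    // touch the JobConf. Here the handler appends its own jar to the
    // "tmpjars" property (the distributed-cache list that
    // TableMapReduceUtil.addDependencyJars() also populates).
    java.security.CodeSource src =
        ExampleStorageHandler.class.getProtectionDomain().getCodeSource();
    if (src != null) { // can be null under some classloaders
      String jar = src.getLocation().toString();
      String tmpjars = jobConf.get("tmpjars");
      jobConf.set("tmpjars",
          tmpjars == null || tmpjars.isEmpty() ? jar : tmpjars + "," + jar);
    }
  }
}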
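And a compile-level sketch of how the new OperatorUtils walk ties the pieces together (again hypothetical, assuming the generic signatures shown in the patch; it mirrors the map-side half of MapredWork.configureJobConf()):

package com.example.hive; // hypothetical, for illustration only

import java.util.Collection;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.mapred.JobConf;

public class OperatorWalkExample {

  // Collect every FileSinkOperator reachable from the map-side root
  // operators and let each output table's storage handler adjust the
  // JobConf -- the same traversal MapredWork.configureJobConf() performs,
  // so output-side handlers get the hook as well as input-side ones.
  public static void configureOutputHandlers(MapredWork work, JobConf jobConf) {
    Collection<Operator<?>> roots = work.getAliasToWork().values();
    for (FileSinkOperator fs :
        OperatorUtils.findOperators(roots, FileSinkOperator.class)) {
      PlanUtils.configureJobConf(fs.getConf().getTableInfo(), jobConf);
    }
  }
}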