Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 986523)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -239,6 +239,7 @@
HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M
MAPREDMINSPLITSIZE("mapred.min.split.size", 1),
+ HIVEMERGEMAPONLY("hive.mergejob.maponly", true),
HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
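The new hive.mergejob.maponly entry is a plain boolean ConfVar, so callers read it through the usual HiveConf accessors. A minimal sketch, assuming only the standard HiveConf API (the merge code later in this patch keys off HIVEMERGEMAPFILES together with the Hadoop-version check, so this snippet is illustrative rather than code from the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MergeFlagSketch {
      public static void main(String[] args) {
        // HiveConf(Class) and the static getBoolVar(...) are existing accessors;
        // HIVEMERGEMAPONLY is the enum constant added in the hunk above.
        HiveConf conf = new HiveConf(MergeFlagSketch.class);
        boolean mapOnlyMerge =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPONLY);
        System.out.println("hive.mergejob.maponly = " + mapOnlyMerge);
      }
    }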
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 986523)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
@@ -668,7 +668,7 @@
if (tasks != null) {
File planDir = new File(outDir, "plan");
- File planFile = new File(planDir, tname.concat(".xml"));
+ String planFile = outPath(planDir.toString(), tname + ".xml");
File outf = null;
outf = new File(logDir);
@@ -690,7 +690,7 @@
+ "\\|\\([0-9]\\{10\\}\\)"
+ "\\|\\(/.*/warehouse/.*\\)\\)";
cmdArray[4] = outf.getPath();
- cmdArray[5] = planFile.getPath();
+ cmdArray[5] = planFile;
System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2]
+ "\'" + cmdArray[3] + "\'" + " " + cmdArray[4] + " " + cmdArray[5]);
@@ -708,7 +708,7 @@
if (exitVal != 0 && overWrite) {
System.out.println("Overwriting results");
- String cmdLine = "cp " + outf.getPath() + " " + planFile.getPath();
+ String cmdLine = "cp " + outf.getPath() + " " + planFile;
executor = Runtime.getRuntime().exec(cmdLine);
exitVal = executor.waitFor();
}
@@ -887,10 +887,16 @@
}
public ASTNode parseQuery(String tname) throws Exception {
-
return pd.parse(qMap.get(tname));
}
+ public void resetParser() throws SemanticException {
+ drv.init();
+ pd = new ParseDriver();
+ sem = new SemanticAnalyzer(conf);
+ }
+
+
public List<Task<? extends Serializable>> analyzeAST(ASTNode ast) throws Exception {
// Do semantic analysis and plan generation
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -334,7 +334,7 @@
}
/**
- *
+ *
* @param tableName
* table name
* @param indexName
@@ -380,20 +380,20 @@
String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
Index old_index = null;
try {
- old_index = getIndex(dbName, tableName, indexName);
+ old_index = getIndex(dbName, tableName, indexName);
} catch (Exception e) {
}
if (old_index != null) {
throw new HiveException("Index " + indexName + " already exists on table " + tableName + ", db=" + dbName);
}
-
+
org.apache.hadoop.hive.metastore.api.Table baseTbl = getMSC().getTable(dbName, tableName);
if (baseTbl.getTableType() == TableType.VIRTUAL_VIEW.toString()) {
throw new HiveException("tableName="+ tableName +" is a VIRTUAL VIEW. Index on VIRTUAL VIEW is not supported.");
}
-
+
if (indexTblName == null) {
- indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName);
+ indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName);
} else {
org.apache.hadoop.hive.metastore.api.Table temp = null;
try {
@@ -404,11 +404,11 @@
throw new HiveException("Table name " + indexTblName + " already exists. Choose another name.");
}
}
-
+
org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().clone();
SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
if(serde != null) {
- serdeInfo.setSerializationLib(serde);
+ serdeInfo.setSerializationLib(serde);
} else {
if (storageHandler == null) {
serdeInfo.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
@@ -437,7 +437,7 @@
if (lineDelim != null) {
serdeInfo.getParameters().put(Constants.LINE_DELIM, lineDelim);
}
-
+
if (serdeProps != null) {
Iterator<Entry<String, String>> iter = serdeProps.entrySet()
.iterator();
@@ -446,16 +446,16 @@
serdeInfo.getParameters().put(m.getKey(), m.getValue());
}
}
-
+
storageDescriptor.setLocation(null);
if (location != null) {
- storageDescriptor.setLocation(location);
+ storageDescriptor.setLocation(location);
}
storageDescriptor.setInputFormat(inputFormat);
storageDescriptor.setOutputFormat(outputFormat);
-
+
Map<String, String> params = new HashMap<String, String>();
-
+
List<FieldSchema> indexTblCols = new ArrayList<FieldSchema>();
List<Order> sortCols = new ArrayList<Order>();
storageDescriptor.setBucketCols(null);
@@ -468,14 +468,15 @@
k++;
}
}
- if (k != indexedCols.size())
+ if (k != indexedCols.size()) {
throw new RuntimeException(
"Check the index columns, they should appear in the table being indexed.");
-
+ }
+
storageDescriptor.setCols(indexTblCols);
storageDescriptor.setSortCols(sortCols);
- int time = (int) (System.currentTimeMillis() / 1000);
+ int time = (int) (System.currentTimeMillis() / 1000);
org.apache.hadoop.hive.metastore.api.Table tt = null;
HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass);
@@ -489,18 +490,18 @@
if(!deferredRebuild) {
throw new RuntimeException("Please specify deferred rebuild using \" WITH DEFERRED REBUILD \".");
}
-
+
Index indexDesc = new Index(indexName, indexHandlerClass, dbName, tableName, time, time, indexTblName,
storageDescriptor, params, deferredRebuild);
indexHandler.analyzeIndexDefinition(baseTbl, indexDesc, tt);
-
+
this.getMSC().createIndex(indexDesc, tt);
-
+
} catch (Exception e) {
throw new HiveException(e);
}
}
-
+
public Index getIndex(String dbName, String baseTableName,
String indexName) throws HiveException {
try {
@@ -509,7 +510,7 @@
throw new HiveException(e);
}
}
-
+
public boolean dropIndex(String db_name, String tbl_name, String index_name, boolean deleteData) throws HiveException {
try {
return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData);
@@ -519,7 +520,7 @@
throw new HiveException("Unknow error. Please check logs.", e);
}
}
-
+
/**
* Drops table along with the data in it. If the table doesn't exist
* then it is a no-op
@@ -812,6 +813,9 @@
FileSystem fs = loadPath.getFileSystem(conf);
FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDP, fs);
+ if (status.length == 0) {
+ LOG.warn("No partition is genereated by dynamic partitioning");
+ }
if (status.length > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
throw new HiveException("Number of dynamic partitions created is " + status.length
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy)
@@ -21,9 +21,12 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Stack;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
@@ -38,7 +41,6 @@
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -51,7 +53,9 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
@@ -65,7 +69,6 @@
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
@@ -73,6 +76,8 @@
*/
public class GenMRFileSink1 implements NodeProcessor {
+ static final private Log LOG = LogFactory.getLog(GenMRFileSink1.class.getName());
+
public GenMRFileSink1() {
}
@@ -122,7 +127,7 @@
String finalName = processFS(nd, stack, opProcCtx, chDir);
- // If it is a map-only job, insert a new task to do the concatenation
+ // need to merge the files in the destination table/partitions
if (chDir && (finalName != null)) {
createMergeJob((FileSinkOperator) nd, ctx, finalName);
}
@@ -130,8 +135,8 @@
return null;
}
- private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx,
- String finalName) throws SemanticException {
+ private void createMapReduce4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
+ throws SemanticException {
Task<? extends Serializable> currTask = ctx.getCurrTask();
RowSchema fsRS = fsOp.getSchema();
@@ -140,6 +145,7 @@
keyCols.add(TypeCheckProcFactory.DefaultExprProcessor
.getFuncExprNodeDesc("rand"));
+ // value is all the columns in the FileSink operator input
ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
for (ColumnInfo ci : fsRS.getSignature()) {
valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
@@ -178,7 +184,7 @@
pos = Integer.valueOf(pos.intValue() + 1);
}
- Operator extract = OperatorFactory.getAndMakeChild(new ExtractDesc(
+ Operator extract = OperatorFactory.getAndMakeChild(new ExtractDesc(
new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
Utilities.ReduceField.VALUE.toString(), "", false)),
new RowSchema(out_rwsch.getColumnInfos()));
@@ -234,6 +240,169 @@
}
}
+ /**
+ * Create a MapReduce job for a particular partition if the Hadoop version is pre-0.20;
+ * otherwise create a Map-only job using CombineHiveInputFormat for all partitions.
+ * @param fsOp The FileSink operator.
+ * @param ctx The MR processing context.
+ * @param finalName the final destination path that the merge job should write to.
+ * @throws SemanticException
+ */
+ private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
+ throws SemanticException {
+
+ // if the Hadoop version supports CombineFileInputFormat (version >= 0.20),
+ // create a Map-only job for merge, otherwise create a MapReduce merge job.
+ ParseContext parseCtx = ctx.getParseCtx();
+ if (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
+ Utilities.supportCombineFileInputFormat()) {
+ // create Map-only merge job
+ createMap4Merge(fsOp, ctx, finalName);
+ LOG.info("use CombineHiveInputformat for the merge job");
+ } else {
+ createMapReduce4Merge(fsOp, ctx, finalName);
+ LOG.info("use HiveInputFormat for the merge job");
+ }
+ }
+
+ private void createMap4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) {
+
+ // create a Map-only merge job with the following operators:
+ //
+ // MR job J0:
+ // ...
+ // |
+ // v
+ // FileSinkOperator_1
+ // |
+ // v
+ // Merge job J1:
+ // |
+ // v
+ // TableScan (using CombineHiveInputFormat)
+ // |
+ // v
+ // FileSinkOperator
+ //
+ // Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do
+ // not contain the dynamic partitions (their parent). So after the dynamic partitions are
+ // created (after the first job finished before the moveTask or ConditionalTask start),
+ // we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition
+ // directories.
+ //
+
+ //
+ // 1. create the operator tree
+ //
+ ParseContext parseCtx = ctx.getParseCtx();
+ FileSinkDesc fsConf = fsOp.getConf();
+ // Create a TableScan operator
+ RowSchema fsRS = fsOp.getSchema();
+ Operator<? extends Serializable> ts_op = OperatorFactory.get(TableScanDesc.class, fsRS);
+
+ // Create a FileSink operator
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ for (ColumnInfo ci : fsRS.getSignature()) {
+ valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
+ ci.getTabAlias(), ci.getIsVirtualCol()));
+ }
+ ArrayList<String> outputColumns = new ArrayList<String>();
+ for (int i = 0; i < valueCols.size(); i++) {
+ outputColumns.add(SemanticAnalyzer.getColumnInternalName(i));
+ }
+
+ TableDesc ts = (TableDesc) fsConf.getTableInfo().clone();
+ /*
+ fsConf.getTableInfo().getProperties().remove(
+ org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+ */
+ FileSinkDesc fsDesc = new FileSinkDesc(finalName, ts,
+ parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
+ FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+ fsDesc, fsRS, ts_op);
+
+ // If the input FileSinkOperator has dynamic partitioning enabled, the TableScanOperator
+ // needs to include the partition columns, and the new FileSinkOperator (the merge job's
+ // sink) should have a DynamicPartitionCtx to indicate that its output needs to be
+ // dynamically partitioned. NOTE: this changes the RowSchema of the TableScanOperator,
+ // so it has to be done after the FileSinkOperator is created.
+ DynamicPartitionCtx dpCtx = fsConf.getDynPartCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ // add the dynamic partition columns to the row schema
+ ArrayList<ColumnInfo> signature = fsRS.getSignature();
+ String tblAlias = fsConf.getTableInfo().getTableName();
+ LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+ for (String dpCol: dpCtx.getDPColNames()) {
+ ColumnInfo colInfo = new ColumnInfo(dpCol, TypeInfoFactory.stringTypeInfo,
+ tblAlias, true);
+ signature.add(colInfo);
+ colMap.put(dpCol, dpCol);
+ }
+ fsRS.setSignature(signature);
+ //
+ DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+ dpCtx2.setInputToDPCols(colMap);
+ fsDesc.setDynPartCtx(dpCtx2);
+ }
+
+ //
+ // 2. create the merge job J1 and make the TableScan operator as the root of J1
+ //
+ MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
+ ArrayList<String> aliases = new ArrayList<String>();
+ aliases.add(fsConf.getDirName());
+ // using CombineHiveInputFormat
+ cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+ cplan.getPathToAliases().put(fsConf.getDirName(), aliases);
+ cplan.getPathToPartitionInfo().put(fsConf.getDirName(),
+ new PartitionDesc(fsConf.getTableInfo(), null));
+ cplan.setNumReduceTasks(Integer.valueOf(0)); // no reducers
+ cplan.getAliasToWork().put(fsConf.getDirName(), ts_op);
+
+ //
+ // 3. create a conditional task J2 making J1 as one of the list of conditional tasks.
+ //
+ Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx.getConf());
+ MoveWork dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false);
+ Task<? extends Serializable> dummyMoveTask = TaskFactory.get(dummyMv, ctx.getConf());
+ List<Serializable> listWorks = new ArrayList<Serializable>();
+ listWorks.add(dummyMv);
+ listWorks.add(mergeTask.getWork());
+ ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, ctx
+ .getConf());
+ List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+ listTasks.add(dummyMoveTask);
+ listTasks.add(mergeTask);
+ cndTsk.setListTasks(listTasks);
+
+ cndTsk.setResolver(new ConditionalResolverMergeFiles());
+ ConditionalResolverMergeFilesCtx mrCtx = new ConditionalResolverMergeFilesCtx(listTasks, fsOp
+ .getConf().getDirName());
+ mrCtx.setDPCtx(fsConf.getDynPartCtx()); // remember whether it is dynamic partition or not
+ cndTsk.setResolverCtx(mrCtx);
+
+ //
+ // 4. make the conditional task J2 as the child of the current task
+ //
+ Task<? extends Serializable> currTask = ctx.getCurrTask();
+ currTask.addDependentTask(cndTsk);
+
+ //
+ // 5. add the moveTask as a dependent of each of the conditional task's tasks
+ //
+ List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
+ Task<? extends Serializable> mvTask = findMoveTask(mvTasks, newOutput);
+
+ if (mvTask != null) {
+ for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+ tsk.addDependentTask(mvTask);
+ }
+ }
+ }
+
private Task<? extends Serializable> findMoveTask(
List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
// find the move task
@@ -251,10 +420,19 @@
return mvTsk;
}
}
-
return null;
}
+ /**
+ * Process the FileSink operator to generate a MoveTask if necessary.
+ * @param nd current FileSink operator
+ * @param stack parent operators
+ * @param opProcCtx
+ * @param chDir whether the output should first be written to a tmp dir and then merged
+ * into the final dir later
+ * @return the final file name to which the FileSinkOperator should write.
+ * @throws SemanticException
+ */
private String processFS(Node nd, Stack<Node> stack,
NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
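The ConditionalResolverMergeFiles wired up above decides at run time whether the merge task actually runs: it is picked only when the first job left more than one file and their average size is below hive.merge.smallfiles.avgsize. A simplified sketch of that size check, assuming only the standard Hadoop FileSystem API (the real resolver also handles the dynamic-partition rewrite shown later in this patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MergeDecisionSketch {
      // Merge only when there is more than one file and the average file size
      // falls below the configured threshold (hive.merge.smallfiles.avgsize).
      public static boolean shouldMerge(FileSystem fs, Path dir, long avgSizeThreshold)
          throws IOException {
        FileStatus[] files = fs.listStatus(dir);
        if (files == null || files.length <= 1) {
          return false;
        }
        long totalSize = 0;
        for (FileStatus f : files) {
          totalSize += f.getLen();
        }
        return (totalSize / files.length) < avgSizeThreshold;
      }
    }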
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy)
@@ -27,8 +27,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -151,6 +151,17 @@
}
/**
+ * The default dependent tasks are just the child tasks, but subclasses
+ * can override this (e.g. ConditionalTask returns its listTasks
+ * as dependents).
+ *
+ * @return a list of tasks that are dependent on this task.
+ */
+ public List<Task<? extends Serializable>> getDependentTasks() {
+ return getChildTasks();
+ }
+
+ /**
* Add a dependent task on the current task. Return if the dependency already
* existed or is this a new one
*
@@ -297,8 +308,9 @@
public final void localizeMRTmpFiles(Context ctx) {
localizeMRTmpFilesImpl(ctx);
- if (childTasks == null)
+ if (childTasks == null) {
return;
+ }
for (Task<? extends Serializable> t: childTasks) {
t.localizeMRTmpFiles(ctx);
@@ -306,4 +318,3 @@
}
}
-
\ No newline at end of file
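With getDependentTasks() in place, callers can walk the task DAG without special-casing ConditionalTask (whose override appears in the next file); the Utilities.getMRTasks and SemanticAnalyzer.getLeafTasks hunks below rely on exactly this. A small illustrative sketch, assuming only the Task API shown in this patch:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Task;

    public class TaskDagWalker {
      /** Collect every task reachable from the given roots via getDependentTasks(). */
      public static List<Task<? extends Serializable>> collect(
          List<Task<? extends Serializable>> roots) {
        List<Task<? extends Serializable>> result =
            new ArrayList<Task<? extends Serializable>>();
        Set<Task<? extends Serializable>> seen =
            new HashSet<Task<? extends Serializable>>();
        for (Task<? extends Serializable> root : roots) {
          walk(root, seen, result);
        }
        return result;
      }

      private static void walk(Task<? extends Serializable> task,
          Set<Task<? extends Serializable>> seen,
          List<Task<? extends Serializable>> result) {
        if (task == null || !seen.add(task)) {
          return;
        }
        result.add(task);
        // For a ConditionalTask this now includes its candidate list tasks as well.
        if (task.getDependentTasks() != null) {
          for (Task<? extends Serializable> child : task.getDependentTasks()) {
            walk(child, seen, result);
          }
        }
      }
    }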
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (working copy)
@@ -203,8 +203,15 @@
@Override
protected void localizeMRTmpFilesImpl(Context ctx) {
- if (getListTasks() != null)
- for(Task<? extends Serializable> t: getListTasks())
+ if (getListTasks() != null) {
+ for(Task<? extends Serializable> t: getListTasks()) {
t.localizeMRTmpFiles(ctx);
+ }
+ }
}
+
+ @Override
+ public List<Task<? extends Serializable>> getDependentTasks() {
+ return listTasks;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (working copy)
@@ -43,4 +43,14 @@
public ArrayList<ColumnInfo> getSignature() {
return signature;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("(");
+ for (ColumnInfo col: signature) {
+ sb.append(col.toString());
+ }
+ sb.append(')');
+ return sb.toString();
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -89,6 +89,7 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -1416,13 +1417,13 @@
mrTasks.add((ExecDriver)task);
}
- if (task instanceof ConditionalTask) {
- getMRTasks(((ConditionalTask)task).getListTasks(), mrTasks);
+ if (task.getDependentTasks() != null) {
+ getMRTasks(task.getDependentTasks(), mrTasks);
}
-
- if (task.getChildTasks() != null) {
- getMRTasks(task.getChildTasks(), mrTasks);
- }
}
}
+
+ public static boolean supportCombineFileInputFormat() {
+ return ShimLoader.getHadoopShims().getCombineFileInputFormat() != null;
+ }
}
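supportCombineFileInputFormat() defers to the shim layer: pre-0.20 Hadoop shims return null from getCombineFileInputFormat(), so the merge planner falls back to the MapReduce variant. A minimal sketch of the intended call pattern, assuming the existing ShimLoader/HadoopShims API:

    import org.apache.hadoop.hive.shims.ShimLoader;

    public class CombineInputFormatCheck {
      public static void main(String[] args) {
        // Returns null on pre-0.20 shims, non-null when CombineFileInputFormat
        // (and therefore CombineHiveInputFormat) can be used for the merge job.
        boolean supported =
            ShimLoader.getHadoopShims().getCombineFileInputFormat() != null;
        System.out.println("CombineFileInputFormat available: " + supported);
      }
    }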
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (working copy)
@@ -21,13 +21,17 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
/**
* Conditional task resolution interface. This is invoked at run time to get the
@@ -48,6 +52,7 @@
private static final long serialVersionUID = 1L;
List<Task<? extends Serializable>> listTasks;
private String dir;
+ private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert
public ConditionalResolverMergeFilesCtx() {
}
@@ -90,6 +95,14 @@
public void setListTasks(List<Task<? extends Serializable>> listTasks) {
this.listTasks = listTasks;
}
+
+ public DynamicPartitionCtx getDPCtx() {
+ return dpCtx;
+ }
+
+ public void setDPCtx(DynamicPartitionCtx dp) {
+ dpCtx = dp;
+ }
}
public List<Task<? extends Serializable>> getTasks(HiveConf conf,
@@ -119,15 +132,56 @@
long currAvgSz = totalSz / fStats.length;
if ((currAvgSz < avgConditionSize) && (fStats.length > 1)) {
+ //
+ // for each dynamic partition, generate a merge task
+ // populate aliasToWork, pathToPartitionInfo, pathToAlias
// also set the number of reducers
+ //
Task<? extends Serializable> tsk = ctx.getListTasks().get(1);
MapredWork work = (MapredWork) tsk.getWork();
- int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
- int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
- work.setNumReduceTasks(reducers);
+
+ // Dynamic partition: replace the input path (the parent of the dynamic partition
+ // directories) with the individual dynamic partition input paths.
+ DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ FileStatus[] status = Utilities.getFileStatusRecurse(dirPath,
+ dpCtx.getNumDPCols(), inpFs);
+
+ // cleanup pathToPartitionInfo
+ Map<String, PartitionDesc> ptpi = work.getPathToPartitionInfo();
+ assert ptpi.size() == 1;
+ String path = ptpi.keySet().iterator().next();
+ TableDesc tblDesc = ptpi.get(path).getTableDesc();
+ ptpi.remove(path); // the root path is not useful anymore
+
+ // cleanup pathToAliases
+ Map<String, ArrayList<String>> pta = work.getPathToAliases();
+ assert pta.size() == 1;
+ path = pta.keySet().iterator().next();
+ ArrayList<String> aliases = pta.get(path);
+ pta.remove(path); // the root path is not useful anymore
+
+ // populate pathToPartitionInfo and pathToAliases w/ DP paths
+ for (int i = 0; i < status.length; ++i) {
+ work.getPathToAliases().put(status[i].getPath().toString(), aliases);
+ // get the full partition spec from the path and update the PartitionDesc
+ Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
+ dpCtx.getPartSpec());
+ Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
+ PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap<String, String>) fullPartSpec);
+ work.getPathToPartitionInfo().put(
+ status[i].getPath().toString(),
+ pDesc);
+ }
+ } else {
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+ int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+ work.setNumReduceTasks(reducers);
+ }
+
resTsks.add(tsk);
return resTsks;
}
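The per-partition rewrite above hinges on Warehouse.makeSpecFromName, which parses a dynamic-partition leaf directory back into a partition spec. A minimal sketch of that step, assuming the static Warehouse.makeSpecFromName(Map, Path) helper used above and a hypothetical warehouse path:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.Warehouse;

    public class PartSpecFromPathSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical leaf directory produced by a dynamic-partition insert.
        Path leaf = new Path("/user/hive/warehouse/t/ds=2010-08-15/hr=11");
        Map<String, String> spec = new LinkedHashMap<String, String>();
        Warehouse.makeSpecFromName(spec, leaf);
        // Expected to yield {ds=2010-08-15, hr=11}, which the resolver wraps in a
        // PartitionDesc keyed by that directory.
        System.out.println(spec);
      }
    }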
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (working copy)
@@ -78,6 +78,20 @@
}
}
+ public DynamicPartitionCtx(DynamicPartitionCtx dp) {
+ this.partSpec = dp.partSpec;
+ this.numDPCols = dp.numDPCols;
+ this.numSPCols = dp.numSPCols;
+ this.spPath = dp.spPath;
+ this.rootPath = dp.rootPath;
+ this.numBuckets = dp.numBuckets;
+ this.inputToDPCols = dp.inputToDPCols;
+ this.spNames = dp.spNames;
+ this.dpNames = dp.dpNames;
+ this.defaultPartName = dp.defaultPartName;
+ this.maxPartsPerNode = dp.maxPartsPerNode;
+ }
+
public void mapInputToDP(List<ColumnInfo> fs) {
assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 986523)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -32,9 +32,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
-import java.util.Map.Entry;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -94,6 +94,7 @@
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -103,7 +104,6 @@
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -123,6 +123,7 @@
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -144,12 +145,11 @@
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -157,9 +157,9 @@
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -3273,10 +3273,12 @@
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
- // TODO: we should support merge files for dynamically generated partitions later
if (dpCtx.getNumDPCols() > 0 &&
(HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES))) {
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) &&
+ !Utilities.supportCombineFileInputFormat()) {
+ // Do not support merge for Hadoop versions (pre-0.20) that do not
+ // support CombineHiveInputFormat
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
}
@@ -6043,12 +6045,12 @@
private void getLeafTasks(Task<? extends Serializable> task,
HashSet<Task<? extends Serializable>> leaves) {
- if (task.getChildTasks() == null) {
+ if (task.getDependentTasks() == null) {
if (!leaves.contains(task)) {
leaves.add(task);
}
} else {
- getLeafTasks(task.getChildTasks(), leaves);
+ getLeafTasks(task.getDependentTasks(), leaves);
}
}