diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 55c71dd..bf9a0a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -195,7 +195,7 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new FixedBucketPruningOptimizer(compatMode));
     }
 
-    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index adfbb67..cf5ac33 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -144,6 +144,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         return null;
       }
 
+      if(AcidUtils.isAcidTable(destTable)) {
+        // ACID tables require their own sort order
+        LOG.info("Bailing out of sort dynamic partition optimization as destination table is 'transactional'");
+        return null;
+      }
+
       // unlink connection between FS and its parent
       Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
       // if all dp columns got constant folded then disable this optimization
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 5b673df..bb4ec55 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,8 +28,12 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without user's confirmation
   // If script preserves alias and value for columns related to keys, user can set this true
@@ -45,7 +49,22 @@ public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    if(pctx.hasAcidWrite()) {
+      StringBuilder tblNames = new StringBuilder();
+      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
+        if(fsd.getTable() != null) {
+          tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
+        }
+      }
+      if(tblNames.length() > 0) {
+        tblNames.setLength(tblNames.length() - 1); // trailing ','
+      }
+      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 due to a write to transactional table(s) " + tblNames);
+      minReducer = 1;
+    }
+    else {
+      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    }
     isMapAggr =
         pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }
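Note on the AbstractCorrelationProcCtx hunk above: the table-name list is built by appending a trailing ',' per sink and trimming it afterwards. Below is a minimal standalone sketch of the same technique; the db/table pairs are made-up stand-ins for fsd.getTable().getDbName()/getTableName() and are not part of the patch. Java's StringJoiner gives the same result without the trim step:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class TableNameList {
  public static void main(String[] args) {
    // Hypothetical stand-ins for fsd.getTable().getDbName()/getTableName()
    List<String[]> tables = Arrays.asList(
        new String[]{"default", "t1"}, new String[]{"sales", "orders"});

    // Same technique as the patch: append with a trailing ',' and trim it at the end
    StringBuilder tblNames = new StringBuilder();
    for (String[] t : tables) {
      tblNames.append(t[0]).append('.').append(t[1]).append(',');
    }
    if (tblNames.length() > 0) {
      tblNames.setLength(tblNames.length() - 1); // drop trailing ','
    }

    // Equivalent with StringJoiner: the delimiter is only placed between elements
    StringJoiner joiner = new StringJoiner(",");
    for (String[] t : tables) {
      joiner.add(t[0] + '.' + t[1]);
    }

    System.out.println(tblNames); // default.t1,sales.orders
    System.out.println(joiner);   // default.t1,sales.orders
  }
}
```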
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 1bccf20..700bc74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -113,6 +115,8 @@ private Map<SelectOperator, Table> viewProjectToViewSchema;
   private ColumnAccessInfo columnAccessInfo;
   private boolean needViewColumnAuthorization;
 
+  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
+
   public ParseContext() {
   }
@@ -172,7 +176,8 @@ public ParseContext(
       Map<String, ReadEntity> viewAliasToInput,
       List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
-      QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema) {
+      QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema,
+      Set<FileSinkDesc> acidFileSinks) {
     this.conf = conf;
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
@@ -208,8 +213,17 @@ public ParseContext(
       // authorization info.
       this.columnAccessInfo = new ColumnAccessInfo();
     }
+    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
+      this.acidFileSinks = new HashSet<>();
+      this.acidFileSinks.addAll(acidFileSinks);
+    }
+  }
+  public Set<FileSinkDesc> getAcidSinks() {
+    return acidFileSinks;
+  }
+  public boolean hasAcidWrite() {
+    return !acidFileSinks.isEmpty();
   }
-
   /**
    * @return the context
    */
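The ParseContext hunk above pairs an immutable Collections.emptySet() default with a defensive HashSet copy in the constructor, so hasAcidWrite() never needs a null check and later mutation of the caller's set cannot leak into the context. A minimal sketch of that pattern in isolation (the holder class is illustrative, not part of the patch):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the pattern ParseContext uses for acidFileSinks
class AcidSinkHolder<T> {
  // Immutable empty default: hasAcidWrite() is safe even if nothing was passed in
  private Set<T> acidSinks = Collections.emptySet();

  AcidSinkHolder(Set<T> sinks) {
    if (sinks != null && !sinks.isEmpty()) {
      // Defensive copy: the caller mutating its set later cannot change ours
      this.acidSinks = new HashSet<>(sinks);
    }
  }

  Set<T> getAcidSinks() {
    return acidSinks;
  }

  boolean hasAcidWrite() {
    return !acidSinks.isEmpty();
  }
}
```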
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 96df189..b787f3c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -447,7 +447,7 @@ public ParseContext getParseContext() {
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);
   }
 
   public CompilationOpContext getOpContext() {
@@ -7004,11 +7004,17 @@ private void checkAcidConstraints(QB qb, TableDesc tableDesc,
       LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
       throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
     }
+    /*
     LOG.info("Modifying config values for ACID write");
     conf.setBoolVar(ConfVars.HIVEOPTREDUCEDEDUPLICATION, true);
     conf.setIntVar(ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 1);
-    conf.set(AcidUtils.CONF_ACID_KEY, "true");
     conf.setBoolVar(ConfVars.HIVEOPTSORTDYNAMICPARTITION, false);
+    These props are now enabled elsewhere (see commit diffs). It would be better instead to throw
+    if they are not set. For example, if a user has set hive.optimize.reducededuplication=false for
+    some reason, we'll run a query contrary to what they wanted... But throwing now would be
+    backwards incompatible.
+    */
+    conf.set(AcidUtils.CONF_ACID_KEY, "true");
 
     if (table.getNumBuckets() < 1) {
       throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName());
@@ -10666,7 +10672,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);
 
     // 5. Take care of view creation
     if (createVwDesc != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index f7d7a40..2468f03 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -403,7 +403,8 @@ public ParseContext getParseContext(ParseContext pCtx, List
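As the comment retained in checkAcidConstraints() explains, the old conf overrides were replaced by ACID-aware logic in the optimizers, and throwing when a user explicitly set hive.optimize.reducededuplication=false would be backwards incompatible. One possible middle ground, sketched below with a hypothetical helper that is not part of this patch, is to warn when an explicit user setting is being overridden for an ACID write:

```java
// Hypothetical sketch, not part of this patch: warn (rather than throw) when an
// explicit user setting is ignored because the statement writes to an ACID table.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AcidConfCheck {
  private static final Logger LOG = LoggerFactory.getLogger(AcidConfCheck.class);

  /** @param userDisabledDedup true if hive.optimize.reducededuplication=false was set explicitly */
  static void warnIfOverridden(boolean userDisabledDedup, boolean hasAcidWrite) {
    if (hasAcidWrite && userDisabledDedup) {
      LOG.warn("hive.optimize.reducededuplication=false is ignored for writes to "
          + "transactional tables; ReduceSinkDeDuplication will run anyway");
    }
  }
}
```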