diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 55c71dd..bf9a0a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -195,7 +195,7 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new FixedBucketPruningOptimizer(compatMode));
     }
 
-    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 5b673df..bb4ec55 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,8 +28,12 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without user's confirmation
   // If script preserves alias and value for columns related to keys, user can set this true
@@ -45,7 +49,22 @@ public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    if(pctx.hasAcidWrite()) {
+      StringBuilder tblNames = new StringBuilder();
+      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
+        if(fsd.getTable() != null) {
+          tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
+        }
+      }
+      if(tblNames.length() > 0) {
+        tblNames.setLength(tblNames.length() - 1); // trailing ','
+      }
+      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 due to a write to transactional table(s) " + tblNames);
+      minReducer = 1;
+    }
+    else {
+      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    }
     isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }
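The StringBuilder loop above appends a ',' after every table name and then trims the trailing one with setLength(). Java 8's StringJoiner builds the same list without the manual trim; the sketch below reuses the FileSinkDesc accessors from the hunk, but the helper class itself is hypothetical and not part of the patch:

    import java.util.StringJoiner;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    final class AcidSinkNames { // hypothetical helper, for illustration only
      // Collects "db.table" names of ACID sinks into a comma-separated list,
      // mirroring the StringBuilder loop in AbstractCorrelationProcCtx.
      static String of(Iterable<FileSinkDesc> sinks) {
        StringJoiner names = new StringJoiner(",");
        for (FileSinkDesc fsd : sinks) {
          if (fsd.getTable() != null) {
            names.add(fsd.getTable().getDbName() + "." + fsd.getTable().getTableName());
          }
        }
        return names.toString(); // empty when there are no sinks; no trailing ',' to trim
      }
    }

AcidSinkNames.of(pctx.getAcidSinks()) would yield the same string the LOG.info call above builds by hand.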
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 5d0f905..96ef20d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -115,6 +117,8 @@
   private Map<SelectOperator, Table> viewProjectToViewSchema;
   private ColumnAccessInfo columnAccessInfo;
   private boolean needViewColumnAuthorization;
+  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
+
 
   public ParseContext() {
   }
@@ -174,7 +178,8 @@ public ParseContext(
       Map<String, ReadEntity> viewAliasToInput,
       List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
-      QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema) {
+      QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema,
+      Set<FileSinkDesc> acidFileSinks) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     this.opToPartPruner = opToPartPruner;
@@ -211,8 +216,17 @@ public ParseContext(
       // authorization info.
       this.columnAccessInfo = new ColumnAccessInfo();
     }
+    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
+      this.acidFileSinks = new HashSet<>();
+      this.acidFileSinks.addAll(acidFileSinks);
+    }
+  }
+  public Set<FileSinkDesc> getAcidSinks() {
+    return acidFileSinks;
+  }
+  public boolean hasAcidWrite() {
+    return !acidFileSinks.isEmpty();
   }
-
   /**
    * @return the context
    */
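The ParseContext hunk above defaults acidFileSinks to an immutable Collections.emptySet() and takes a defensive HashSet copy only when the caller passes a non-empty set, so hasAcidWrite() never needs a null check and later mutation of the caller's collection cannot leak into the plan. A self-contained sketch of the same pattern (the class and its names are hypothetical):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    final class SinkHolder<T> { // hypothetical illustration, not part of the patch
      private Set<T> sinks = Collections.emptySet(); // immutable default: no null checks later

      SinkHolder(Set<T> input) {
        if (input != null && !input.isEmpty()) {
          sinks = new HashSet<>(input); // defensive copy: caller mutations stay invisible
        }
      }

      boolean hasAny() {
        return !sinks.isEmpty(); // mirrors ParseContext.hasAcidWrite()
      }
    }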
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 2983d38..9cc166b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -452,7 +452,7 @@ public ParseContext getParseContext() {
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);
   }
 
   public CompilationOpContext getOpContext() {
@@ -7026,9 +7026,15 @@ private void checkAcidConstraints(QB qb, TableDesc tableDesc,
       LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
       throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
     }
+    /*
     LOG.info("Modifying config values for ACID write");
     conf.setBoolVar(ConfVars.HIVEOPTREDUCEDEDUPLICATION, true);
     conf.setIntVar(ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 1);
+    These props are now enabled elsewhere (see commit diffs). It would be better to throw instead
+    if they are not set. For example, if a user has set hive.optimize.reducededuplication=false for
+    some reason, we'll run the query contrary to what they wanted... But throwing now would be
+    backwards incompatible.
+    */
     conf.set(AcidUtils.CONF_ACID_KEY, "true");
 
     if (table.getNumBuckets() < 1) {
@@ -10687,7 +10693,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);
 
     // 5. Take care of view creation
     if (createVwDesc != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7efc987..4049f40 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -460,7 +460,8 @@ public ParseContext getParseContext(ParseContext pCtx, List
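The commented-out block in checkAcidConstraints() records why the old conf overrides moved into Optimizer and AbstractCorrelationProcCtx rather than becoming a hard failure: throwing on an explicit hive.optimize.reducededuplication=false would be backwards incompatible. A gentler middle ground, sketched here and not part of the patch, would be to warn when an explicit user setting is being overridden for an ACID write (this assumes Optimizer has an slf4j LOG like the one the patch adds to AbstractCorrelationProcCtx):

    // Hypothetical addition to Optimizer.initialize(), next to the hunk above:
    if (pctx.hasAcidWrite()
        && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
      LOG.warn("hive.optimize.reducededuplication=false is overridden for writes to"
          + " transactional tables; ReduceSinkDeDuplication will run anyway.");
    }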