diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 25e9cd0482..da277d058f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -191,7 +191,7 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new FixedBucketPruningOptimizer(compatMode));
     }
 
-    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 4e72c4c252..4208abe0fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,12 +28,8 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without user's confirmation
   // If script preserves alias and value for columns related to keys, user can set this true
@@ -49,22 +45,7 @@ public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    if(pctx.hasAcidWrite()) {
-      StringBuilder tblNames = new StringBuilder();
-      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
-        if(fsd.getTable() != null) {
-          tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
-        }
-      }
-      if(tblNames.length() > 0) {
-        tblNames.setLength(tblNames.length() - 1);//traling ','
-      }
-      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 due to a write to transactional table(s) " + tblNames);
-      minReducer = 1;
-    }
-    else {
-      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
-    }
+    minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
     isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 91bdbfd67d..ea99fe0dfc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -128,7 +128,6 @@
   private Map<SelectOperator, Table> viewProjectToTableSchema;
   private ColumnAccessInfo columnAccessInfo;
   private boolean needViewColumnAuthorization;
-  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
   private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
       new LinkedHashMap<ReduceSinkOperator, RuntimeValuesInfo>();
@@ -199,7 +198,7 @@ public ParseContext(
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
       CreateViewDesc createViewDesc, MaterializedViewUpdateDesc materializedViewUpdateDesc,
       QueryProperties queryProperties,
-      Map<SelectOperator, Table> viewProjectToTableSchema, Set<FileSinkDesc> acidFileSinks) {
+      Map<SelectOperator, Table> viewProjectToTableSchema) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     this.opToPartPruner = opToPartPruner;
@@ -239,17 +238,8 @@ public ParseContext(
       // authorization info.
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
-      this.acidFileSinks = new HashSet<>();
-      this.acidFileSinks.addAll(acidFileSinks);
-    }
-  }
-  public Set<FileSinkDesc> getAcidSinks() {
-    return acidFileSinks;
-  }
-  public boolean hasAcidWrite() {
-    return !acidFileSinks.isEmpty();
   }
+
   /**
    * @return the context
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6adfb6d16c..c900d8c664 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -554,7 +554,7 @@ public ParseContext getParseContext() {
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
         analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
-        queryProperties, viewProjectToTableSchema, acidFileSinks);
+        queryProperties, viewProjectToTableSchema);
   }
 
   public CompilationOpContext getOpContext() {
@@ -6856,10 +6856,15 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
     }
 
     if (enforceBucketing) {
+      Operation acidOp = AcidUtils.isFullAcidTable(dest_tab) ? getAcidType(table_desc.getOutputFileFormatClass(),
+          dest, AcidUtils.isInsertOnlyTable(dest_tab)) : Operation.NOT_ACID;
       int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
       if (conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS) > 0) {
         maxReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
       }
+      if (acidOp != Operation.NOT_ACID) {
+        maxReducers = 1;
+      }
       int numBuckets = dest_tab.getNumBuckets();
       if (numBuckets > maxReducers) {
         LOG.debug("numBuckets is {} and maxReducers is {}", numBuckets, maxReducers);
@@ -6886,8 +6891,7 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
       }
 
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(), maxReducers,
-          (AcidUtils.isFullAcidTable(dest_tab) ? getAcidType(table_desc.getOutputFileFormatClass(),
-              dest, AcidUtils.isInsertOnlyTable(dest_tab)) : AcidUtils.Operation.NOT_ACID));
+          acidOp);
       reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -12542,7 +12546,7 @@ void analyzeInternal(final ASTNode astToAnalyze, Supplier<PlannerContext> pcf) t
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
         analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
-        queryProperties, viewProjectToTableSchema, acidFileSinks);
+        queryProperties, viewProjectToTableSchema);
 
     // Set the semijoin hints in parse context
     pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 1e1d65bcb5..2f3fc6c50a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -702,15 +702,12 @@ protected void runDynPartitionSortOptimizations(ParseContext parseContext, HiveC
         !HiveConf.getBoolVar(hConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
       new SortedDynPartitionOptimizer().transform(parseContext);
 
-      if(HiveConf.getBoolVar(hConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)
-          || parseContext.hasAcidWrite()) {
-
+      if(HiveConf.getBoolVar(hConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
        // Dynamic sort partition adds an extra RS therefore need to de-dup
         new ReduceSinkDeDuplication().transform(parseContext);
         // there is an issue with dedup logic wherein SELECT is created with wrong columns
         // NonBlockingOpDeDupProc fixes that
         new NonBlockingOpDeDupProc().transform(parseContext);
-
       }
     }
   }
@@ -732,8 +729,7 @@ public ParseContext getParseContext(ParseContext pCtx, List<Task<?>> rootTasks)
         pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
         pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(),
         pCtx.getCreateViewDesc(), pCtx.getMaterializedViewUpdateDesc(),
-        pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema(),
-        pCtx.getAcidSinks());
+        pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema());
     clone.setFetchTask(pCtx.getFetchTask());
     clone.setLineageInfo(pCtx.getLineageInfo());
     clone.setMapJoinOps(pCtx.getMapJoinOps());
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ff815434f0..5a78ed5f9f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -197,8 +197,7 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Sorted dynamic partition optimization");
     }
 
-    if(HiveConf.getBoolVar(procCtx.conf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)
-        || procCtx.parseContext.hasAcidWrite()) {
+    if(HiveConf.getBoolVar(procCtx.conf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
       // Dynamic sort partition adds an extra RS therefore need to de-dup
       new ReduceSinkDeDuplication().transform(procCtx.parseContext);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 2314f49631..77d3500121 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -25,6 +25,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -709,13 +710,18 @@ public static ReduceSinkDesc getReduceSinkDesc(
       List<String> outputColumnNames, boolean includeKeyCols, int tag,
       List<ExprNodeDesc> partitionCols, String order, String nullOrder, int numReducers, AcidUtils.Operation writeType) {
-    return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
-        new ArrayList<List<Integer>>(),
-        includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
-            new ArrayList<String>(),
-        includeKeyCols ? outputColumnNames.subList(keyCols.size(),
-            outputColumnNames.size()) : outputColumnNames,
-        includeKeyCols, tag, partitionCols, order, nullOrder, numReducers, writeType);
+    ReduceSinkDesc reduceSinkDesc = getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
+        new ArrayList<List<Integer>>(),
+        includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
+            new ArrayList<String>(),
+        includeKeyCols ? outputColumnNames.subList(keyCols.size(),
+            outputColumnNames.size()) : outputColumnNames,
+        includeKeyCols, tag, partitionCols, order, nullOrder, numReducers, writeType);
+    if (writeType != AcidUtils.Operation.NOT_ACID) {
+      reduceSinkDesc.setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.FIXED));
+      reduceSinkDesc.setNumReducers(1);
+    }
+    return reduceSinkDesc;
   }
 
   /**
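
Net effect of the patch: the single-reducer guarantee for transactional writes no longer comes from forcing ReduceSinkDeDuplication to run with minReducer = 1 (the AbstractCorrelationProcCtx/ParseContext plumbing removed above); instead, genBucketingSortingDest computes the ACID operation up front and caps maxReducers at 1, and PlanUtils pins any ACID ReduceSinkDesc to a single FIXED reducer. The sketch below distills that reducer-pinning rule outside of Hive; it is illustrative only, and ReducerPinningSketch, ReducerPlan, and planReducers are hypothetical names, not Hive APIs.

import java.util.EnumSet;

// Minimal sketch of the reducer-pinning rule added to PlanUtils.getReduceSinkDesc.
// All names here are hypothetical stand-ins for illustration, not Hive classes.
final class ReducerPinningSketch {
  // Loosely mirrors ReduceSinkDesc.ReducerTraits: FIXED marks a reducer count
  // that later passes must not rescale.
  enum ReducerTrait { UNSET, FIXED }

  static final class ReducerPlan {
    int numReducers;
    EnumSet<ReducerTrait> traits = EnumSet.of(ReducerTrait.UNSET);
  }

  // An ACID write must land in exactly one reducer, so pin both count and trait.
  static ReducerPlan planReducers(int configuredMaxReducers, boolean isAcidWrite) {
    ReducerPlan plan = new ReducerPlan();
    plan.numReducers = configuredMaxReducers;
    if (isAcidWrite) {
      plan.numReducers = 1;
      plan.traits = EnumSet.of(ReducerTrait.FIXED);
    }
    return plan;
  }

  public static void main(String[] args) {
    System.out.println(planReducers(32, true).numReducers);  // prints 1
    System.out.println(planReducers(32, false).numReducers); // prints 32
  }
}

Carrying the constraint on the ReduceSink itself means correctness no longer depends on the de-duplication pass running at all, which is why the hasAcidWrite() triggers could be dropped from Optimizer, TaskCompiler, and TezCompiler.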