diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2604d5d..39944a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -493,24 +493,7 @@ private void dpSetup() {
       assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
           + inputObjInspectors.length;
       StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
-      // remove the last dpMapping.size() columns from the OI
-      List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
-      ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
-      ArrayList<String> newFieldsName = new ArrayList<String>();
-      this.dpStartCol = 0;
-      for (StructField sf : fieldOI) {
-        String fn = sf.getFieldName();
-        if (!dpCtx.getInputToDPCols().containsKey(fn)) {
-          newFieldsOI.add(sf.getFieldObjectInspector());
-          newFieldsName.add(sf.getFieldName());
-          this.dpStartCol++;
-        } else {
-          // once we found the start column for partition column we are done
-          break;
-        }
-      }
-      assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
+      this.dpStartCol = Utilities.getDPColOffset(conf);
       this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
       this.dpVals = new ArrayList<String>(numDynParts);
       this.dpWritables = new ArrayList<Object>(numDynParts);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index bcf85a4..5b21af9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -119,6 +119,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -3916,4 +3917,20 @@ public static void stripHivePasswordDetails(Configuration conf) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, "");
     }
   }
+
+  public static int getDPColOffset(FileSinkDesc conf) {
+
+    if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+      // For deletes, ROW__ID is the only non-partitioning, non-bucketing column.
+      // See UpdateDeleteSemanticAnalyzer.reparseAndSuperAnalyze() for details.
+      return 1;
+    } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+      // For updates, ROW__ID is an extra column at index 0.
+      // See UpdateDeleteSemanticAnalyzer.reparseAndSuperAnalyze() for details.
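+      // Example (hypothetical table): with regular columns (a, b) and dynamic partition column
+      // ds, an update row is shaped (ROW__ID, a, b, ds), so the DP columns start at 2 + 1 = 3.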
+      return getColumnNames(conf.getTableInfo().getProperties()).size() + 1;
+    } else {
+      return getColumnNames(conf.getTableInfo().getProperties()).size();
+    }
+
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
index 5c6a6df..25156b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
@@ -843,7 +843,7 @@ private static ExprNodeDesc evaluateFunction(GenericUDF udf, List<ExprNodeDesc>
       }
     }
     if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) {
-      // nested complex types cannot be folded cleanly 
+      // nested complex types cannot be folded cleanly
       return null;
     }
     Object value = constant.getValue();
@@ -1163,16 +1163,15 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object...
       DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx();
       if (dpCtx != null) {
 
-        // If all dynamic partitions are propagated as constant, remove DP.
-        Set<String> inputs = dpCtx.getInputToDPCols().keySet();
-
         // Assume only 1 parent for FS operator
         Operator<? extends OperatorDesc> parent = op.getParentOperators().get(0);
         Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
         RowSchema rs = parent.getSchema();
         boolean allConstant = true;
-        for (String input : inputs) {
-          ColumnInfo ci = rs.getColumnInfo(input);
+        int dpColStartIdx = Utilities.getDPColOffset(fsdesc);
+        List<ColumnInfo> colInfos = rs.getSignature();
+        for (int i = dpColStartIdx; i < colInfos.size(); i++) {
+          ColumnInfo ci = colInfos.get(i);
           if (parentConstants.get(ci) == null) {
             allConstant = false;
             break;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 02fbdfe..c696fd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -497,9 +497,6 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set
       ArrayList<ColumnInfo> signature = inputRS.getSignature();
       String tblAlias = fsInputDesc.getTableInfo().getTableName();
-      LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
       for (String dpCol : dpCtx.getDPColNames()) {
         ColumnInfo colInfo = new ColumnInfo(dpCol,
             TypeInfoFactory.stringTypeInfo, // all partition column type should be string
             tblAlias, true); // partition column is virtual column
         signature.add(colInfo);
-        colMap.put(dpCol, dpCol); // input and output have the same column name
       }
       inputRS.setSignature(signature);
       // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
       DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
-      dpCtx2.setInputToDPCols(colMap);
       fsOutputDesc.setDynPartCtx(dpCtx2);
       // update the FileSinkOperator to include partition columns
@@ -1896,7 +1890,7 @@ static void usePartitionColumns(Properties properties, List<String> partColNames
         "Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, "
         + Arrays.toString(partTypes));
-    Map<String, String> typeMap = new HashMap<String, String>();
+    Map<String, String> typeMap = new HashMap<>();
     for (int i = 0; i < partNames.length; i++) {
       String previousValue = typeMap.put(partNames[i], partTypes[i]);
       Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c5f39d3..d01a80a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -736,7 +736,7 @@ private ASTNode genValuesTempTable(ASTNode originalFrom, QB qb) throws SemanticE
     Path dataDir = null;
     if(!qb.getEncryptedTargetTablePaths().isEmpty()) {
       //currently only Insert into T values(...) is supported thus only 1 values clause
-      //and only 1 target table are possible. If/when support for 
+      //and only 1 target table are possible. If/when support for
       //select ... from values(...) is added an insert statement may have multiple
       //encrypted target tables.
       dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri());
@@ -1556,7 +1556,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException
 
     for (String alias : tabAliases) {
       String tab_name = qb.getTabNameForAlias(alias);
-      
+
       // we first look for this alias from CTE, and then from catalog.
       /*
        * if this s a CTE reference: Add its AST as a SubQuery to this QB.
@@ -6830,30 +6830,6 @@ Operator genConversionSelectOperator(String dest, QB qb, Operator input,
           .getColumnInfos()), input), rowResolver);
       input.setColumnExprMap(colExprMap);
     }
-
-    rowFields = opParseCtx.get(input).getRowResolver()
-        .getColumnInfos();
-    if (deleting()) {
-      // Figure out if we have partition columns in the list or not. If so,
-      // add them into the mapping. Partition columns will be located after the row id.
-      if (rowFields.size() > 1) {
-        // This means we have partition columns to deal with, so set up the mapping from the
-        // input to the partition columns.
-        dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
-      }
-    } else if (updating()) {
-      // In this case we expect the number of in fields to exceed the number of out fields by one
-      // (for the ROW__ID virtual column). If there are more columns than this,
-      // then the extras are for dynamic partitioning
-      if (dynPart && dpCtx != null) {
-        dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
-      }
-    } else {
-      if (dynPart && dpCtx != null) {
-        // create the mapping from input ExprNode to dest table DP column
-        dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
-      }
-    }
     return input;
   }
@@ -10105,7 +10081,7 @@ private void preProcessForInsert(ASTNode node, QB qb) throws SemanticException {
       return;
     }
     for (Node child : node.getChildren()) {
-      //each insert of multi insert looks like 
+      //each insert of multi insert looks like
       //(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1)))
       if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) {
         continue;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 24db7d0..95d5635 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,14 +19,11 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 public class DynamicPartitionCtx implements Serializable {
@@ -43,8 +40,6 @@
 
   private Path rootPath; // the root path DP columns paths start from
   private int numBuckets; // number of buckets in each partition
-  private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
-
   private List<String> spNames; // sp column names
   private List<String> dpNames; // dp column names
   private String defaultPartName; // default partition name in case of null or empty value
@@ -71,7 +66,6 @@ public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defau
     }
     this.numDPCols = dpNames.size();
    this.numSPCols = spNames.size();
-    this.inputToDPCols = new HashMap<String, String>();
     if (this.numSPCols > 0) {
       this.spPath = Warehouse.makeDynamicPartName(partSpec);
     } else {
@@ -86,25 +80,12 @@ public DynamicPartitionCtx(DynamicPartitionCtx dp) {
     this.spPath = dp.spPath;
     this.rootPath = dp.rootPath;
     this.numBuckets = dp.numBuckets;
-    this.inputToDPCols = dp.inputToDPCols;
     this.spNames = dp.spNames;
     this.dpNames = dp.dpNames;
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
   }
 
-  public void mapInputToDP(List<ColumnInfo> fs) {
-
-    assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
-
-    Iterator<ColumnInfo> itr1 = fs.iterator();
-    Iterator<String> itr2 = dpNames.iterator();
-
-    while (itr1.hasNext() && itr2.hasNext()) {
-      inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
-    }
-  }
-
   public int getMaxPartitionsPerNode() {
     return this.maxPartsPerNode;
   }
@@ -161,14 +142,6 @@ public void setSPColNames(List<String> sp) {
     this.spNames = sp;
   }
 
-  public Map<String, String> getInputToDPCols() {
-    return this.inputToDPCols;
-  }
-
-  public void setInputToDPCols(Map<String, String> map) {
-    this.inputToDPCols = map;
-  }
-
   public void setNumDPCols(int dp) {
     this.numDPCols = dp;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index c6ae030..c6fc915 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -28,7 +28,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -77,7 +76,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +93,6 @@
   private static TableDesc acidTableDescriptor;
   private static ObjectInspector inspector;
   private static List<TFSORow> rows;
-  private static ValidTxnList txnList;
 
   private Path basePath;
   private JobConf jc;
@@ -113,7 +110,6 @@ public static void classSetup() {
         "testFileSinkOperator");
     tmpdir.mkdir();
     tmpdir.deleteOnExit();
-    txnList = new ValidReadTxnList(new long[]{}, 2);
   }
 
   @Test
@@ -300,9 +296,6 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, Map
       Map<String, String> partColMap = new LinkedHashMap<String, String>(1);
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
-      Map<String, String> partColNames = new HashMap<String, String>(1);
-      partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
-      dpCtx.setInputToDPCols(partColNames);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
     } else {
@@ -588,7 +581,6 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList
     public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
         IOException {
 
-      final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
      return new RecordUpdater() {
         @Override
         public void insert(long currentTransaction, Object row) throws IOException {