diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 5fcc367cc9..89a2576ba8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7351,7 +7351,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       dpCtx = qbm.getDPCtx(dest);
       if (dpCtx == null) {
         destinationTable.validatePartColumnNames(partSpec, false);
-        dpCtx = new DynamicPartitionCtx(partSpec,
+        dpCtx = new DynamicPartitionCtx(partSpec, destinationTable.getPartitionKeys(),
             conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
             conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
         qbm.setDPCtx(dest, dpCtx);
@@ -7675,6 +7675,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       cols = ct.cols;
       colTypes = ct.colTypes;
       dpCtx = new DynamicPartitionCtx(partitionColumnNames,
+          destinationTable != null ? destinationTable.getPartitionKeys() : null,
           conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
           conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
       qbm.setDPCtx(dest, dpCtx);
@@ -8313,7 +8314,7 @@ private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
     DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
     if (dpCtx == null) {
       dest_tab.validatePartColumnNames(partSpec, false);
-      dpCtx = new DynamicPartitionCtx(partSpec,
+      dpCtx = new DynamicPartitionCtx(partSpec, parts,
           conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
           conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
       qbm.setDPCtx(dest, dpCtx);
@@ -8456,18 +8457,50 @@ private Operator genConversionSelectOperator(String dest, QB qb, Operator input,
         }
         expressions.add(column);
       }
-    }
-
-    // deal with dynamic partition columns: convert ExprNodeDesc type to String??
-    if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
-      // DP columns starts with tableFields.size()
-      for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
-        TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
-        ExprNodeDesc column = new ExprNodeColumnDesc(
-            rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
-        expressions.add(column);
+      // deal with dynamic partition columns
+      if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
+        // rowFields contains non-partitioned columns (tableFields) followed by DP columns
+        int rowFieldsOffset = tableFields.size() + (updating(dest) ? 1 : 0);
+        for (int dpColIdx = 0; dpColIdx < rowFields.size() - rowFieldsOffset; ++dpColIdx) {
+
+          // create ExprNodeDesc
+          ColumnInfo inputColumn = rowFields.get(dpColIdx + rowFieldsOffset);
+          TypeInfo inputTypeInfo = inputColumn.getType();
+          ExprNodeDesc column =
+              new ExprNodeColumnDesc(inputTypeInfo, inputColumn.getInternalName(), "", true);
+
+          // Cast input column to destination column type if necessary.
+          if (dpCtx.getDpFieldSchemas() != null && dpCtx.getDpFieldSchemas().size() > 0) {
+            String destPartitionName = dpCtx.getDPColNames().get(dpColIdx);
+            FieldSchema destPartitionFieldSchema = dpCtx.getDpFieldSchemas().get(destPartitionName);
+            if (destPartitionFieldSchema != null) {
+              String partitionType = destPartitionFieldSchema.getType();
+              if (partitionType != null) {
+                PrimitiveTypeInfo partitionTypeInfo =
+                    TypeInfoFactory.getPrimitiveTypeInfo(partitionType);
+
+                if (!partitionTypeInfo.equals(inputTypeInfo)) {
+                  column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
+                      .createConversionCast(column, partitionTypeInfo);
+                  converted = true;
+                }
+              } else {
+                LOG.warn("Couldn't get type for dynamic partition "
+                    + destPartitionFieldSchema.getName());
+              }
+            } else {
+              LOG.warn("Partition schema for dynamic partition " + destPartitionName
+                  + " not found in DynamicPartitionCtx.");
+            }
+          } else {
+            LOG.info("Partition schema for dynamic partition " + inputColumn.getAlias() + " ("
+                + inputColumn.getInternalName()
+                + ") not found in DynamicPartitionCtx. This is expected with a CTAS.");
+          }
+          expressions.add(column);
+        }
       }
-      // converted = true; // [TODO]: should we check & convert type to String and set it to true?
     }
 
     if (converted) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index c1aeb8f136..d50aab7331 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,6 +19,7 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -39,6 +41,7 @@
   private static final long serialVersionUID = 1L;
 
   private Map<String, String> partSpec; // partSpec is an ORDERED hash map
+  private Map<String, FieldSchema> dpFieldSchemas; // name and FieldSchema of dynamic partitions
   private int numDPCols; // number of dynamic partition columns
   private int numSPCols; // number of static partition columns
   private String spPath; // path name corresponding to SP columns
@@ -59,8 +62,8 @@ public DynamicPartitionCtx() {
    * partitioned columns, which will all be dynamic partitions since the binding
    * is done after executing the query in the CTAS.
    */
-  public DynamicPartitionCtx(List<String> partColNames, String defaultPartName,
-      int maxParts) throws SemanticException {
+  public DynamicPartitionCtx(List<String> partColNames, List<FieldSchema> dpFieldSchemas,
+      String defaultPartName, int maxParts) throws SemanticException {
     this.partSpec = new LinkedHashMap<>();
     this.spNames = new ArrayList<>();
     this.dpNames = new ArrayList<>();
@@ -82,10 +85,12 @@ public DynamicPartitionCtx(List<String> partColNames, String defaultPartName,
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ?
        null : Pattern.compile(confVal);
+    populateDpFieldSchemas(dpFieldSchemas);
   }
-  public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
-      int maxParts) throws SemanticException {
+
+  public DynamicPartitionCtx(Map<String, String> partSpec, List<FieldSchema> dpFieldSchemas,
+      String defaultPartName, int maxParts) throws SemanticException {
     this.partSpec = partSpec;
     this.spNames = new ArrayList<String>();
     this.dpNames = new ArrayList<String>();
@@ -114,6 +119,7 @@ public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+    populateDpFieldSchemas(dpFieldSchemas);
   }
 
   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -213,4 +219,17 @@ public void setSPPath(String sp) {
   public String getSPPath() {
     return this.spPath;
   }
+
+  private void populateDpFieldSchemas(List<FieldSchema> dynamicPartitionColumns) {
+    dpFieldSchemas = new HashMap<>();
+    if (dynamicPartitionColumns != null) {
+      dynamicPartitionColumns.forEach(dynamicPartitionColumn -> {
+        dpFieldSchemas.put(dynamicPartitionColumn.getName(), dynamicPartitionColumn);
+      });
+    }
+  }
+
+  public Map<String, FieldSchema> getDpFieldSchemas() {
+    return dpFieldSchemas;
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 2c4b69b2fe..897fb2b6e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -283,7 +283,8 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
     partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true));
     Map<String, String> partColMap = new LinkedHashMap<String, String>(1);
     partColMap.put(PARTCOL_NAME, null);
-    DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
+    DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap,
+        null, "Sunday", 100);
     //todo: does this need the finalDestination?
     desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
         false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
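
Note: a minimal sketch of how the new constructor argument and the getDpFieldSchemas() lookup are meant to interact. The "ds" partition column, its type, and the class name are hypothetical; the snippet assumes the patched four-argument constructor above, and an initialized Hive session (the constructor reads the partition-name whitelist pattern from the metastore conf).

    import java.util.Arrays;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class DpCtxCastSketch {
      public static void main(String[] args) throws SemanticException {
        // Hypothetical destination table partitioned by a single date column "ds".
        FieldSchema ds = new FieldSchema("ds", "date", null);

        // CTAS-style constructor from this patch: partition column names plus the
        // destination FieldSchemas (null for a true CTAS, where the target table
        // does not exist yet, so no cast can be planned).
        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(
            Arrays.asList("ds"), Arrays.asList(ds), "__HIVE_DEFAULT_PARTITION__", 100);

        // The lookup genConversionSelectOperator now performs: if the incoming
        // expression type differs from the declared partition type, the column
        // is wrapped in a conversion cast and converted is set to true.
        FieldSchema target = dpCtx.getDpFieldSchemas().get("ds");
        PrimitiveTypeInfo partType = TypeInfoFactory.getPrimitiveTypeInfo(target.getType());
        PrimitiveTypeInfo incoming = TypeInfoFactory.stringTypeInfo;
        System.out.println("cast needed: " + !partType.equals(incoming));
      }
    }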