diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f5df9cd302..3dac21ec08 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7353,7 +7353,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       dpCtx = qbm.getDPCtx(dest);
       if (dpCtx == null) {
         destinationTable.validatePartColumnNames(partSpec, false);
-        dpCtx = new DynamicPartitionCtx(partSpec,
+        dpCtx = new DynamicPartitionCtx(partSpec, destinationTable.getPartitionKeys(),
             conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
             conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
         qbm.setDPCtx(dest, dpCtx);
@@ -7676,7 +7676,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
           sortColumns, distributeColumns, fileSinkColInfos, sortColInfos, distributeColInfos);
       cols = ct.cols;
       colTypes = ct.colTypes;
-      dpCtx = new DynamicPartitionCtx(partitionColumnNames,
+      dpCtx = new DynamicPartitionCtx(partitionColumnNames, destinationTable.getPartitionKeys(),
           conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
           conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
       qbm.setDPCtx(dest, dpCtx);
@@ -8315,7 +8315,7 @@ private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
     DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
     if (dpCtx == null) {
       dest_tab.validatePartColumnNames(partSpec, false);
-      dpCtx = new DynamicPartitionCtx(partSpec,
+      dpCtx = new DynamicPartitionCtx(partSpec, dest_tab.getPartitionKeys(),
           conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
           conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
       qbm.setDPCtx(dest, dpCtx);
@@ -8460,16 +8460,36 @@ private Operator genConversionSelectOperator(String dest, QB qb, Operator input,
       }
     }
 
-    // deal with dynamic partition columns: convert ExprNodeDesc type to String??
+    // deal with dynamic partition columns
     if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
-      // DP columns starts with tableFields.size()
-      for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
-        TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
-        ExprNodeDesc column = new ExprNodeColumnDesc(
-            rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
+      // rowFields contains the non-partition columns (tableFields) followed by the DP columns
+      int rowFieldsOffset = tableFields.size() + (updating(dest) ? 1 : 0);
+      for (int dpColIdx = 0; dpColIdx < rowFields.size() - rowFieldsOffset; ++dpColIdx) {
+
+        // get the TypeInfo of the destination partition column
+        String destPartitionName = dpCtx.getDPColNames().get(dpColIdx);
+        FieldSchema destPartitionFieldSchema = dpCtx.getDpFieldSchemas().get(destPartitionName);
+        if (destPartitionFieldSchema == null) {
+          throw new IllegalStateException("Partition schema for dynamic partition " + destPartitionName + " not found in DynamicPartitionCtx.");
+        }
+        String partitionType = destPartitionFieldSchema.getType();
+        if (partitionType == null) {
+          throw new IllegalStateException(
+              "Couldn't get the type of partition column " + destPartitionFieldSchema.getName());
+        }
+        PrimitiveTypeInfo partitionTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(partitionType);
+
+        // create the ExprNodeDesc, casting it if the input and destination types don't match
+        ColumnInfo inputColumn = rowFields.get(dpColIdx + rowFieldsOffset);
+        TypeInfo inputTypeInfo = inputColumn.getType();
+        ExprNodeDesc column =
+            new ExprNodeColumnDesc(inputTypeInfo, inputColumn.getInternalName(), "", true);
+        if (!partitionTypeInfo.equals(inputTypeInfo)) {
+          column = ParseUtils.createConversionCast(column, partitionTypeInfo);
+          converted = true;
+        }
         expressions.add(column);
       }
-      // converted = true; // [TODO]: should we check & convert type to String and set it to true?
     }
 
     if (converted) {
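Note for reviewers: the net effect of the SemanticAnalyzer change is that each dynamic-partition expression is wrapped in a conversion cast whenever its type differs from the declared type of the destination partition column, instead of being passed through unchanged. A minimal sketch of the type comparison that drives that decision (class name and the "int"/string pairing are invented for illustration, not taken from the patch):

    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class DpCastCheckSketch {
      public static void main(String[] args) {
        // Assume the table declares the partition column as "int" while the
        // query feeds it a string expression.
        PrimitiveTypeInfo partitionTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo("int");
        TypeInfo inputTypeInfo = TypeInfoFactory.stringTypeInfo;

        // Same check as in genConversionSelectOperator: only insert a cast
        // (ParseUtils.createConversionCast in the patch) when the types differ.
        boolean needsCast = !partitionTypeInfo.equals(inputTypeInfo);
        System.out.println("needs cast: " + needsCast);   // prints "needs cast: true"
      }
    }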
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index c1aeb8f136..6e8fe1a771 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,6 +19,7 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -39,6 +41,7 @@
   private static final long serialVersionUID = 1L;
 
   private Map<String, String> partSpec; // partSpec is an ORDERED hash map
+  private Map<String, FieldSchema> dpFieldSchemas; // name and FieldSchema of dynamic partitions
   private int numDPCols;   // number of dynamic partition columns
   private int numSPCols;   // number of static partition columns
   private String spPath;   // path name corresponding to SP columns
@@ -59,8 +62,8 @@ public DynamicPartitionCtx() {
    * partitioned columns, which will all be dynamic partitions since the binding
    * is done after executing the query in the CTAS.
    */
-  public DynamicPartitionCtx(List<String> partColNames, String defaultPartName,
-      int maxParts) throws SemanticException {
+  public DynamicPartitionCtx(List<String> partColNames, List<FieldSchema> dpFieldSchemas,
+      String defaultPartName, int maxParts) throws SemanticException {
     this.partSpec = new LinkedHashMap<>();
     this.spNames = new ArrayList<>();
     this.dpNames = new ArrayList<>();
@@ -82,10 +85,12 @@ public DynamicPartitionCtx(List<String> partColNames, String defaultPartName,
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+    populateDpFieldSchemas(dpFieldSchemas);
   }
 
-  public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
-      int maxParts) throws SemanticException {
+
+  public DynamicPartitionCtx(Map<String, String> partSpec, List<FieldSchema> dpFieldSchemas,
+      String defaultPartName, int maxParts) throws SemanticException {
     this.partSpec = partSpec;
     this.spNames = new ArrayList<String>();
     this.dpNames = new ArrayList<String>();
@@ -114,6 +119,7 @@ public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+    populateDpFieldSchemas(dpFieldSchemas);
   }
 
   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -213,4 +219,17 @@ public void setSPPath(String sp) {
   public String getSPPath() {
     return this.spPath;
   }
+
+  private void populateDpFieldSchemas(List<FieldSchema> dynamicPartitionColumns) {
+    dpFieldSchemas = new HashMap<>();
+    if (dynamicPartitionColumns != null) {
+      dynamicPartitionColumns.forEach(dynamicPartitionColumn -> {
+        dpFieldSchemas.put(dynamicPartitionColumn.getName(), dynamicPartitionColumn);
+      });
+    }
+  }
+
+  public Map<String, FieldSchema> getDpFieldSchemas() {
+    return dpFieldSchemas;
+  }
 }
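For context, a small sketch of how the extended constructor is meant to be fed: the SemanticAnalyzer call sites above pass the destination table's getPartitionKeys(), and the conversion code then looks the target type up by column name. The column name "ds", its "date" type, and the default-partition/max-parts values below are invented for illustration, and the constructor still resolves the partition-name whitelist through Hive.get(), so this runs inside an established Hive session rather than as a bare standalone program:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;

    public class DpCtxUsageSketch {
      static void sketch() throws SemanticException {
        // One dynamic partition column "ds"; a null value marks it as dynamic.
        Map<String, String> partSpec = new LinkedHashMap<>();
        partSpec.put("ds", null);
        FieldSchema dsSchema = new FieldSchema("ds", "date", null);

        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partSpec,
            Collections.singletonList(dsSchema), "__HIVE_DEFAULT_PARTITION__", 100);

        // genConversionSelectOperator can now resolve the target partition type by name.
        String targetType = dpCtx.getDpFieldSchemas().get("ds").getType();   // "date"
        System.out.println(targetType);
      }
    }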
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 2c4b69b2fe..897fb2b6e0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -283,7 +283,8 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
       partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true));
       Map<String, String> partColMap = new LinkedHashMap<String, String>(1);
       partColMap.put(PARTCOL_NAME, null);
-      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
+      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap,
+          null, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
           false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
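One behavioral note on the test change: passing null for the new FieldSchema argument is safe for callers that never reach the conversion path, because populateDpFieldSchemas always initializes the map. A one-line illustration, assuming the dpCtx built in the test above:

    // With a null FieldSchema list the map is created empty rather than left null,
    // so getDpFieldSchemas() never NPEs; lookups for unknown columns simply return null.
    assert dpCtx.getDpFieldSchemas().isEmpty();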