diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 2604d5d..ed7e6b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HivePartitioner; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -61,7 +62,6 @@ import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -75,6 +75,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue; import org.apache.hadoop.hive.shims.ShimLoader; @@ -292,7 +295,7 @@ public Stat getStat() { private transient SubStructObjectInspector subSetOI; private transient int timeOut; // JT timeout in msec. 
private transient long lastProgressReport = System.currentTimeMillis(); - + private transient ObjectInspector destObjectInspector; protected transient boolean autoDelete = false; protected transient JobConf jc; Class outputClass; @@ -438,6 +441,23 @@ private void initializeSpecPath() { suffix = suffix + "_" + fullName.toLowerCase(); } + final String columnNameProperty = conf.getTableInfo().getProperties().getProperty(IOConstants.COLUMNS); + final String columnTypeProperty = conf.getTableInfo().getProperties().getProperty(IOConstants.COLUMNS_TYPES); + List<String> columnNames; + List<TypeInfo> columnTypes; + if (columnNameProperty == null || columnNameProperty.isEmpty()) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + if (columnTypeProperty == null || columnTypeProperty.isEmpty()) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + TypeInfo structTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + destObjectInspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(structTypeInfo); statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count); } catch (HiveException e) { throw e; @@ -717,7 +737,7 @@ public void process(Object row, int tag) throws HiveException { fpaths = fsp; } // use SerDe to serialize r, and write it out - recordValue = serializer.serialize(row, inputObjInspectors[0]); + recordValue = serializer.serialize(row, destObjectInspector); } rowOutWriters = fpaths.outWriters;