Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.3.2, 2.4.0
- Fix Version/s: None
- Component/s: None
Description
Currently, when we run an insert-into-Hive-table command, the generated Parquet files are always written as Parquet 1.6. The reason is below (sketches for reproducing and verifying this follow the walkthrough):
1. We rely on hive-exec's HiveFileFormatUtils to get the record writer:

private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
  jobConf,
  tableDesc,
  serializer.getSerializedClass,
  fileSinkConf,
  new Path(path),
  Reporter.NULL)
2. That in turn calls:

public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo,
    Class<? extends Writable> outputClass, FileSinkDesc conf, Path outPath,
    Reporter reporter) throws HiveException {
  HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
  try {
    boolean isCompressed = conf.getCompressed();
    JobConf jc_output = jc;
    if (isCompressed) {
      jc_output = new JobConf(jc);
      String codecStr = conf.getCompressCodec();
      if (codecStr != null && !codecStr.trim().equals("")) {
        Class<? extends CompressionCodec> codec =
            (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
        FileOutputFormat.setOutputCompressorClass(jc_output, codec);
      }
      String type = conf.getCompressType();
      if (type != null && !type.trim().equals("")) {
        CompressionType style = CompressionType.valueOf(type);
        SequenceFileOutputFormat.setOutputCompressionType(jc, style);
      }
    }
    return getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed,
        tableInfo.getProperties(), outPath, reporter);
  } catch (Exception e) {
    throw new HiveException(e);
  }
}

public static RecordWriter getRecordWriter(JobConf jc, OutputFormat<?, ?> outputFormat,
    Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProp,
    Path outPath, Reporter reporter) throws IOException, HiveException {
  if (!(outputFormat instanceof HiveOutputFormat)) {
    outputFormat = new HivePassThroughOutputFormat(outputFormat);
  }
  return ((HiveOutputFormat) outputFormat).getHiveRecordWriter(
      jc, outPath, valueClass, isCompressed, tableProp, reporter);
}
3. Then, in MapredParquetOutputFormat:

public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
    final JobConf jobConf,
    final Path finalOutPath,
    final Class<? extends Writable> valueClass,
    final boolean isCompressed,
    final Properties tableProperties,
    final Progressable progress) throws IOException {

  LOG.info("creating new record writer..." + this);

  final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
  final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
  List<String> columnNames;
  List<TypeInfo> columnTypes;

  if (columnNameProperty.length() == 0) {
    columnNames = new ArrayList<String>();
  } else {
    columnNames = Arrays.asList(columnNameProperty.split(","));
  }

  if (columnTypeProperty.length() == 0) {
    columnTypes = new ArrayList<TypeInfo>();
  } else {
    columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
  }

  DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);

  return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
      progress, tableProperties);
}
4. Which finally calls the ParquetRecordWriterWrapper constructor:

public ParquetRecordWriterWrapper(
    final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
    final JobConf jobConf,
    final String name,
    final Progressable progress,
    Properties tableProperties) throws IOException {
  try {
    // create a TaskInputOutputContext
    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
    if (taskAttemptID == null) {
      taskAttemptID = new TaskAttemptID();
    }
    taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);

    LOG.info("initialize serde with table properties.");
    initializeSerProperties(taskContext, tableProperties);

    LOG.info("creating real writer to write at " + name);

    realWriter =
        ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));

    LOG.info("real writer: " + realWriter);
  } catch (final InterruptedException e) {
    throw new IOException(e);
  }
}
The ParquetOutputFormat used in that cast comes from Parquet 1.6, so every file generated through this path is missing some useful statistics, such as min/max for string columns. We should fix this so that we can take advantage of the newer Parquet features.
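For reference, a minimal sketch of the kind of command that exercises the write path traced above. This is not taken from the Spark code base: the app and table names are made up, and explicitly turning off spark.sql.hive.convertMetastoreParquet is an assumption used here so that the INSERT goes through the Hive record-writer path rather than Spark's native Parquet writer.

import org.apache.spark.sql.SparkSession;

public class HiveParquetInsertRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hive-parquet-insert-repro")                      // hypothetical app name
        .config("spark.sql.hive.convertMetastoreParquet", "false") // keep the Hive SerDe write path
        .enableHiveSupport()
        .getOrCreate();

    // Hypothetical Hive table stored as Parquet; the INSERT below is the
    // "insert into hive table" command described in this issue.
    spark.sql("CREATE TABLE IF NOT EXISTS demo_parquet_tbl (id INT, name STRING) STORED AS PARQUET");
    spark.sql("INSERT INTO demo_parquet_tbl VALUES (1, 'a'), (2, 'b')");

    spark.stop();
  }
}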
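And a hedged sketch, using the parquet-mr footer API, for inspecting one of the generated files: it prints the created_by string (which records the writer version) and the per-column statistics, so the missing string min/max described above can be observed. The class name is made up and the file path is passed as an argument.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetFooterCheck {
  public static void main(String[] args) throws Exception {
    // args[0]: path to one of the Parquet files produced by the INSERT above
    Path file = new Path(args[0]);
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
      ParquetMetadata footer = reader.getFooter();
      // created_by identifies the writer, e.g. a parquet-mr 1.6.x string on the path above
      System.out.println("created_by: " + footer.getFileMetaData().getCreatedBy());
      for (BlockMetaData block : footer.getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          // Statistics#toString shows min/max/null count when present; per the description,
          // string columns written through Parquet 1.6 lack usable min/max.
          Statistics<?> stats = column.getStatistics();
          System.out.println(column.getPath() + " -> " + stats);
        }
      }
    }
  }
}

Note that this checker itself assumes a reasonably recent parquet-mr (1.9+) on the classpath, since older versions do not provide the HadoopInputFile-based open() method.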
Issue Links
- duplicates
  - SPARK-27176 Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4 (Resolved)