Spark / SPARK-26703

Hive record writer always depends on the parquet-1.6 writer; we should fix it


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.3.2, 2.4.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Currently, when we run an INSERT INTO (or related) command against a Hive table stored
      as parquet, the generated files are always written with the parquet 1.6 writer. The
      reason is as follows:
      1. We rely on hive-exec's HiveFileFormatUtils to get the record writer (a reproduction
         sketch follows this snippet):

      private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
          jobConf,
          tableDesc,
          serializer.getSerializedClass,
          fileSinkConf,
          new Path(path),
          Reporter.NULL)
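
      For reference, a minimal way to hit this code path from Spark SQL (a sketch: the table
      name and data are illustrative, and disabling the metastore conversion is only needed
      if Spark would otherwise rewrite the insert through its native Parquet data source):

      // Reproduction sketch, e.g. in spark-shell: force the insert through the Hive SerDe
      // writer path described in this issue. Table name and values are illustrative.
      spark.sql("SET spark.sql.hive.convertMetastoreParquet=false")
      spark.sql("CREATE TABLE t_parquet (id INT, name STRING) STORED AS PARQUET")
      spark.sql("INSERT INTO TABLE t_parquet VALUES (1, 'a'), (2, 'b')")
      // The files under the table location are then produced by the bundled parquet-1.6 writer.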
      

      2. That call lands in hive-exec's HiveFileFormatUtils:

      public static RecordWriter getHiveRecordWriter(JobConf jc,
            TableDesc tableInfo, Class<? extends Writable> outputClass,
            FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
          HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
          try {
            boolean isCompressed = conf.getCompressed();
            JobConf jc_output = jc;
            if (isCompressed) {
              jc_output = new JobConf(jc);
              String codecStr = conf.getCompressCodec();
              if (codecStr != null && !codecStr.trim().equals("")) {
                Class<? extends CompressionCodec> codec = 
                    (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
                FileOutputFormat.setOutputCompressorClass(jc_output, codec);
              }
              String type = conf.getCompressType();
              if (type != null && !type.trim().equals("")) {
                CompressionType style = CompressionType.valueOf(type);
                SequenceFileOutputFormat.setOutputCompressionType(jc, style);
              }
            }
            return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
                isCompressed, tableInfo.getProperties(), outPath, reporter);
          } catch (Exception e) {
            throw new HiveException(e);
          }
        }
      
        public static RecordWriter getRecordWriter(JobConf jc,
            OutputFormat<?, ?> outputFormat,
            Class<? extends Writable> valueClass, boolean isCompressed,
            Properties tableProp, Path outPath, Reporter reporter
            ) throws IOException, HiveException {
          if (!(outputFormat instanceof HiveOutputFormat)) {
            outputFormat = new HivePassThroughOutputFormat(outputFormat);
          }
          return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
              jc, outPath, valueClass, isCompressed, tableProp, reporter);
        }
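
      Which HiveOutputFormat that dispatch resolves to comes from the table metadata; for a
      table created with STORED AS PARQUET it is MapredParquetOutputFormat. A quick way to
      see this (a sketch reusing the illustrative table from above):

      // The "OutputFormat" row of the detailed table description shows
      // org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat for parquet tables.
      spark.sql("DESCRIBE FORMATTED t_parquet").show(100, false)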
      

      3. getRecordWriter then dispatches to MapredParquetOutputFormat.getHiveRecordWriter:

      public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
            final JobConf jobConf,
            final Path finalOutPath,
            final Class<? extends Writable> valueClass,
            final boolean isCompressed,
            final Properties tableProperties,
            final Progressable progress) throws IOException {
      
          LOG.info("creating new record writer..." + this);
      
          final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
          final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
          List<String> columnNames;
          List<TypeInfo> columnTypes;
      
          if (columnNameProperty.length() == 0) {
            columnNames = new ArrayList<String>();
          } else {
            columnNames = Arrays.asList(columnNameProperty.split(","));
          }
      
          if (columnTypeProperty.length() == 0) {
            columnTypes = new ArrayList<TypeInfo>();
          } else {
            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
          }
      
          DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
      
          return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
                  progress,tableProperties);
        }
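
      The Parquet schema handed to the writer is derived from the table's "columns" and
      "columns.types" properties. A small sketch of that conversion in isolation (assumes
      hive-exec on the classpath; column names and types are illustrative):

      import java.util.Arrays
      import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter
      import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils

      // Mirrors how getHiveRecordWriter turns the Hive column metadata into the Parquet
      // MessageType that is handed to DataWritableWriteSupport.setSchema.
      val columnNames = Arrays.asList("id", "name")
      val columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("int:string")
      println(HiveSchemaConverter.convert(columnNames, columnTypes))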
      

      4. getParquerRecordWriterWrapper then constructs a ParquetRecordWriterWrapper around the bundled ParquetOutputFormat:

      public ParquetRecordWriterWrapper(
            final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
            final JobConf jobConf,
            final String name,
            final Progressable progress, Properties tableProperties) throws
                IOException {
          try {
            // create a TaskInputOutputContext
            TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
            if (taskAttemptID == null) {
              taskAttemptID = new TaskAttemptID();
            }
            taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
      
            LOG.info("initialize serde with table properties.");
            initializeSerProperties(taskContext, tableProperties);
      
            LOG.info("creating real writer to write at " + name);
      
            realWriter =
                    ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
      
            LOG.info("real writer: " + realWriter);
          } catch (final InterruptedException e) {
            throw new IOException(e);
          }
        }
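
      One way to confirm which artifact the ParquetOutputFormat cast above resolves to is to
      check where the classes are loaded from (a verification sketch; with Spark's built-in
      Hive 1.2.1 fork the Parquet classes live in the pre-Apache "parquet." namespace, so the
      exact class name depends on the bundled Hive version):

      // Print the jars that provide the Hive Parquet output format and the Parquet writer it
      // delegates to, e.g. from spark-shell on the driver.
      Seq(
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
        "parquet.hadoop.ParquetOutputFormat" // pre-Apache package used by parquet 1.6
      ).foreach { className =>
        val location = Class.forName(className).getProtectionDomain.getCodeSource.getLocation
        println(s"$className -> $location")
      }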
      

      The ParquetOutputFormat used in that cast comes from the parquet 1.6 classes bundled
      with hive-exec, so every file generated this way is missing useful statistics such as
      the min/max of string columns. We should fix this so that this write path can take
      advantage of the features of newer Parquet versions.
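
      To see the effect on the written files, one can dump a produced file's footer; a sketch
      using the parquet-hadoop API shipped with Spark 2.4 (the file path is illustrative):

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.Path
      import org.apache.parquet.hadoop.ParquetFileReader
      import org.apache.parquet.hadoop.util.HadoopInputFile
      import scala.collection.JavaConverters._

      // Print the writer version and per-column statistics of one output file. Files written
      // through the Hive path above report the parquet-mr 1.6 writer as creator, and their
      // string (binary) column min/max statistics are not usable, as described in this issue.
      val input  = HadoopInputFile.fromPath(
        new Path("/user/hive/warehouse/t_parquet/part-00000"), new Configuration())
      val reader = ParquetFileReader.open(input)
      try {
        val footer = reader.getFooter
        println(s"created_by: ${footer.getFileMetaData.getCreatedBy}")
        for {
          block  <- footer.getBlocks.asScala
          column <- block.getColumns.asScala
        } println(s"${column.getPath}: ${column.getStatistics}")
      } finally {
        reader.close()
      }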


            People

              Assignee: Unassigned
              Reporter: cane zhoukang
              Votes: 0
              Watchers: 1
