  Spark / SPARK-50326

Override parameters passed to Avro DataFileWriter to avoid too many flush() calls (one per ~64 KB block)


Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.1.3, 3.2.4, 3.3.4, 3.4.3, 3.5.3
    • Fix Version/s: None
    • Component/s: Input/Output
    • Labels: None

    Description

      When saving a big Avro file with Spark, the write may be extremely slow because flush() is called many times, once per Avro block of roughly 64 KB.

      This is especially slow on a Hadoop FileSystem implementation whose flush() does a lot of work internally, in addition to re-establishing an HTTPS connection (for example, Azure Storage).
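      To make the cost concrete, here is a minimal, self-contained sketch (not part of this ticket; the FlushCountDemo class, its record schema, and the CountingOutputStream wrapper are invented for illustration) that counts how often DataFileWriter flushes the underlying stream with its default settings. Writing ~100 MB of records triggers on the order of 100 MB / 64 KB ≈ 1,600 flush() calls; on a remote filesystem where each flush() is an HTTPS round trip, those calls dominate the write time.

      import java.io.ByteArrayOutputStream;
      import java.io.FilterOutputStream;
      import java.io.IOException;
      import java.io.OutputStream;

      import org.apache.avro.Schema;
      import org.apache.avro.file.DataFileWriter;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericDatumWriter;
      import org.apache.avro.generic.GenericRecord;

      public class FlushCountDemo {

          /** Wrapper that counts the flush() calls reaching the underlying stream. */
          static class CountingOutputStream extends FilterOutputStream {
              long flushCount = 0;

              CountingOutputStream(OutputStream out) { super(out); }

              @Override public void write(byte[] b, int off, int len) throws IOException {
                  out.write(b, off, len);
              }

              @Override public void flush() throws IOException {
                  flushCount++;
                  super.flush();
              }
          }

          public static void main(String[] args) throws IOException {
              Schema schema = new Schema.Parser().parse(
                  "{\"type\":\"record\",\"name\":\"R\",\"fields\":"
                  + "[{\"name\":\"payload\",\"type\":\"string\"}]}");

              CountingOutputStream out = new CountingOutputStream(new ByteArrayOutputStream());
              DataFileWriter<GenericRecord> dfw =
                  new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
              // Avro defaults: syncInterval = 64000 bytes, flushOnEveryBlock = true.
              dfw.create(schema, out);

              String payload = "x".repeat(1000);        // ~1 KB per record
              for (int i = 0; i < 100_000; i++) {       // ~100 MB of data in total
                  GenericRecord r = new GenericData.Record(schema);
                  r.put("payload", payload);
                  dfw.append(r);
              }
              dfw.close();

              // With the defaults this prints roughly 100 MB / 64 KB ≈ 1600 flushes.
              System.out.println("flush() calls: " + out.flushCount);
          }
      }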

      Currently, the code that implements Avro format support comes from the hive-exec jar.

      Here is the corresponding Jira ticket in Apache Hive: https://issues.apache.org/jira/browse/HIVE-28628

      At the lowest level, it does the following:

      https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71

      public class AvroContainerOutputFormat {

          public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path,
                  Class<? extends Writable> valueClass, boolean isCompressed, Properties properties,
                  Progressable progressable) throws IOException {
              Schema schema;
              try {
                  schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, properties);
              } catch (AvroSerdeException e) {
                  throw new IOException(e);
              }

              GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>(schema);
              DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);
              if (isCompressed) {
                  int level = jobConf.getInt("avro.mapred.deflate.level", -1);
                  String codecName = jobConf.get("avro.output.codec", "deflate");
                  CodecFactory factory = codecName.equals("deflate")
                      ? CodecFactory.deflateCodec(level)
                      : CodecFactory.fromString(codecName);
                  dfw.setCodec(factory);
              }

              // The writer is created with Avro's defaults (syncInterval = 64000,
              // flushOnEveryBlock = true); neither can be overridden from the jobConf.
              dfw.create(schema, path.getFileSystem(jobConf).create(path));
              return new AvroGenericRecordWriter(dfw);
          }
      

      As you can see, only two options are passed from the Spark/Hadoop jobConf through hive-exec to the Avro library: the schema and the compression codec.
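      In practice, this means the only Avro output settings a user can influence today are the codec and the deflate level, for example (a usage sketch, using the key names visible in the code quoted above):

      import org.apache.hadoop.mapred.JobConf;

      public class AvroOutputConfToday {
          public static void main(String[] args) {
              JobConf jobConf = new JobConf();
              // The only Avro output knobs read by AvroContainerOutputFormat today:
              jobConf.set("avro.output.codec", "snappy");      // compression codec
              jobConf.setInt("avro.mapred.deflate.level", 6);  // only used when the codec is "deflate"
              // There is currently no key to tune how often the writer flushes.
          }
      }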

      The Avro class "DataFileWriter" supports 2 other importants attributes, but the default value are suited only for doing small files (or kafka streaming):

      public class DataFileWriter<D> implements Closeable, Flushable {
          private int syncInterval = 64000;
          private boolean flushOnEveryBlock = true;
      

      The proposed change is to override (or upgrade) the class in hive-exec so that these two attributes can be configured with better values:

      DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);

      // Default is true; an optional conf could allow switching it off.
      boolean flushOnEveryBlock = false;
      dfw.setFlushOnEveryBlock(flushOnEveryBlock);

      // Default is 64000; the maximum allowed is 1073741824 (1 GB).
      // An optional conf could allow a larger value, e.g. 8 MB.
      int syncInterval = 8388608; // 8 MB
      dfw.setSyncInterval(syncInterval);
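
      One possible shape for the fix, shown here only as a sketch: the configuration key names and the helper class are hypothetical placeholders (the real names would be settled in HIVE-28628), but the two setters, setFlushOnEveryBlock() and setSyncInterval(), do exist on Avro's DataFileWriter.

      import org.apache.avro.file.DataFileWriter;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hadoop.mapred.JobConf;

      /** Sketch: apply two optional jobConf settings to the DataFileWriter. */
      final class DataFileWriterTuning {

          // Hypothetical key names, used here only for illustration.
          static final String FLUSH_ON_EVERY_BLOCK_KEY = "avro.output.flush.on.every.block";
          static final String SYNC_INTERVAL_KEY = "avro.output.sync.interval";

          static void applyOptionalTuning(JobConf jobConf, DataFileWriter<GenericRecord> dfw) {
              // Keep Avro's defaults unless the user explicitly overrides them.
              dfw.setFlushOnEveryBlock(jobConf.getBoolean(FLUSH_ON_EVERY_BLOCK_KEY, true));

              // Avro's default is 64000 bytes; DataFileWriter accepts values in [32, 1073741824].
              dfw.setSyncInterval(jobConf.getInt(SYNC_INTERVAL_KEY, 64000));
          }
      }

      getHiveRecordWriter() would then call applyOptionalTuning(jobConf, dfw) right after the codec handling, before dfw.create().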
      
      

          People

            Assignee: Unassigned
            Reporter: Arnaud Nauwynck
            Votes: 2
            Watchers: 2
