Details
Type: Improvement
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.1.3, 3.2.4, 3.3.4, 3.4.3, 3.5.3
Fix Version/s: None
Component/s: None
Description
When saving a big Avro file with Spark, the write can be extremely slow because flush() is called many times, once per Avro block (~64 KB of bytes).
This is especially slow on a Hadoop FileSystem implementation whose flush() does a lot of work internally, in addition to re-establishing an HTTPS connection (example: Azure Storage).
Currently, the code that implements the Avro format support comes from the hive-exec jar.
Here is the corresponding Jira ticket in Apache Hive: https://issues.apache.org/jira/browse/HIVE-28628
At the lowest level, it does the following:
public class AvroContainerOutputFormat {
  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path,
      Class<? extends Writable> valueClass, boolean isCompressed, Properties properties,
      Progressable progressable) throws IOException {
    Schema schema;
    try {
      schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, properties);
    } catch (AvroSerdeException e) {
      throw new IOException(e);
    }
    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);
    if (isCompressed) {
      int level = jobConf.getInt("avro.mapred.deflate.level", -1);
      String codecName = jobConf.get("avro.output.codec", "deflate");
      CodecFactory factory = codecName.equals("deflate")
          ? CodecFactory.deflateCodec(level)
          : CodecFactory.fromString(codecName);
      dfw.setCodec(factory);
    }
    dfw.create(schema, path.getFileSystem(jobConf).create(path));
    return new AvroGenericRecordWriter(dfw);
  }
}
As you can see, only two options travel from the Spark -> Hadoop jobConf -> hive-exec chain down to the Avro library: the schema and the compression codec.
The Avro class DataFileWriter supports two other important attributes, but their default values are suited only to writing small files (or Kafka streaming):
public class DataFileWriter<D> implements Closeable, Flushable {
  private int syncInterval = 64000;
  private boolean flushOnEveryBlock = true;
  ...
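For illustration, here is a minimal standalone sketch against the plain Avro API (no Hive or Spark involved) showing the effect of tuning these two attributes; the record schema and output path are made up for the example:

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroBlockTuningDemo {
  public static void main(String[] args) throws IOException {
    // Trivial one-field record schema, just for the demo.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw)) {
      dfw.setFlushOnEveryBlock(false);      // do not flush() after each block
      dfw.setSyncInterval(8 * 1024 * 1024); // ~8 MB blocks instead of ~64 KB
      dfw.create(schema, new File("/tmp/demo.avro")); // illustrative output path

      GenericRecord rec = new GenericData.Record(schema);
      for (long i = 0; i < 1_000_000L; i++) {
        rec.put("id", i);
        dfw.append(rec);
      }
    } // close() performs the final flush
  }
}

With flushOnEveryBlock disabled and a larger sync interval, the underlying output stream is flushed far less often, which should drastically reduce the number of expensive flush() calls on a remote FileSystem.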
The proposed change is to override (upgrade?) the class in hive-exec so that these two attributes can be configured with better values.
DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);
// flushOnEveryBlock: default true ... may change to false from an optional conf
dfw.setFlushOnEveryBlock(false);
// 8 MB ... default 64000, max allowed 1073741824 = 1 GB ... may change from an optional conf
int syncInterval = 8388608;
dfw.setSyncInterval(syncInterval);
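As a sketch of how this could look inside getHiveRecordWriter above (the two configuration keys are hypothetical names invented for this example, not existing Hive or Avro options; jobConf and dfw are the variables already in scope there):

// Allow disabling the per-block flush; the default preserves current behavior.
// "avro.output.flush.on.every.block" is a hypothetical key name.
dfw.setFlushOnEveryBlock(
    jobConf.getBoolean("avro.output.flush.on.every.block", true));

// Allow a larger block size; Avro's DataFileWriter accepts 32 .. 1073741824 bytes.
// "avro.output.sync.interval" is a hypothetical key name.
dfw.setSyncInterval(
    jobConf.getInt("avro.output.sync.interval", 64000));

Keeping the current values as defaults leaves existing jobs unaffected, while jobs writing large files to remote storage can opt in to fewer, larger blocks.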