Flink / FLINK-21406

Add AvroParquetFileRecordFormat

    Details

      Description

      There is currently no easy way to read Avro GenericRecords from Parquet files via the new FileSource.
      While helping a user, I started writing a FileRecordFormat that can do this, but it still needs to be finalized.

      The implementation is similar to our ParquetAvroWriters class in that it just wraps some Parquet classes and bridges our FileSystem with Parquet's IO abstraction.

      The main goal was a format that reads data through our FileSystems rather than working directly against Hadoop. This prevents a ClassLoader leak from the S3AFileSystem, where threads in a thread pool can keep references to the user ClassLoader.
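
      The general mechanism behind such a leak can be reproduced with the JDK alone. The sketch below is not S3AFileSystem code, and the ticket does not say that S3A leaks in exactly this way; the class and method names are made up for illustration. It only shows that a pool thread inherits the context ClassLoader of the thread that caused it to be created and keeps that reference after the creating thread has moved on.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolThreadClassLoaderDemo {

    /** Returns the context ClassLoader a pool thread still holds after the caller has reset its own. */
    public static ClassLoader loaderRetainedByPoolThread(ClassLoader userLoader) throws Exception {
        Thread caller = Thread.currentThread();
        ClassLoader original = caller.getContextClassLoader();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Simulate user code running with the user ClassLoader as context ClassLoader.
            caller.setContextClassLoader(userLoader);
            // The worker thread is created during this first submit and inherits
            // the caller's current context ClassLoader.
            pool.submit(() -> { }).get();
        } finally {
            // The caller is done with the user code and restores its own ClassLoader.
            caller.setContextClassLoader(original);
        }
        try {
            // The long-lived worker thread still references userLoader.
            Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
            return pool.submit(probe).get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = PoolThreadClassLoaderDemo.class.getClassLoader();
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], parent)) {
            ClassLoader retained = loaderRetainedByPoolThread(userLoader);
            System.out.println("pool thread still references user ClassLoader: "
                    + (retained == userLoader));
        }
    }
}
```

      As long as such a pool is never shut down, its threads keep the user ClassLoader reachable, which is why reading through our own FileSystem abstraction is preferable here.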

      According to the user it appears to be working, but it still needs some cleanup, ideally support for specific records, support for checkpointing (which I believe should be fairly easy), possibly file splitting (I am not sure whether this works properly with Parquet), and of course tests and documentation.

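      import java.io.IOException;
      import javax.annotation.Nullable;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.connector.file.src.reader.FileRecordFormat;
      import org.apache.flink.core.fs.FSDataInputStream;
      import org.apache.flink.core.fs.FileStatus;
      import org.apache.flink.core.fs.FileSystem;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
      import org.apache.parquet.avro.AvroParquetReader;
      import org.apache.parquet.hadoop.ParquetReader;
      import org.apache.parquet.io.DelegatingSeekableInputStream;
      import org.apache.parquet.io.InputFile;
      import org.apache.parquet.io.SeekableInputStream;

      public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
          // transient: the schema is not shipped with the serialized format and is null after
          // deserialization, so getProducedType() must be called before the format is serialized
          private final transient Schema schema;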
      
          public ParquetAvroFileRecordFormat(Schema schema) {
              this.schema = schema;
          }
      
          @Override
          public Reader<GenericRecord> createReader(
                  Configuration config, Path filePath, long splitOffset, long splitLength)
                  throws IOException {
      
              final FileSystem fs = filePath.getFileSystem();
              final FileStatus status = fs.getFileStatus(filePath);
              final FSDataInputStream in = fs.open(filePath);
      
              return new MyReader(
                      AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
                              .withDataModel(GenericData.get())
                              .build());
          }
      
          @Override
          public Reader<GenericRecord> restoreReader(
                  Configuration config,
                  Path filePath,
                  long restoredOffset,
                  long splitOffset,
                  long splitLength) {
              // not called if checkpointing isn't used
              return null;
          }
      
          @Override
          public boolean isSplittable() {
              // let's not worry about this for now
              return false;
          }
      
          @Override
          public TypeInformation<GenericRecord> getProducedType() {
              return new GenericRecordAvroTypeInfo(schema);
          }
      
          private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
      
              private final ParquetReader<GenericRecord> parquetReader;
      
              private MyReader(ParquetReader<GenericRecord> parquetReader) {
                  this.parquetReader = parquetReader;
              }
      
              @Nullable
              @Override
              public GenericRecord read() throws IOException {
                  return parquetReader.read();
              }
      
              @Override
              public void close() throws IOException {
                  parquetReader.close();
              }
          }
      
          private static class InputFileWrapper implements InputFile {
      
              private final FSDataInputStream inputStream;
              private final long length;
      
              private InputFileWrapper(FSDataInputStream inputStream, long length) {
                  this.inputStream = inputStream;
                  this.length = length;
              }
      
              @Override
              public long getLength() {
                  return length;
              }
      
              @Override
              public SeekableInputStream newStream() {
                  return new SeekableInputStreamAdapter(inputStream);
              }
          }
      
          private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
      
              private final FSDataInputStream inputStream;
      
              private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
                  super(inputStream);
                  this.inputStream = inputStream;
              }
      
              @Override
              public long getPos() throws IOException {
                  return inputStream.getPos();
              }
      
              @Override
              public void seek(long newPos) throws IOException {
                  inputStream.seek(newPos);
              }
          }
      }
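
      The restoreReader stub above returns null, so the format cannot resume after a failure. One possible approach, which does not need a seekable position inside the Parquet file, is a record-count-based restore: remember how many records were emitted, and on restore open a fresh reader and skip that many. The sketch below uses hypothetical stand-in types (RecordReader, CountingReader, readerOver) rather than the Flink or Parquet interfaces, and it assumes the record count is what gets checkpointed; how this maps onto the restoredOffset parameter is left open.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipRestoreSketch {

    /** Hypothetical stand-in for a record reader; read() returns null at end of input. */
    public interface RecordReader<T> extends AutoCloseable {
        T read() throws IOException;

        @Override
        void close() throws IOException;
    }

    /** Wraps a reader and tracks how many records have been handed out so far. */
    public static final class CountingReader<T> implements RecordReader<T> {
        private final RecordReader<T> delegate;
        private long recordsRead;

        public CountingReader(RecordReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T read() throws IOException {
            T next = delegate.read();
            if (next != null) {
                recordsRead++;
            }
            return next;
        }

        /** The value a checkpoint would store for this reader (assumption of this sketch). */
        public long getRecordsRead() {
            return recordsRead;
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }

    /** Wraps a freshly opened reader and discards the records emitted before the checkpoint. */
    public static <T> CountingReader<T> restore(RecordReader<T> fresh, long recordsToSkip)
            throws IOException {
        CountingReader<T> reader = new CountingReader<>(fresh);
        try {
            while (reader.getRecordsRead() < recordsToSkip && reader.read() != null) {
                // skipped record is dropped on purpose
            }
        } catch (IOException | RuntimeException e) {
            reader.close(); // do not leak the underlying stream if skipping fails
            throw e;
        }
        return reader;
    }

    /** In-memory reader used only to exercise the sketch. */
    public static <T> RecordReader<T> readerOver(List<T> records) {
        Iterator<T> it = records.iterator();
        return new RecordReader<T>() {
            @Override
            public T read() {
                return it.hasNext() ? it.next() : null;
            }

            @Override
            public void close() {
            }
        };
    }

    public static void main(String[] args) throws IOException {
        List<String> file = Arrays.asList("a", "b", "c", "d");

        CountingReader<String> first = new CountingReader<>(readerOver(file));
        first.read();
        first.read();
        long checkpointed = first.getRecordsRead(); // position saved at checkpoint time
        first.close();

        try (CountingReader<String> restored = restore(readerOver(file), checkpointed)) {
            System.out.println(restored.read()); // prints "c"
        }
    }
}
```

      Skipping costs a re-read of everything before the checkpoint, which is acceptable for a non-splittable format but would be worth revisiting if splitting is added later.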
      

            People

            • Assignee: Jing Ge (jingge)
            • Reporter: Chesnay Schepler (chesnay)