Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5039

Avro GenericRecord support is broken

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.1.3
    • 1.1.4, 1.2.0
    • API / DataSet
    • None

    Description

      Avro GenericRecord support was introduced in FLINK-3691, but it seems like the GenericRecords are not properly (de)serialized.

      This can be easily seen with a program like this:

        env.createInput(new AvroInputFormat<>(new Path("somefile.avro"), GenericRecord.class))
          .first(10)
          .print();
      

      which will print records in which all fields have the same value:

      {"foo": 1478628723066, "bar": 1478628723066, "baz": 1478628723066, ...}
      {"foo": 1478628723179, "bar": 1478628723179, "baz": 1478628723179, ...}
      

      If I'm not mistaken, the AvroInputFormat does essentially TypeExtractor.getForClass(GenericRecord.class), but GenericRecords are not POJOs.

      Furthermore, each GenericRecord contains a pointer to the record schema. I guess the current naive approach will serialize this schema with each record, which is quite inefficient (the schema is typically more complex and much larger than the data). We probably need a TypeInformation and TypeSerializer specific to Avro GenericRecords, which could just use avro serialization.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            rmetzger Robert Metzger
            bruno Bruno Dumon
            Votes:
            1 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment