FLINK-5039 (project: Flink)

Avro GenericRecord support is broken

    Details

      Description

      Avro GenericRecord support was introduced in FLINK-3691, but it seems that GenericRecords are not properly (de)serialized.

      This is easy to see with a program like this:

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new AvroInputFormat<>(new Path("somefile.avro"), GenericRecord.class))
          .first(10)
          .print();
      

      which will print records in which all fields have the same value:

      {"foo": 1478628723066, "bar": 1478628723066, "baz": 1478628723066, ...}
      {"foo": 1478628723179, "bar": 1478628723179, "baz": 1478628723179, ...}
      

      If I'm not mistaken, the AvroInputFormat essentially does TypeExtractor.getForClass(GenericRecord.class), but GenericRecords are not POJOs.

      Furthermore, each GenericRecord contains a pointer to the record schema. I guess the current naive approach will serialize this schema with each record, which is quite inefficient (the schema is typically more complex and much larger than the data). We probably need a TypeInformation and TypeSerializer specific to Avro GenericRecords, which could simply use Avro serialization.
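      To illustrate the proposal above, here is a JDK-only sketch (not Flink's or Avro's API) contrasting the naive approach, which repeats the schema in every record, with a serializer that is constructed with the schema and therefore writes only field values. Assumptions: records are modeled as long[], the schema as a JSON string, and values are written as fixed 8-byte longs rather than Avro's variable-length binary encoding; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch: a serializer that knows the schema up front.
public class SchemaAwareSerializerSketch {
    private final String schemaJson; // held once by the serializer, not by each record
    private final int fieldCount;    // a real implementation would derive this from the schema

    SchemaAwareSerializerSketch(String schemaJson, int fieldCount) {
        this.schemaJson = schemaJson;
        this.fieldCount = fieldCount;
    }

    // Naive approach: every record carries its own copy of the schema.
    byte[] serializeWithSchema(long[] values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(schemaJson);
            for (long v : values) out.writeLong(v);
        }
        return bytes.toByteArray();
    }

    // Schema-aware approach: the schema is implied by the serializer, only values are written.
    byte[] serialize(long[] values) throws IOException {
        if (values.length != fieldCount) {
            throw new IllegalArgumentException("record does not match the serializer's schema");
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long v : values) out.writeLong(v);
        }
        return bytes.toByteArray();
    }

    // Reads values back using the field count known from the schema.
    long[] deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        long[] values = new long[fieldCount];
        for (int i = 0; i < fieldCount; i++) values[i] = in.readLong();
        return values;
    }

    public static void main(String[] args) throws IOException {
        String schema = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
            + "{\"name\":\"foo\",\"type\":\"long\"},{\"name\":\"bar\",\"type\":\"long\"},"
            + "{\"name\":\"baz\",\"type\":\"long\"}]}";
        SchemaAwareSerializerSketch ser = new SchemaAwareSerializerSketch(schema, 3);
        long[] record = {1478628723066L, 42L, 7L};
        System.out.println("with schema:  " + ser.serializeWithSchema(record).length + " bytes");
        System.out.println("schema-aware: " + ser.serialize(record).length + " bytes");
        System.out.println("round trip:   " + Arrays.toString(ser.deserialize(ser.serialize(record))));
    }
}
```

      Comparing the two byte counts shows the per-record cost of embedding the schema. The field-count check also hints at a trade-off of this design: a serializer built for one schema cannot handle records of another shape.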


          Activity

          fhueske Fabian Hueske added a comment -

          Fixed for 1.1.4 with 3ae6e9e09ba74f88fe87d1ac130b3cc232a5e88c
          Fixed for 1.2.0 with 2d8f03e7ad12af3a0dcb7bec087c25f19a4fd03e

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2953

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2953

          +1
          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2953

          +1 to merge this for 1.2 and 1.1.4

          githubbot ASF GitHub Bot added a comment -

          Github user davetorok commented on the issue:

          https://github.com/apache/flink/pull/2953

          Thanks for this! I was going to PR yesterday but you beat me to it. Successfully running 25,825,230 Avro / GenericRecords in the past 48 hrs.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2953

          Rebased to master to include fixed build.

          githubbot ASF GitHub Bot added a comment -

          GitHub user rmetzger opened a pull request:

          https://github.com/apache/flink/pull/2953

          FLINK-5039 Bump Avro version

          This is a critical issue for some of our users.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/rmetzger/flink flink5039

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2953.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2953


          commit b3da788361b5ffe72edb033d663a58433a695c73
          Author: Robert Metzger <rmetzger@apache.org>
          Date: 2016-12-06T20:03:10Z

          FLINK-5039 Bump Avro version


          rmetzger Robert Metzger added a comment -

          Just using the Avro serializer doesn't solve the problem here. We also need to include the schema in the TypeInformation so that we can initialize the GenericRecordSerializer with it.
          This also means that users cannot use GenericRecords with different schemas in the same stream.

          The minor Avro version bump was meant as a temporary fix to at least make it work correctly, before we make it fast. I believe an Avro dependency upgrade won't break old savepoints, because Avro serialization is determined purely by the schema.

          StephanEwen Stephan Ewen added a comment -

          Bruno Dumon: True, we need to change it so that the Avro serializer is used for Avro types.

          The tricky part is that we cannot simply swap serializers, since that breaks savepoint compatibility.
          We are introducing versioning information into savepoints in the next release. That will allow us to add a "state upgrade" path in which we can swap serializers without breaking savepoints.
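          A minimal sketch of the "state upgrade" idea in this comment, assuming a hypothetical state format: every blob starts with a version number, so a restore can still read data written by the old serializer and keep going with the new one. This is not Flink's actual savepoint format or versioning API; the class, constants, and methods are illustrative only, and the "old" format (schema repeated per record) and "new" format (values only) mirror the two approaches discussed in this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical versioned state format; not Flink's savepoint layout.
public class VersionedStateSketch {
    static final int V1_SCHEMA_PER_RECORD = 1; // old format: schema string repeated in each blob
    static final int V2_VALUES_ONLY = 2;       // new format: values only, schema known to the serializer

    static byte[] writeV1(String schema, long[] values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V1_SCHEMA_PER_RECORD);
            out.writeUTF(schema);
            out.writeInt(values.length);
            for (long v : values) out.writeLong(v);
        }
        return bytes.toByteArray();
    }

    static byte[] writeV2(long[] values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V2_VALUES_ONLY);
            out.writeInt(values.length);
            for (long v : values) out.writeLong(v);
        }
        return bytes.toByteArray();
    }

    // Dispatches on the version header, so blobs in either format can be restored.
    static long[] read(byte[] blob) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
        int version = in.readInt();
        switch (version) {
            case V1_SCHEMA_PER_RECORD:
                in.readUTF(); // skip the embedded schema; the new serializer already knows it
                break;
            case V2_VALUES_ONLY:
                break;
            default:
                throw new IOException("unknown state format version " + version);
        }
        long[] values = new long[in.readInt()];
        for (int i = 0; i < values.length; i++) values[i] = in.readLong();
        return values;
    }

    public static void main(String[] args) throws IOException {
        long[] values = {1478628723066L, 42L, 7L};
        byte[] oldBlob = writeV1("{\"type\":\"record\",\"name\":\"Example\"}", values);
        byte[] newBlob = writeV2(values);
        System.out.println(Arrays.toString(read(oldBlob))); // prints [1478628723066, 42, 7]
        System.out.println(Arrays.toString(read(newBlob))); // prints [1478628723066, 42, 7]
    }
}
```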

          bruno Bruno Dumon added a comment -

          This looks like a suboptimal solution to me, given that the schema will still be serialized with every record instance.

          rmetzger Robert Metzger added a comment -

          Thanks a lot for looking into the issue. I think we can upgrade the Avro version to 1.7.7 for Flink 1.1.4 and 1.2.0.
          I'll open a PR.

          dtorok Dave Torok added a comment -

          I have spent 2 days on this and HAVE THE SOLUTION.

          Fix: Bump Avro version to at least 1.7.7 from the current 1.7.6.

          Root cause: within Schema.class, the field position is TRANSIENT and does not get serialized by Kryo!

          See https://issues.apache.org/jira/browse/AVRO-1476, which specifically mentions Kryo.

          This was fixed in 1.7.7.

          This is also the cause for other GenericRecord issues such as the 'union' issue mentioned here http://stackoverflow.com/questions/37115618/apache-flink-union-operator-giving-wrong-response

          PLEASE BUMP THIS ASAP!!!!

          I have verified the fix in my local machine by replacing the Avro classes within the flink-dist_2.11-1.1.3.jar and it corrected my issue.
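          A minimal JDK-only sketch of the failure mode described in this comment. Assumptions: Field and Record are hypothetical stand-ins, not Avro classes, and plain Java serialization stands in for Kryo (Kryo's default FieldSerializer likewise skips transient fields). In this sketch, the transient position falls back to 0 after a round trip, so every field lookup returns the first value, which is consistent with the symptom in the description where all fields print the same value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class TransientPositionDemo {

    // Hypothetical stand-in for a schema field; the position is transient,
    // so serialization skips it (as with the field position in Avro 1.7.6).
    static class Field implements Serializable {
        final String name;
        transient int position;

        Field(String name, int position) {
            this.name = name;
            this.position = position;
        }
    }

    // Hypothetical stand-in for a generic record: values are looked up via the field's position.
    static class Record implements Serializable {
        final List<Field> fields;
        final Object[] values;

        Record(List<Field> fields, Object... values) {
            this.fields = fields;
            this.values = values;
        }

        Object get(String name) {
            for (Field f : fields) {
                if (f.name.equals(name)) {
                    return values[f.position];
                }
            }
            throw new IllegalArgumentException("no field " + name);
        }
    }

    // Serializes and deserializes an object with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Field> fields = Arrays.asList(new Field("foo", 0), new Field("bar", 1), new Field("baz", 2));
        Record original = new Record(fields, 1478628723066L, 42L, 7L);
        Record copy = roundTrip(original);
        // prints: before: 1478628723066 42 7
        System.out.println("before: " + original.get("foo") + " " + original.get("bar") + " " + original.get("baz"));
        // every transient position is now 0, so this prints: after:  1478628723066 1478628723066 1478628723066
        System.out.println("after:  " + copy.get("foo") + " " + copy.get("bar") + " " + copy.get("baz"));
    }
}
```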


            People

            • Assignee: rmetzger Robert Metzger
            • Reporter: bruno Bruno Dumon
            • Votes: 1
            • Watchers: 6
