Flume / FLUME-2010

Support Avro records in Log4jAppender and the HDFS Sink

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: v1.4.0
    • Component/s: Client SDK, Sinks+Sources
    • Labels:
      None

      Description

      It would be nice to support logging arbitrary Avro records via the Log4j Flume logger, and have them written to HDFS in Avro data files (using an appropriately configured HDFS sink).
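
      For illustration, a minimal sketch of what the producing side could look like once this is supported; the schema, field names, and logger name are hypothetical, and the logger is assumed to have the Flume Log4jAppender attached through the usual Log4j configuration:

        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.log4j.Logger;

        public class AvroLoggingExample {
          public static void main(String[] args) {
            // Hypothetical schema, purely for illustration.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AppEvent\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"message\",\"type\":\"string\"}]}");

            GenericRecord appEvent = new GenericData.Record(schema);
            appEvent.put("id", 1L);
            appEvent.put("message", "user logged in");

            // The attached Flume Log4jAppender serializes the record as Avro
            // and ships it as a Flume event.
            Logger.getLogger("avro-events").info(appEvent);
          }
        }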

      Attachments

      1. FLUME-2010.patch
        37 kB
        Tom White
      2. FLUME-2010.patch
        37 kB
        Tom White
      3. FLUME-2010.patch
        28 kB
        Tom White
      4. FLUME-2010.patch
        11 kB
        Tom White
      5. FLUME-2010.patch
        8 kB
        Tom White

        Issue Links

          Activity

          Hudson added a comment -

          Integrated in flume-trunk #428 (See https://builds.apache.org/job/flume-trunk/428/)
          FLUME-2010. Support Avro records in Log4jAppender and the HDFS Sink. (Revision e048160e043e4aa57fc49cc33f2640e60677cbe5)

          Result = UNSTABLE
          mpercy : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=e048160e043e4aa57fc49cc33f2640e60677cbe5
          Files :

          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-clients/flume-ng-log4jappender/src/test/resources/myrecord.avsc
          • flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
          • flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
          • flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties
          • flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties
          • flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
          • flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
          Mike Percy added a comment -

          Thanks for taking a look @ FLUME-2048 and filing FLUME-2091, Tom. Agree we need some more discussion around the schema hash stuff. I'll add literal schema support to FLUME-2048 for now and file a follow-up JIRA regarding the URL approach.

          I just pushed this to trunk and flume-1.4 branches. Thanks for the patch!

          Mike Percy added a comment -

          +1

          Tom White added a comment -

          Mike, thanks for taking a look. I think adding schema hashes needs a bit more work and discussion, so I've opened FLUME-2091 for that.

          FLUME-2048 looks like a good approach. To work well with this JIRA, it would be good to add support for the schema URL and schema literal headers from the latest patch here. That could be done as a follow-up JIRA, of course.

          Mike Percy added a comment -

          Either way, +1 on this patch

          Mike Percy added a comment -

          Tom, this looks great. I'd like to also support sending Avro schema hashes similar to how you originally imagined them, possibly using some local-FS schema repository. It doesn't necessarily have to be part of this JIRA but if it's straightforward then why not?

          If you have time, please take a look @ FLUME-2048 for an example deserializer that tags each event with a schema hash. What are your thoughts on that approach?

          Tom White added a comment -

          Slightly updated version that uses DataFileWriter#appendEncoded to avoid an extra deserialization/serialization step.
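
          For reference, a minimal sketch of the DataFileWriter#appendEncoded pattern mentioned above (a sketch of the idea, not the patch itself); the class, method, and variable names are illustrative, and the byte array is assumed to already hold a datum binary-encoded with the file's schema:

            import java.io.IOException;
            import java.io.OutputStream;
            import java.nio.ByteBuffer;
            import org.apache.avro.Schema;
            import org.apache.avro.file.DataFileWriter;
            import org.apache.avro.generic.GenericDatumWriter;
            import org.apache.avro.generic.GenericRecord;

            public class AppendEncodedSketch {
              static void writeEncoded(Schema schema, OutputStream out, byte[] eventBody)
                  throws IOException {
                DataFileWriter<GenericRecord> writer =
                    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
                writer.create(schema, out);                        // write the data file header (schema + sync marker)
                writer.appendEncoded(ByteBuffer.wrap(eventBody));  // append the pre-encoded datum directly
                writer.close();
              }
            }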

          Tom White added a comment -

          Here's a new patch which I've been using successfully. Sorry it took so long for me to update this.

          Changes in this version of the patch:

          • Removed the schema repository code. Instead you can optionally specify a URL from which the Avro schema can be retrieved (e.g. from HDFS or a web server). This is similar to how Avro schemas in Hive are handled.
          • Added support for all types of Avro serialization (generic, specific, reflect); see the sketch after this list.
          • Moved AvroEventSerializer to the HDFS sink module since it has a Hadoop dependency (so it can retrieve Avro schemas from HDFS).
          • Added tests and documentation.
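
          As a rough illustration of the reflect case in the list above, a hedged sketch of application code; the class, fields, and logger name are made up, and the appender is assumed to have Avro reflection enabled in its Log4j configuration:

            import org.apache.log4j.Logger;

            // Illustrative POJO: with reflection-based serialization enabled on the
            // Flume Log4jAppender, instances can be logged directly and the schema
            // is derived from the class via Avro's reflect API.
            public class OrderPlaced {
              public long orderId;
              public String customer;

              public static void main(String[] args) {
                OrderPlaced event = new OrderPlaced();
                event.orderId = 42L;
                event.customer = "alice";
                Logger.getLogger("avro-events").info(event);
              }
            }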
          Tom White added a comment -

          Thanks for the review Mike.

          "Couldn't we just store the schemas (one per line) and auto-generate the 64-bit hash? Why bother storing that?"

          Yes, that would work, but I kept the ID/schema mapping because that is how AVRO-1124 does things, and I'd like to make it as easy as possible to migrate to that scheme. There is nothing in the code that assumes the IDs are 64-bit hashes - I just used that in the example on github.

          I'll update the patch with more unit tests and documentation.

          Mike Percy added a comment -

          Tom, great idea! I just did a partial review, will look more closely tomorrow, but I have a few comments / questions:

          • It would be great to have an example properties file for id => schema mappings in the unit tests or something if that file is needed.
          • Couldn't we just store the schemas (one per line) and auto-generate the 64-bit hash? Why bother storing that?
          • It would be great to add user documentation in the flume-ng-doc/sphinx directory (user guide) if possible.

          Regards,
          Mike

          Tom White added a comment -

          I had a closer look at AVRO-1124 and wrote a patch that allows the Log4jAppender or the HDFS sink to use a Repository to look up schemas (and register them if they are not already there). With this approach there is no need for the user to use the MDC to register the schema, so I think it will be an improvement.

          Since AVRO-1124 isn't available yet I changed my patch to use a manually-generated properties file to provide a mapping between ID and schema. The ID is the 64-bit fingerprint. The idea is that you generate this file for your schemas and then distribute it to every Flume node. I updated the example at https://github.com/tomwhite/flume-log4j-example/tree/avro. (Once AVRO-1124 is in a release it will be straightforward to update Log4jAppender to use it.)

          This version of the patch adds unit tests too.
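
          For reference, one way to compute the 64-bit fingerprint mentioned above is Avro's SchemaNormalization (Avro 1.7+); whether the patch uses exactly this API isn't shown here, and the schema below is illustrative:

            import org.apache.avro.Schema;
            import org.apache.avro.SchemaNormalization;

            public class FingerprintSketch {
              public static void main(String[] args) {
                Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"AppEvent\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"long\"}]}");

                // CRC-64-AVRO fingerprint of the schema's parsing canonical form;
                // a candidate for the IDs in the ID -> schema mapping file.
                long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
                System.out.println(Long.toHexString(fingerprint));
              }
            }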

          Tom White added a comment -

          Here's another revision of the patch that reuses the Avro encoder and supports schema IDs rather than sending the schema in a Flume header.

          The idea behind schema IDs is that if you set the ID for the schema in the Log4j MDC then it will be used in the Flume header. (If you don't set it then everything still works; the appender just has to send the schema in a header for every message.) The HDFS sink then retrieves the schema by looking it up in its configuration file, which has to include the ID -> schema mapping.

          When AVRO-1124 is done we could use that for the schema repository.

          An alternative way of doing this now would be to have a schema catalog properties file with the ID -> schema mapping, and have both the Log4jAppender and HDFS sink use it - in this way we could avoid the MDC part.
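
          A minimal sketch of the MDC idea from this (later superseded) revision; the MDC key and the ID value below are hypothetical, not the exact names the patch used:

            import org.apache.avro.generic.GenericRecord;
            import org.apache.log4j.Logger;
            import org.apache.log4j.MDC;

            public class SchemaIdMdcSketch {
              static void logWithSchemaId(GenericRecord appEvent) {
                // Hypothetical key and value: the application puts the schema ID into
                // the Log4j MDC so the appender can send just the ID in a Flume header
                // instead of the full schema with every event.
                MDC.put("flume.avro.schema.id", "5be41f4e79d53353");
                Logger.getLogger("avro-events").info(appEvent);
              }
            }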

          Tom White added a comment -

          The example is in the 'avro' branch: https://github.com/tomwhite/flume-log4j-example/tree/avro.

          Tom White added a comment -

          Here's an early patch for discussion. The idea is that you can log Avro records simply using Log4j:

          Logger logger = Logger.getLogger(...);
          GenericRecord appEvent = ...;
          logger.info(appEvent);
          

          and they end up in HDFS by setting the HDFS sink serializer to org.apache.flume.serialization.GenericAvroEventSerializer$Builder.

          There's an example at https://github.com/tomwhite/flume-log4j-example (adapted from https://github.com/mpercy/flume-log4j-example).


            People

             • Assignee:
               Tom White
             • Reporter:
               Tom White
             • Votes:
               0
             • Watchers:
               4
