Hadoop Common: HADOOP-2206

Design/implement a general log-aggregation framework for Hadoop

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      I'd like to propose a log-aggregation framework which facilitates collection, aggregation and storage of the logs of the Hadoop Map-Reduce framework and user-jobs in HDFS. Clearly the design/implementation of this framework is heavily influenced and constrained by Hadoop itself: e.g. the lack of appends, the need to avoid large numbers of small files (think: the stdout/stderr/syslog of each map/reduce task), and so on.

      This framework will be especially useful once HoD (HADOOP-1301) is used to provision dynamic, per-user, Map-Reduce clusters.

      Requirements:

      • Store the various logs to a configurable location in the Hadoop Distributed FileSystem
        • User task logs (stdout, stderr, syslog)
        • Map-Reduce daemons' logs (JobTracker and TaskTracker)
      • Integrate well with Hadoop and ensure no adverse performance impact on the Map-Reduce framework.
      • It must not use an HDFS file (or more!) per task, which would swamp the NameNode.
      • The aggregation system must be distributed and reliable.
      • Facilities/tools to read the aggregated logs.
      • The aggregated logs should be compressed.

      Architecture:

      Here is a high-level overview of the log-aggregation framework:

      Logging
      • Provision a cloud of log-aggregators in the cluster (separate from the Hadoop daemons themselves, running on a subset of the cluster's nodes). Let's call each member of the cloud a Log Aggregator, i.e. LA.
      • Each LA writes out 2 files per Map-Reduce cluster: an index file and a data file. The LA maintains one directory per Map-Reduce cluster on HDFS.
      • The index file format is simple:
        • streamid (either a daemon identifier, e.g. tasktracker_foo.bar.com:57891, or $jobid-$taskid-(stdout|stderr|syslog) for individual task-logs)
        • timestamp
        • logs-data start offset
        • no. of bytes
      • Each Hadoop daemon (JT/TT) is given the entire list of LAs in the cluster.
      • Each daemon picks one LA (at random) from the list, opens an exclusive stream with the LA after identifying itself (i.e. ${daemonid}) and sends its logs. In case of error/failure to log, it just connects to another LA as above and starts logging to it.
      • The logs are sent to the LA by a new log4j appender. The appender provides some amount of buffering on the client-side (a sketch of such an appender follows this list).
      • Implement a feature in the TaskTracker which lets it use the same appender to send out the userlogs (stdout/stderr/syslog) to the LA after task completion. This is important to ensure that logging to the LA at runtime doesn't hurt the task's performance (see HADOOP-1553). The TaskTracker picks an LA per task in a manner similar to the one it uses for its own logs, identifies itself (<${jobid}, ${taskid}, {stdout|stderr|syslog}>) and streams the entire task-log in one go. In fact we can pick different LAs for each of the task's stdout, stderr and syslog logs, each an exclusive stream to a single LA.
      • The LA buffers some amount of data in memory (say 16K) and then flushes that data to the HDFS file (per LA per cluster) after writing out an entry to the index file.
      • The LA periodically purges old logs (monthly, fortnightly or weekly as today).
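
      To make the appender and fail-over bullets above concrete, here is a minimal sketch of what such a log4j appender might look like. The wire format (a streamid handshake followed by timestamped records), the class name and the constructor are assumptions made purely for illustration; nothing below exists in Hadoop or log4j:

         import java.io.BufferedOutputStream;
         import java.io.DataOutputStream;
         import java.io.IOException;
         import java.net.Socket;
         import java.util.List;
         import java.util.Random;

         import org.apache.log4j.AppenderSkeleton;
         import org.apache.log4j.spi.LoggingEvent;

         // Hypothetical appender: streams each log record to one randomly
         // chosen LA and fails over to another LA on any write error.
         public class LogAggregatorAppender extends AppenderSkeleton {
           private final List<String> aggregators;  // host:port of every LA
           private final String streamId;           // e.g. tasktracker_foo.bar.com:57891
           private final Random random = new Random();
           private DataOutputStream out;            // exclusive stream to the current LA

           public LogAggregatorAppender(List<String> aggregators, String streamId) {
             this.aggregators = aggregators;
             this.streamId = streamId;
           }

           // Pick an LA at random and identify ourselves with our streamid.
           private void connect() throws IOException {
             String[] hostPort =
                 aggregators.get(random.nextInt(aggregators.size())).split(":");
             Socket socket = new Socket(hostPort[0], Integer.parseInt(hostPort[1]));
             out = new DataOutputStream(
                 new BufferedOutputStream(socket.getOutputStream(), 16 * 1024));
             out.writeUTF(streamId);                // handshake: tell the LA who we are
           }

           @Override
           protected void append(LoggingEvent event) {
             try {
               if (out == null) {
                 connect();
               }
               out.writeLong(event.getTimeStamp()); // timestamp for the LA's index entry
               out.writeUTF(this.layout.format(event));
             } catch (IOException e) {
               out = null;                          // drop the connection; the next
             }                                      // append() fails over to another LA
           }

           @Override
           public boolean requiresLayout() { return true; }

           @Override
           public void close() {
             try { if (out != null) out.close(); } catch (IOException ignored) { }
             this.closed = true;
           }
         }
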
      Getting the logged information

      The main requirement is to implement a simple set of tools to query the LA (i.e. the index/data files on HDFS) to glean the logged information.

      If we can think of each Map-Reduce cluster's logs as a set of archives (i.e. one file per cluster per LA used), we need the ability to query the log-archive to figure out the available streams, and the ability to fetch an entire stream or a subset of it based on timestamp-ranges. Essentially these are simple tools which parse the index files of each LA (for a given Hadoop cluster) and return the required information; a sketch of such an index reader follows.
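
      Since the index format above is the contract these tools rely on, here is a minimal sketch of such a reader, assuming one binary record per flushed chunk; the on-disk encoding and the IndexReader class itself are hypothetical:

         import java.io.IOException;
         import java.util.LinkedHashSet;
         import java.util.Set;

         import org.apache.hadoop.conf.Configuration;
         import org.apache.hadoop.fs.FSDataInputStream;
         import org.apache.hadoop.fs.FileSystem;
         import org.apache.hadoop.fs.Path;

         public class IndexReader {
           // One index entry per flushed chunk: <streamid, timestamp, offset, length>.
           public static class Entry {
             public String streamId;
             public long timestamp;
             public long offset;  // logs-data start offset in the LA's data file
             public int length;   // no. of bytes
           }

           // Scan one LA's index file and return the distinct streamids in it.
           public static Set<String> availableStreams(Configuration conf, Path indexFile)
               throws IOException {
             FileSystem fs = indexFile.getFileSystem(conf);
             Set<String> streams = new LinkedHashSet<String>();
             FSDataInputStream in = fs.open(indexFile);
             try {
               long fileLen = fs.getFileStatus(indexFile).getLen();
               while (in.getPos() < fileLen) {      // read records until end of file
                 Entry e = new Entry();
                 e.streamId = in.readUTF();
                 e.timestamp = in.readLong();
                 e.offset = in.readLong();
                 e.length = in.readInt();
                 streams.add(e.streamId);
               }
             } finally {
               in.close();
             }
             return streams;
           }
         }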

      Query for available streams

      The query just returns all the available streams in a cluster-log archive identified by the HDFS path.

      It looks something like this for a cluster with 3 nodes which ran 2 jobs, each with 2 maps and 1 reduce:

         $ la -query /log-aggregation/cluster-20071113
         Available streams:
         jobtracker_foo.bar.com:57893
         tasktracker_baz.bar.com:57841
         tasktracker_fii.bar.com:57891
         job_20071113_0001-task_20071113_0001_m_000000_0-stdout
         job_20071113_0001-task_20071113_0001_m_000000_0-stderr
         job_20071113_0001-task_20071113_0001_m_000000_0-syslog
         job_20071113_0001-task_20071113_0001_m_000001_0-stdout
         job_20071113_0001-task_20071113_0001_m_000001_0-stderr
         job_20071113_0001-task_20071113_0001_m_000001_0-syslog
         job_20071113_0001-task_20071113_0001_r_000000_0-stdout
         job_20071113_0001-task_20071113_0001_r_000000_0-stderr
         job_20071113_0001-task_20071113_0001_r_000000_0-syslog
         job_20071113_0002-task_20071113_0002_m_000000_0-stdout
         job_20071113_0002-task_20071113_0002_m_000000_0-stderr
         job_20071113_0002-task_20071113_0002_m_000000_0-syslog
         job_20071113_0002-task_20071113_0002_m_000001_0-stdout
         job_20071113_0002-task_20071113_0002_m_000001_0-stderr
         job_20071113_0002-task_20071113_0002_m_000001_0-syslog
         job_20071113_0002-task_20071113_0002_r_000000_0-stdout
         job_20071113_0002-task_20071113_0002_r_000000_0-stderr
         job_20071113_0002-task_20071113_0002_r_000000_0-syslog
      
      Get logged information per stream

      The framework also offers the ability to query and fetch the actual log-data, per-stream for a given timestamp-range. It looks something like:

          $ la -fetch -daemon jt -range <t1:t2> /log-aggregation/cluster-20071113
          $ la -fetch -daemon tt1 /log-aggregation/cluster-20071113
          $ la -fetch -jobid <jobid> -taskid <taskid> -log <out|err|sys> -range <t1:t2> /log-aggregation/cluster-20071113
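
      As a rough illustration of what -range could do with the index, here is a sketch (again with hypothetical class names, building on the IndexReader.Entry above) that selects a stream's chunks by timestamp and copies them out of the data file via positioned reads:

         import java.io.IOException;
         import java.io.OutputStream;
         import java.util.List;

         import org.apache.hadoop.conf.Configuration;
         import org.apache.hadoop.fs.FSDataInputStream;
         import org.apache.hadoop.fs.FileSystem;
         import org.apache.hadoop.fs.Path;

         public class LogFetcher {
           // Copy every chunk of streamId logged within [t1, t2] to out.
           public static void fetch(Configuration conf, Path dataFile,
               List<IndexReader.Entry> index, String streamId,
               long t1, long t2, OutputStream out) throws IOException {
             FileSystem fs = dataFile.getFileSystem(conf);
             FSDataInputStream in = fs.open(dataFile);
             try {
               for (IndexReader.Entry e : index) {
                 if (!e.streamId.equals(streamId)) continue;         // wrong stream
                 if (e.timestamp < t1 || e.timestamp > t2) continue; // outside range
                 byte[] chunk = new byte[e.length];
                 in.readFully(e.offset, chunk);  // positioned read at the indexed offset
                 out.write(chunk);
               }
             } finally {
               in.close();
             }
           }
         }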
      

      Thoughts?


          Activity

          Owen O'Malley added a comment -

          I think Chukwa is addressing most of these needs.

          Allen Wittenauer added a comment -

          To Doug's Comment: "do we worry about security? is anyone allowed to read anyone else's logs?"

          Yes, we need to worry about security here. Users should not be able to read other users' logs.

          Jeff Hammerbacher added a comment -

          we're getting the code in shape and hope to have a public copy up by the end of the month.

          Doug Cutting added a comment -

          > I got Arun a copy of Scribe a few months ago.

          Any chance you can post a public copy somewhere?

          Jeff Hammerbacher added a comment -

          I got Arun a copy of Scribe a few months ago.

          Enis Soztutar added a comment -

          Jeff, any update on this? Maybe you can provide us an alpha release to get our hands dirty.

          Arun C Murthy added a comment -

          I'm marking for 0.17.0.

          Nate Carlson added a comment -

          Wow Jeff, that sounds sweet. Any idea when it'll be ready to go as open source? I'm currently trying to figure out how to deal with our hadoop logs.

          Chad Walters added a comment -

          +1 for Jeff's suggestion

          +1 for Arun's enthusiasm for reuse

          Arun C Murthy added a comment -

          Woah! That's very interesting to hear! We'd definitely be interested in understanding Scribe before we reinvent the fire, wheel and the steam-engine...

          How do I get my hands on the beta version of Scribe? Thanks for letting us know!

          Jeff Hammerbacher added a comment -

          I should also mention we're pretty far along with the "facilities/tools to read the aggregated logs" and compression components. In fact, our central use case for Hadoop is analysis of massive, structured logfiles--if you guys are in the Bay Area, maybe you can swing by Palo Alto for a demo? Also, we're headed to Santa Clara to talk with the Pig team in two weeks, if that works better...

          Jeff Hammerbacher added a comment -

          Hey, we have a Thrift service here at Facebook called Scribe that does distributed logfile aggregation. It's used heavily, doing several TB/day in logfile collection, and it's designed to have low CPU impact, as the service runs everywhere, including web servers. We've cut a beta release and given it to a few folks to squash remaining bugs before we open source the project. It seems that Scribe fits perfectly into this design as the LA. Would you guys be interested in trying out a beta release of Scribe as the LA? The service is written in C++ but we've gotten it writing directly into HDFS over JNI (much more useful once appends are in).

          Enis Soztutar added a comment -

          Some ideas:

          • The logs should have an associated log-type attribute. The log-type attribute (userlog, mapred, datanode, etc.) will enable us to filter by log type. This is similar to the proposed "available streams" approach. However, currently the stream(s)
            job_20071113_0001-task_20071113_0001_m_000000_0-stderr

            may contain logs from the framework itself (such as ipc DEBUG logs, etc.). I propose we switch from getting loggers per class to getting loggers per "component" (DN, TT, JT, Job_0001, etc.).

          • The system as a whole should be reliable. We must make sure that no logs are lost even when the LAs are down.
            I can think of three solutions here. The first is to append to multiple LAs and let the appenders take care of dealing with duplicates. The second is to open a new child process for each logger (or use an existing one on the machine), which sends the logs to the LA and buffers them until the LA reports back that it has stored the logs permanently; if anything happens to the logger, the process can still continue to try pushing the logs. The last, and the simplest, is to log both through the LAs and also to the local disk, in case a log could not be persisted on the LAs (a rough sketch follows).
          • The log-analysis subsystem should be extensible so that users may write programs to filter the logs in any way they want.
          • According to my experience in HADOOP-53, we may have a hard time if we use log4j's appenders for this job. The weird thing happens when we want to log something over ipc, but the ipc calls generate their own logs, resulting in some kind of deadlock.
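
          Purely as an illustration of that third option (the names below are invented for this sketch, not proposed APIs), a logger could persist every record locally before attempting the LA, so nothing is lost while an LA is down:

             import java.io.FileWriter;
             import java.io.IOException;

             public class FallbackLogger {
               // Stand-in for the real LA client protocol.
               interface AggregatorClient {
                 void send(String record) throws IOException;
               }

               private final AggregatorClient la;
               private final FileWriter localSpill;  // plain file on the local disk

               public FallbackLogger(AggregatorClient la, String spillPath)
                   throws IOException {
                 this.la = la;
                 this.localSpill = new FileWriter(spillPath, true);  // append mode
               }

               public void log(String record) throws IOException {
                 localSpill.write(record);  // always persist locally first
                 localSpill.write('\n');
                 localSpill.flush();
                 try {
                   la.send(record);         // normal path: stream to the LA
                 } catch (IOException e) {
                   // LA unreachable: the spill file can be re-shipped later.
                 }
               }
             }
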
          Doug Cutting added a comment -

          A few thoughts:

          • do we worry about security? is anyone allowed to read anyone else's logs?
          • shouldn't LAs be allocated according to cluster topology, i.e., on the same rack as the node?
          • adding a new set of daemons seems onerous. perhaps instead each tasktracker could run a log server, and the jobtracker could select a subset of tasktrackers to log a job, and then pass each tasktracker that subset. so not all of the daemons would be used by a particular job. one could select, e.g., the percentage of tasktrackers to use for logging, perhaps one per rack.

            People

            • Assignee: Arun C Murthy
            • Reporter: Arun C Murthy
            • Votes: 1
            • Watchers: 8