Hadoop Common
HADOOP-2206

Design/implement a general log-aggregation framework for Hadoop

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      I'd like to propose a log-aggregation framework which facilitates collection, aggregation and storage of the logs of the Hadoop Map-Reduce framework and of user jobs in HDFS. Clearly the design and implementation of this framework are heavily influenced and limited by Hadoop itself, for example the lack of appends, the need to avoid too many small files (think: stdout/stderr/syslog of each map/reduce task) and so on.

      This framework will be especially useful once HoD (HADOOP-1301) is used to provision dynamic, per-user, Map-Reduce clusters.

      Requirements:

      • Store the various logs to a configurable location in the Hadoop Distributed FileSystem
        • User task logs (stdout, stderr, syslog)
        • Map-Reduce daemons' logs (JobTracker and TaskTracker)
      • Integrate well with Hadoop and ensure no adverse performance impact on the Map-Reduce framework.
      • It must not use an HDFS file (or more!) per task, which would swamp the NameNode.
      • The aggregation system must be distributed and reliable.
      • Facilities/tools to read the aggregated logs.
      • The aggregated logs should be compressed.

      Architecture:

      Here is a high-level overview of the log-aggregation framework:

      Logging
      • Provision a cloud of log-aggregators in the cluster (outside of the Hadoop cluster, running on a subset of the nodes in the cluster). Let's call each one in the cloud a Log Aggregator, i.e. LA.
      • Each LA writes out 2 files per Map-Reduce cluster: an index file and a data file. The LA maintains one directory per Map-Reduce cluster on HDFS.
      • The index file format is simple:
        • streamid (either a daemon identifier, e.g. tasktracker_foo.bar.com:57891, or $jobid-$taskid-(stdout|stderr|syslog) for individual task-logs)
        • timestamp
        • logs-data start offset
        • no. of bytes
      • Each Hadoop daemon (JT/TT) is given the entire list of LAs in the cluster.
      • Each daemon picks one LA (at random) from the list, opens an exclusive stream with the LA after identifying itself (i.e. ${daemonid}) and sends its logs. In case of error/failure to log, it just connects to another LA as above and starts logging to it.

      • The logs are sent to the LA by a new log4j appender. The appender provides some amount of buffering on the client-side.
      • Implement a feature in the TaskTracker which lets it use the same appender to send out the userlogs (stdout/stderr/syslog) to the LA after task completion. This is important to ensure that logging to the LA at runtime doesn't hurt the task's performance (see HADOOP-1553). The TaskTracker picks an LA per task in a manner similar to the one it uses for its own logs, identifies itself (<${jobid}, ${taskid}, {stdout|stderr|syslog}>) and streams the entire task-log in one go. In fact we can pick different LAs for each of the task's stdout, stderr and syslog logs, each an exclusive stream to a single LA.

      • The LA buffers some amount of data in memory (say 16K) and then flushes that data to the HDFS file (per LA per cluster) after writing out an entry to the index file.
      • The LA periodically purges old logs (monthly, fortnightly or weekly as today).
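
      The logging path described above can be sketched roughly as follows. This is only an illustration under several assumptions, not part of the proposal: the function and class names, the connect callback, the tab-separated text encoding of an index entry, integer-second timestamps and per-stream buffering are all made up here, and in-memory file objects stand in for the HDFS index and data files.

```python
import io
import random
import time

FLUSH_THRESHOLD = 16 * 1024  # "say 16K" in the proposal


def send_with_failover(aggregators, streamid, data, connect, rng=random):
    """Try LAs in random order until one accepts the stream.

    `connect(address, streamid)` is a hypothetical callback that opens an
    exclusive stream to an LA and returns an object with a send() method.
    """
    candidates = list(aggregators)
    rng.shuffle(candidates)
    for address in candidates:
        try:
            connect(address, streamid).send(data)
            return address
        except OSError:
            continue  # this LA failed, connect to another one
    raise OSError("no log aggregator reachable")


class LogAggregator:
    """Toy LA holding one index file and one data file for one cluster."""

    def __init__(self, index_file, data_file,
                 threshold=FLUSH_THRESHOLD, clock=time.time):
        self.index_file = index_file  # text file-like object
        self.data_file = data_file    # binary file-like object
        self.threshold = threshold
        self.clock = clock
        self.offset = 0               # next write position in the data file
        self.buffers = {}             # streamid -> bytearray

    def append(self, streamid, data):
        buf = self.buffers.setdefault(streamid, bytearray())
        buf.extend(data)
        if len(buf) >= self.threshold:
            self.flush(streamid)

    def flush(self, streamid):
        buf = self.buffers.get(streamid)
        if not buf:
            return
        # Index entry first (streamid, timestamp, start offset, no. of bytes),
        # then the data itself.
        entry = f"{streamid}\t{int(self.clock())}\t{self.offset}\t{len(buf)}\n"
        self.index_file.write(entry)
        self.data_file.write(bytes(buf))
        self.offset += len(buf)
        buf.clear()
```

      Buffering per stream keeps each index entry pointing at one contiguous byte range in the data file, which is what the query tools rely on.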
      Getting the logged information

      The main requirement is to implement a simple set of tools to query the LA (i.e. the index/data files on HDFS) to glean the logged information.

      If we think of each Map-Reduce cluster's logs as a set of archives (i.e. one file per cluster per LA used), we need the ability to query the log-archive to figure out the available streams, and the ability to get either one entire stream or a subset of it selected by timestamp range. Essentially these are simple tools which parse the index files of each LA (for a given Hadoop cluster) and return the required information.

      Query for available streams

      The query just returns all the available streams in a cluster-log archive identified by the HDFS path.

      It looks something like this for a cluster with 3 nodes which ran 2 jobs, the first of which had 2 maps and 1 reduce and the second 1 map and 1 reduce:

         $ la -query /log-aggregation/cluster-20071113
         Available streams:
         jobtracker_foo.bar.com:57893
         tasktracker_baz.bar.com:57841
         tasktracker_fii.bar.com:57891
         job_20071113_0001-task_20071113_0001_m_000000_0-stdout
         job_20071113_0001-task_20071113_0001_m_000000_0-stderr
         job_20071113_0001-task_20071113_0001_m_000000_0-syslog
         job_20071113_0001-task_20071113_0001_m_000001_0-stdout
         job_20071113_0001-task_20071113_0001_m_000001_0-stderr
         job_20071113_0001-task_20071113_0001_m_000001_0-syslog
         job_20071113_0001-task_20071113_0001_r_000000_0-stdout
         job_20071113_0001-task_20071113_0001_r_000000_0-stderr
         job_20071113_0001-task_20071113_0001_r_000000_0-syslog
         job_20071113_0002-task_20071113_0002_m_000000_0-stdout
         job_20071113_0002-task_20071113_0002_m_000000_0-stderr
         job_20071113_0002-task_20071113_0002_m_000000_0-syslog
         job_20071113_0002-task_20071113_0002_r_000000_0-stdout
         job_20071113_0002-task_20071113_0002_r_000000_0-stderr
         job_20071113_0002-task_20071113_0002_r_000000_0-syslog
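
      A stream query of this kind could be little more than a scan over the index files of every LA used by the cluster. The sketch below assumes a hypothetical tab-separated index encoding (streamid, timestamp, start offset, no. of bytes, one entry per line); the proposal names the fields but does not fix an on-disk format, and the function name is made up for illustration.

```python
def available_streams(*indexes):
    """Return the distinct stream ids found in one or more index files.

    Each argument is an iterable of index lines (for example an open text
    file). Stream ids are returned in first-seen order.
    """
    seen = {}
    for index_lines in indexes:
        for line in index_lines:
            fields = line.rstrip("\n").split("\t")
            if len(fields) != 4:
                continue  # skip blank or malformed lines
            seen.setdefault(fields[0], None)
    return list(seen)
```

      Several index files are accepted because a daemon that fails over to another LA leaves parts of the same stream in more than one archive.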
      
      Get logged information per stream

      The framework also offers the ability to query and fetch the actual log-data, per-stream for a given timestamp-range. It looks something like:

          $ la -fetch -daemon jt -range <t1:t2> /log-aggregation/cluster-20071113
          $ la -fetch -daemon tt1 /log-aggregation/cluster-20071113
          $ la -fetch -jobid <jobid> -taskid <taskid> -log <out|err|sys> -range <t1:t2> /log-aggregation/cluster-20071113
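
      Fetching one stream, optionally restricted to a timestamp range, might then look roughly like the sketch below. It makes the same assumptions as before, none of which come from the proposal itself: a tab-separated index line per entry, integer timestamps, and a made-up function name. Any seekable binary file object stands in for the LA's data file on HDFS.

```python
import io


def fetch_stream(index_lines, data_file, streamid, t1=None, t2=None):
    """Return the bytes logged for `streamid`, ordered by timestamp.

    t1 and t2 are optional inclusive bounds on the entry timestamp.
    """
    chunks = []
    for line in index_lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) != 4:
            continue  # skip blank or malformed lines
        sid = fields[0]
        timestamp, offset, nbytes = (int(f) for f in fields[1:])
        if sid != streamid:
            continue
        if t1 is not None and timestamp < t1:
            continue
        if t2 is not None and timestamp > t2:
            continue
        data_file.seek(offset)
        chunks.append((timestamp, data_file.read(nbytes)))
    # Stable sort: entries with equal timestamps keep their index order.
    chunks.sort(key=lambda chunk: chunk[0])
    return b"".join(data for _, data in chunks)
```

      Only the index is scanned in full; the data file is read with one seek per matching entry, so a narrow range does not require reading the whole archive.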
      

      Thoughts?

            People

            • Assignee:
              Arun C Murthy
              Reporter:
              Arun C Murthy
            • Votes:
              1
              Watchers:
              8
