  HADOOP-2085 (Hadoop Common)

Map-side joins on sorted, equally-partitioned datasets


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.16.0
    • Component/s: None
    • Labels: None

    Description

      Motivation

      Given a set of sorted datasets keyed with the same class and yielding equal
      partitions, it is possible to effect a join of those datasets prior to the
      map. This could save the cost of repartitioning, sorting, shuffling, and
      writing out the data, as required in the general case.

      Interface

      The attached code offers the following interface to users of these classes.

      Property                     Required  Value
      mapred.join.expr             yes       Join expression to effect over input data
      mapred.join.keycomparator    no        WritableComparator class to use for comparing keys
      mapred.join.define.<ident>   no        Class mapped to identifier in join expression

      The join expression understands the following grammar:

      func ::= <ident>([<func>,]*<func>)
      func ::= tbl(<class>,"<path>")
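      To make the grammar concrete, here is a minimal recursive-descent parser
      for it in plain Java. It is an illustrative sketch and not the patch's
      Parser. The names Node, TblNode, OpNode, and JoinExprParser are
      hypothetical. Identifiers and class names are assumed to contain only
      letters, digits, '_', '$', and '.', and quoted paths are assumed to contain
      no quote characters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parse-tree types (not the patch's classes).
interface Node {}

final class TblNode implements Node {
    final String inputFormat;
    final String path;
    TblNode(String inputFormat, String path) { this.inputFormat = inputFormat; this.path = path; }
    @Override public String toString() { return "tbl(" + inputFormat + ",\"" + path + "\")"; }
}

final class OpNode implements Node {
    final String ident;
    final List<Node> children;
    OpNode(String ident, List<Node> children) { this.ident = ident; this.children = children; }
    @Override public String toString() {
        StringBuilder sb = new StringBuilder(ident).append('(');
        for (int i = 0; i < children.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append(children.get(i));
        }
        return sb.append(')').toString();
    }
}

// Hypothetical recursive-descent parser for:
//   func ::= <ident>([<func>,]*<func>)
//   func ::= tbl(<class>,"<path>")
final class JoinExprParser {
    private final String s;
    private int pos;

    private JoinExprParser(String s) { this.s = s; }

    static Node parse(String expr) {
        JoinExprParser p = new JoinExprParser(expr);
        Node n = p.func();
        p.skipWs();
        if (p.pos != p.s.length()) throw new IllegalArgumentException("trailing input at " + p.pos);
        return n;
    }

    private Node func() {
        String ident = word();
        expect('(');
        if (ident.equals("tbl")) {        // leaf: InputFormat class name and quoted path
            String cls = word();
            expect(',');
            String path = quoted();
            expect(')');
            return new TblNode(cls, path);
        }
        List<Node> kids = new ArrayList<>();  // interior node: one or more sub-expressions
        kids.add(func());
        while (peek() == ',') { pos++; kids.add(func()); }
        expect(')');
        return new OpNode(ident, kids);
    }

    // Reads a run of identifier/class-name characters (dots allowed).
    private String word() {
        skipWs();
        int start = pos;
        while (pos < s.length()) {
            char c = s.charAt(pos);
            if (Character.isLetterOrDigit(c) || c == '_' || c == '$' || c == '.') pos++;
            else break;
        }
        if (start == pos) throw new IllegalArgumentException("expected identifier at " + start);
        return s.substring(start, pos);
    }

    private String quoted() {
        expect('"');
        int end = s.indexOf('"', pos);
        if (end < 0) throw new IllegalArgumentException("unterminated path");
        String path = s.substring(pos, end);
        pos = end + 1;
        return path;
    }

    private char peek() { skipWs(); return pos < s.length() ? s.charAt(pos) : '\0'; }

    private void expect(char c) {
        if (peek() != c) throw new IllegalArgumentException("expected '" + c + "' at " + pos);
        pos++;
    }

    private void skipWs() { while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++; }
}
```

      In this sketch, any identifier other than tbl becomes an interior node. What
      that identifier means is resolved separately, which is the role of
      mapred.join.define.<ident>.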
      

      Operations included in this patch fall into one of two types: join
      operations emitting tuples, and "multi-filter" operations emitting a single
      value derived from a set of input values (but not necessarily one of them).
      For a given key, each operation will consider the cross product of all
      values for all sources at that node.
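      The cross product that an operation considers for one key can be
      enumerated odometer-style over the per-source value lists. The sketch below
      is standalone. CrossProduct.of is a hypothetical helper, and each source's
      values for the current key are assumed to be already collected into a List.
      The patch works with iterators instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: cross product of per-source value lists for one key.
final class CrossProduct {
    // The rightmost source varies fastest. The leftmost advances only once per
    // full cycle of the others. Any empty source means there are no tuples.
    static <V> List<List<V>> of(List<List<V>> sources) {
        List<List<V>> out = new ArrayList<>();
        if (sources.isEmpty()) return out;
        for (List<V> src : sources) if (src.isEmpty()) return out;
        int[] idx = new int[sources.size()];
        while (true) {
            List<V> tuple = new ArrayList<>(sources.size());
            for (int i = 0; i < idx.length; i++) tuple.add(sources.get(i).get(idx[i]));
            out.add(tuple);
            // Advance the odometer, carrying from right to left.
            int pos = idx.length - 1;
            while (pos >= 0 && ++idx[pos] == sources.get(pos).size()) {
                idx[pos] = 0;
                pos--;
            }
            if (pos < 0) return out;   // every position wrapped: enumeration complete
        }
    }
}
```

      For example, two values from one source and three from another give six
      tuples, starting with (a1, b1), (a1, b2), ...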

      Identifiers supported by default:

      Identifier  Type         Description
      inner       Join         Full inner join
      outer       Join         Full outer join
      override    MultiFilter  For a given key, prefer values from the rightmost source
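      Read as operations over the values that each source holds for one key, the
      three default identifiers behave roughly as modeled below. This is a
      simplified sketch. DefaultOps is hypothetical. A source lacking the key is
      an empty list, and an unfilled tuple position is represented by null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of inner / outer / override for a single key.
final class DefaultOps {
    // inner: emit the cross product only if every source has the key.
    static <V> List<List<V>> inner(List<List<V>> sources) {
        for (List<V> src : sources) if (src.isEmpty()) return new ArrayList<>();
        return cross(sources);
    }

    // outer: emit the cross product of the sources that have the key. Positions
    // for sources lacking it stay null, so every tuple has the same capacity.
    static <V> List<List<V>> outer(List<List<V>> sources) {
        List<List<V>> padded = new ArrayList<>();
        boolean any = false;
        for (List<V> src : sources) {
            if (src.isEmpty()) {
                List<V> absent = new ArrayList<>();
                absent.add(null);        // position exists but holds no value
                padded.add(absent);
            } else {
                padded.add(src);
                any = true;
            }
        }
        if (!any) return new ArrayList<>();
        return cross(padded);
    }

    // override: emit only the values from the rightmost source that has the key.
    static <V> List<V> override(List<List<V>> sources) {
        for (int i = sources.size() - 1; i >= 0; i--) {
            if (!sources.get(i).isEmpty()) return new ArrayList<>(sources.get(i));
        }
        return new ArrayList<>();
    }

    // Cross product built prefix by prefix. The rightmost source varies fastest.
    private static <V> List<List<V>> cross(List<List<V>> sources) {
        List<List<V>> out = new ArrayList<>();
        out.add(new ArrayList<>());
        for (List<V> src : sources) {
            List<List<V>> next = new ArrayList<>();
            for (List<V> prefix : out) {
                for (V v : src) {
                    List<V> t = new ArrayList<>(prefix);
                    t.add(v);
                    next.add(t);
                }
            }
            out = next;
        }
        return out;
    }
}
```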

      A user of this class must set the InputFormat for the job to
      CompositeInputFormat and define a join expression accepted by the preceding
      grammar. For example, both of the following are acceptable:

      inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                "hdfs://host:8020/foo/bar"),
            tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                "hdfs://host:8020/foo/baz"))
      
      outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                         "hdfs://host:8020/foo/bar"),
                     tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                         "hdfs://host:8020/foo/baz")),
            tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                "hdfs://host:8020/foo/rab"))
      

      CompositeInputFormat includes a handful of convenience methods to aid
      construction of these verbose statements.
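      The idea behind such helpers can be sketched without Hadoop. JoinExprs,
      tbl, op, and overPaths below are hypothetical names and are not
      CompositeInputFormat's own methods. They take the InputFormat class name
      as a plain string and emit it verbatim.

```java
import java.util.StringJoiner;

// Hypothetical standalone helpers that assemble join expressions as strings.
final class JoinExprs {
    // Leaf: tbl(<class>,"<path>")
    static String tbl(String inputFormatClass, String path) {
        if (path.indexOf('"') >= 0) throw new IllegalArgumentException("quote in path: " + path);
        return "tbl(" + inputFormatClass + ",\"" + path + "\")";
    }

    // Interior node: <ident>(<func>,...,<func>), with at least one child.
    static String op(String ident, String... children) {
        if (children.length == 0) throw new IllegalArgumentException(ident + " needs a child");
        StringJoiner sj = new StringJoiner(",", ident + "(", ")");
        for (String c : children) sj.add(c);
        return sj.toString();
    }

    // One operation over several paths that share the same InputFormat.
    static String overPaths(String ident, String inputFormatClass, String... paths) {
        String[] leaves = new String[paths.length];
        for (int i = 0; i < paths.length; i++) leaves[i] = tbl(inputFormatClass, paths[i]);
        return op(ident, leaves);
    }
}
```

      Nesting works by passing the result of op as a child of another op, as in
      the second example above.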

      As in the second example, joins may be nested. Users may provide a
      comparator class in the mapred.join.keycomparator property to
      specify the ordering of their keys, or accept the default comparator as
      returned by WritableComparator.get(keyclass).

      Users can specify their own join operations, typically by overriding
      JoinRecordReader or MultiFilterRecordReader and mapping that class
      to an identifier in the join expression using the
      mapred.join.define.ident property, where ident is the identifier
      appearing in the join expression. Users may elect to emit or modify values
      passing through their join operation. Consulting the existing operations for
      guidance is recommended. Adding arguments is considerably more complex (and
      only partially supported), as one must also add a Node type to the parse
      tree. One is probably better off extending RecordReader in most cases.
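      Mapping identifiers to classes amounts to a lookup table that starts with
      the defaults and is extended from the configuration. The sketch below uses
      java.util.Properties in place of a job configuration. IdentRegistry is
      hypothetical, the default class names are assumed for illustration, and
      letting a user definition shadow a built-in identifier is a choice made by
      this sketch only.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical registry from join-expression identifiers to class names.
final class IdentRegistry {
    static final String PREFIX = "mapred.join.define.";

    static Map<String, String> build(Properties conf) {
        Map<String, String> idents = new TreeMap<>();
        // Built-in identifiers (class names assumed for illustration).
        for (String[] d : Arrays.asList(
                new String[] {"inner", "org.apache.hadoop.mapred.join.InnerJoinRecordReader"},
                new String[] {"outer", "org.apache.hadoop.mapred.join.OuterJoinRecordReader"},
                new String[] {"override", "org.apache.hadoop.mapred.join.OverrideRecordReader"})) {
            idents.put(d[0], d[1]);
        }
        // User definitions: mapred.join.define.<ident>=<class>.
        for (String key : conf.stringPropertyNames()) {
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                idents.put(key.substring(PREFIX.length()), conf.getProperty(key));
            }
        }
        return idents;
    }
}
```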

      Design

      As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
      types for the join tree. Delegation satisfies most requirements of the
      InputFormat contract, particularly validateInput and getSplits.
      Most of the work in this patch concerns getRecordReader. The
      CompositeInputFormat itself delegates to the parse tree generated by
      Parser.
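      Delegating getSplits through the tree relies on equal partitioning. One way
      to picture it is that the i-th split of a composite pairs the i-th split of
      every child, and a mismatch in split counts makes the join impossible. The
      sketch below models splits as strings. CompositeSplits.combine is
      hypothetical and illustrates the idea only, not the patch's split class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of building composite splits from child splits.
final class CompositeSplits {
    // Pairs the i-th split of every child into the i-th composite split.
    // Children must report the same number of splits; otherwise this rejects the join.
    static List<List<String>> combine(List<List<String>> childSplits) {
        if (childSplits.isEmpty()) throw new IllegalArgumentException("no children");
        int n = childSplits.get(0).size();
        for (int c = 1; c < childSplits.size(); c++) {
            int m = childSplits.get(c).size();
            if (m != n) throw new IllegalArgumentException(
                "inconsistent split count from child " + c + " (" + n + "/" + m + ")");
        }
        List<List<String>> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            List<String> composite = new ArrayList<>(childSplits.size());
            for (List<String> splits : childSplits) composite.add(splits.get(i));
            out.add(composite);
        }
        return out;
    }
}
```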

      Hierarchical Joins

      Each RecordReader from the user must be "wrapped", since effecting a
      join requires the framework to track the head value from each source. Since
      the cross product of all values for each composite level of the join is
      emitted to its parent, all sources (note 1) must be capable of repeating the
      values for the current key. To avoid keeping an excessive number of copies
      (one per source per level), each composite asks its children to populate
      a JoinCollector with an iterator over its values. This way, there is
      only one copy of the current key for each composite node, the head key-value
      pair for each leaf, and storage at each leaf for all the values matching the
      current key at the parent collector (if it is currently participating in a
      join at the root). Strategies have been employed to avoid excessive copying
      when filling a user-provided Writable, but they have been conservative
      (e.g. in MultiFilterRecordReader, the value emitted is cloned in case
      the user modifies the value returned, possibly changing the state of a
      JoinCollector in the tree). For example, if the following sources
      contain these key streams:

      A: 0  0   1    1     2        ...
      B: 1  1   1    1     2        ...
      C: 1  6   21   107   ...
      D: 6  28  496  8128  33550336 ...
      

      Let A-D be wrapped sources and x and y be composite operations. If the
      expression is of the form x(A, y(B,C,D)), then when the current key at
      the root is 1 the tree may look like this:

                  x (1, [ I(A), [ I(y) ] ] )
                /   \
               W     y (1, [ I(B), I(C), EMPTY ])
               |   / | \
               |  W  W  W
               |  |  |  D (6, V_6) => EMPTY
               |  |  C (6, V_6)    => V_{1,1} @1,1
               |  B (2, V_2)       => V_{1,1} V_{1,2} V_{1,3} V_{1,4} @1,3
               A (2, V_2)          => V_{1,1} V_{1,2} @1,2
      

      A JoinCollector from x will have been created by requesting an
      iterator from A and another from y. The iterator at y is built by
      requesting iterators from B, C, and D. Since D doesn't contain the
      key 1, it returns an empty iterator. Since the value to return for a given
      join is a Writable provided by the user, the iterators returned are also
      responsible for writing the next value in that stream. For multilevel joins
      passing through a subclass of JoinRecordReader, the value produced will
      contain tuples within tuples; iterators for composites delegate to
      sub-iterators responsible for filling the value in the tuple at the position
      matching their position in the composite. In a sense, the only iterators
      that write to a tuple are the RecordReaders at the leaves. Note that
      this also implies that emitted tuples may not contain a value from every
      source, but they will always have the same capacity.
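      The per-key grouping that feeds these iterators can be sketched as a merge
      over sources sorted by a common comparator. At each step, it finds the
      smallest head key and then drains every record with that key from each
      source. SortedMerge and Group are hypothetical names, and records are
      reduced to their keys. Run on the four example streams (truncated where
      they trail off), key 1 yields two entries from A, four from B, one from C,
      and none from D. This matches the tree above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: group records from sorted sources by equal key.
final class SortedMerge {
    // One step of the merge: the current key and, per source, the matching records.
    static final class Group<K> {
        final K key;
        final List<List<K>> perSource;
        Group(K key, List<List<K>> perSource) { this.key = key; this.perSource = perSource; }
    }

    // Each source must already be sorted by cmp.
    static <K> List<Group<K>> groups(List<List<K>> sources, Comparator<? super K> cmp) {
        int[] cursor = new int[sources.size()];
        List<Group<K>> out = new ArrayList<>();
        while (true) {
            // Find the smallest head key among sources that are not yet exhausted.
            K min = null;
            boolean found = false;
            for (int s = 0; s < sources.size(); s++) {
                List<K> src = sources.get(s);
                if (cursor[s] < src.size()) {
                    K head = src.get(cursor[s]);
                    if (!found || cmp.compare(head, min) < 0) { min = head; found = true; }
                }
            }
            if (!found) return out;
            // Drain records equal to min. A source without min contributes an empty list.
            List<List<K>> perSource = new ArrayList<>();
            for (int s = 0; s < sources.size(); s++) {
                List<K> src = sources.get(s);
                List<K> matched = new ArrayList<>();
                while (cursor[s] < src.size() && cmp.compare(src.get(cursor[s]), min) == 0) {
                    matched.add(src.get(cursor[s]++));
                }
                perSource.add(matched);
            }
            out.add(new Group<>(min, perSource));
        }
    }
}
```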

      Writables

      Writable objects (including InputSplits and TupleWritables)
      encode themselves in the following format:

      <count><class1><class2>...<classn><obj1><obj2>...<objn>
      

      The inefficiency is regrettable, particularly since this overhead is
      incurred for every instance and the emitted tuples will most often be
      processed only within the map, but the encoding satisfies the Writable
      contract well enough for tuples to be emitted to the reducer, written to
      disk, etc. It is hoped that general compression will trim the most egregious
      waste. Note that the framework does not actually write out a tuple (i.e.
      does not suffer from this deficiency) unless one is emitted from
      MultiFilterRecordReader (a rare case in practice, it is hoped).
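      A round trip through the <count><class1>...<classn><obj1>...<objn> layout
      might look like the following. It is a sketch under stated assumptions and
      not the patch's code. SimpleWritable, IntBox, and ClassTaggedCodec are
      hypothetical. The count is assumed to be a plain int and class names are
      assumed to use writeUTF, whereas the real encoding may differ. Each class
      needs a no-argument constructor so that it can be instantiated by name on
      read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a Writable-style interface.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

final class IntBox implements SimpleWritable {
    int v;
    public IntBox() {}
    IntBox(int v) { this.v = v; }
    public void write(DataOutput out) throws IOException { out.writeInt(v); }
    public void readFields(DataInput in) throws IOException { v = in.readInt(); }
}

// Hypothetical codec for: <count><class1>...<classn><obj1>...<objn>
final class ClassTaggedCodec {
    static byte[] encode(SimpleWritable... objs) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(objs.length);                                          // <count>
        for (SimpleWritable o : objs) out.writeUTF(o.getClass().getName()); // class names, per instance
        for (SimpleWritable o : objs) o.write(out);                         // payloads
        out.flush();
        return bytes.toByteArray();
    }

    static SimpleWritable[] decode(byte[] data) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int n = in.readInt();
        String[] names = new String[n];
        for (int i = 0; i < n; i++) names[i] = in.readUTF();
        SimpleWritable[] objs = new SimpleWritable[n];
        for (int i = 0; i < n; i++) {
            // Instantiate by name, then let the object read its own fields.
            objs[i] = (SimpleWritable) Class.forName(names[i]).getDeclaredConstructor().newInstance();
            objs[i].readFields(in);
        }
        return objs;
    }
}
```

      Every encoded instance repeats its class names. This per-instance overhead
      is the inefficiency described above.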

      Extensibility

      The join framework is modestly extensible. Practically, users seeking to add
      their own identifiers to join expressions are limited to extending
      JoinRecordReader and MultiFilterRecordReader. There is considerable
      latitude within these constraints, as illustrated in
      OverrideRecordReader, where values in child RecordReaders are
      skipped instead of incurring the overhead of building an iterator that
      will inevitably be discarded (note 2). In most cases, the user need only
      implement the combine and/or emit methods in their subclass. It is expected
      that most users will find the three default operations sufficient.
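      The division of labor between combine and emit can be sketched without
      Hadoop. A join base class asks a combine hook whether each candidate tuple
      should be emitted. A multi-filter additionally reduces each tuple to one
      value with emit. The names and signatures here (JoinOpSketch,
      FilterOpSketch, AtLeastTwo, PreferRightmost) are hypothetical and are not
      the patch's JoinRecordReader or MultiFilterRecordReader APIs. Tuples are
      Object arrays where null marks a source without the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical join operation: a combine hook filters candidate tuples.
abstract class JoinOpSketch {
    protected abstract boolean combine(Object[] tuple);

    final List<Object[]> apply(List<Object[]> candidates) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] t : candidates) if (combine(t)) out.add(t);
        return out;
    }
}

// Hypothetical multi-filter: accepts every tuple and emits one value from each.
abstract class FilterOpSketch extends JoinOpSketch {
    @Override protected boolean combine(Object[] tuple) { return true; }
    protected abstract Object emit(Object[] tuple);

    final List<Object> values(List<Object[]> candidates) {
        List<Object> out = new ArrayList<>();
        for (Object[] t : apply(candidates)) out.add(emit(t));
        return out;
    }
}

// Example user-defined join: keep tuples in which at least two sources have the key.
final class AtLeastTwo extends JoinOpSketch {
    @Override protected boolean combine(Object[] tuple) {
        int present = 0;
        for (Object v : tuple) if (v != null) present++;
        return present >= 2;
    }
}

// Example user-defined filter: emit the rightmost present value of each tuple.
final class PreferRightmost extends FilterOpSketch {
    @Override protected Object emit(Object[] tuple) {
        for (int i = tuple.length - 1; i >= 0; i--) if (tuple[i] != null) return tuple[i];
        return null;
    }
}
```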

      Adding arguments to expressions is more difficult. One would need to include
      a Node type for the parser, which requires some knowledge of its inner
      workings. The model in this area is crude and requires refinement before it
      can be "extensible" by a reasonable definition.

      Performance

      I have no numbers.

      Notes

      1. This isn't strictly true. The "leftmost" source will never need to repeat
      itself. Adding a pseudo-ResettableIterator to handle this case would be
      a welcome addition.
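      Note 1 can be made concrete with a nested-loop cross product. There, the
      leftmost source is read once and never rewound, so it could stream its
      values without buffering. Every other source must be able to replay them.
      The types below (Replayable, BufferedReplayable, OneShot, TwoWayCross) are
      hypothetical sketches of that distinction and not the patch's own iterator
      interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical iterator over the current key's values that can be rewound.
interface Replayable<T> extends Iterator<T> {
    void reset();
}

// Buffers all values so the sequence can be replayed for each outer value.
final class BufferedReplayable<T> implements Replayable<T> {
    private final List<T> buf = new ArrayList<>();
    private int pos;
    BufferedReplayable(Iterator<T> src) { while (src.hasNext()) buf.add(src.next()); }
    public boolean hasNext() { return pos < buf.size(); }
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return buf.get(pos++);
    }
    public void reset() { pos = 0; }
}

// Streams values once without storing them; only suitable for the leftmost source.
final class OneShot<T> implements Replayable<T> {
    private final Iterator<T> src;
    OneShot(Iterator<T> src) { this.src = src; }
    public boolean hasNext() { return src.hasNext(); }
    public T next() { return src.next(); }
    public void reset() { throw new UnsupportedOperationException("leftmost source is never replayed"); }
}

final class TwoWayCross {
    // The left source is read once and the right source is replayed per left value.
    static <A, B> List<String> pairs(Replayable<A> left, Replayable<B> right) {
        List<String> out = new ArrayList<>();
        while (left.hasNext()) {
            A a = left.next();
            right.reset();
            while (right.hasNext()) out.add(a + "," + right.next());
        }
        return out;
    }
}
```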

      2. Note that, even if reset, the override will only loop through the values
      for the key in the rightmost source containing it, instead of repeating
      that series a number of times equal to the cardinality of the cross product
      of the discarded streams (regrettably, looking at the code of
      OverrideRecordReader is more illustrative than this explanation).

      Attachments

        1. 2085.patch (101 kB, Christopher Douglas)
        2. 2085-2.patch (101 kB, Christopher Douglas)
        3. 2085-3.patch (109 kB, Christopher Douglas)
        4. 2085-4.patch (108 kB, Christopher Douglas)
        5. 2085-5.patch (112 kB, Christopher Douglas)


          People

            Assignee: Christopher Douglas (cdouglas)
            Reporter: Christopher Douglas (cdouglas)
            Votes: 0
            Watchers: 1
