PIG-688: PERFORMANCE: Vectorize operators

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      By vectorization, I mean passing multiple records (a vector of records) at a time between operators (and potentially to other functions, like UDFs).

      Vectorization of Pig operators can improve performance by:
      1. Improving locality and cache utilization.
      2. Reducing the number of function calls. Many function calls are likely to be dynamically resolved, and some checks inside each function could be done once for several records instead of once per record.
      3. Potentially benefiting from the CPU pipeline architecture (though I don't know how well the Java VM exploits that).

      To do vectorization in the map stage, we need to use MapRunner; see PIG-687.
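
      To make the contrast concrete, here is a minimal sketch with hypothetical interface and class names (not actual Pig code): with batches of b records, per-record call overhead is paid once per batch rather than once per tuple.

          import java.util.ArrayList;
          import java.util.List;

          // Hypothetical sketch, not actual Pig interfaces. A tuple-at-a-time
          // operator pays one (often dynamically dispatched) call per record,
          // while a vectorized operator amortizes that cost over a whole batch.
          interface ScalarOperator {
              String getNext(String tuple);              // one call per record
          }

          interface VectorOperator {
              List<String> getNext(List<String> batch);  // one call per batch of b records
          }

          class UpperCaseVectorOperator implements VectorOperator {
              @Override
              public List<String> getNext(List<String> batch) {
                  // Per-call setup and checks run once here instead of once per record.
                  List<String> out = new ArrayList<String>(batch.size());
                  for (String t : batch) {
                      out.add(t.toUpperCase());          // stand-in for real operator work
                  }
                  return out;
              }
          }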

        Activity

        Jeff Zhang added a comment -

        Thejas, do you mean to create a new MapRunnable class for Pig?
        The default MapRunnable implementation in Hadoop, MapRunner, reads records one by one, so if we want to process multiple records between operators, we need to create a new MapRunner for it.
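
        A rough sketch of what such a batching runner might look like against Hadoop's old mapred API (the class name, batch size, and processBatch() helper are made up; the real Pig runner would push each batch through the map plan):

            import java.io.IOException;
            import java.util.ArrayList;
            import java.util.List;

            import org.apache.hadoop.mapred.JobConf;
            import org.apache.hadoop.mapred.MapRunnable;
            import org.apache.hadoop.mapred.OutputCollector;
            import org.apache.hadoop.mapred.RecordReader;
            import org.apache.hadoop.mapred.Reporter;

            // Hypothetical sketch: a MapRunnable that collects up to BATCH_SIZE
            // records before handing the whole batch downstream, instead of
            // invoking the mapper once per record as the default MapRunner does.
            public class BatchingMapRunner<K1, V1, K2, V2> implements MapRunnable<K1, V1, K2, V2> {
                private static final int BATCH_SIZE = 1024;

                @Override
                public void configure(JobConf job) {
                    // Read the batch size, instantiate the map plan, etc.
                }

                @Override
                public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                                Reporter reporter) throws IOException {
                    List<V1> batch = new ArrayList<V1>(BATCH_SIZE);
                    K1 key = input.createKey();
                    V1 value = input.createValue();
                    while (input.next(key, value)) {
                        batch.add(value);
                        // Hadoop reuses the value object, so create a fresh one per record.
                        value = input.createValue();
                        if (batch.size() >= BATCH_SIZE) {
                            processBatch(batch, output);
                            batch.clear();
                        }
                    }
                    if (!batch.isEmpty()) {          // flush the final, partial batch
                        processBatch(batch, output);
                    }
                }

                // Stand-in for pushing one batch of records through the map plan.
                private void processBatch(List<V1> batch, OutputCollector<K2, V2> output)
                        throws IOException {
                    // ...
                }
            }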

        Olga Natkovich added a comment -

        The existing MapRunner should be fine, since it provides an iterator interface and we can place the data into a bag before passing it to the pipeline.

        Jeff Zhang added a comment -

        What do you mean by the iterator interface? I checked the source code of MapRunner, and it gets input from the RecordReader one record at a time. So the Mapper class only gets one tuple each time; how can it get multiple tuples?

        Dmitriy V. Ryaboy added a comment -

        Jeff –
        Iterator means one by one, so there is no conflict between what Olga is saying and the code you are reading.

        The idea is, instead of getting results by pulling a single tuple through a chain of operators one at a time, pull batches through. At the root of the operator tree, the batches would have to be built using several successive next() calls. The number of next() calls on the reader would stay the same, of course, but the number of next() calls on the operators would be reduced by a factor of b, where b is the size of a batch (or vector, or bag).
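
        A minimal sketch of that batch-building step at the root, with hypothetical names (BatchingRoot is not an actual Pig class); the reader is still drained record by record, but operators above see one next() call per batch:

            import java.util.ArrayList;
            import java.util.Iterator;
            import java.util.List;

            // Illustrative only: builds batches of size b at the root of the
            // operator tree via several successive next() calls on the reader.
            public class BatchingRoot {
                private final Iterator<String> reader;  // stand-in for the record reader
                private final int batchSize;            // the factor b

                public BatchingRoot(Iterator<String> reader, int batchSize) {
                    this.reader = reader;
                    this.batchSize = batchSize;
                }

                // Returns the next batch; an empty list means the input is exhausted.
                public List<String> nextBatch() {
                    List<String> batch = new ArrayList<String>(batchSize);
                    while (batch.size() < batchSize && reader.hasNext()) {
                        batch.add(reader.next());
                    }
                    return batch;
                }
            }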

        Jeff Zhang added a comment -

        Dmitriy, thank you for your explanation. So my understanding is that we only need to create a tuple buffer of size n in PigMapBase, process that buffer as a batch through the map plan, and finally process the remaining tuples in the close() method.

        Dmitriy V. Ryaboy added a comment -

        I am not sure what you mean by processing remaining tuples in the close() method.

        My understanding is more like this: in the first next() call on the leaf operator, a next() call cascades down the tree, and a bag of tuples gets processed as it comes back up the tree. The next b calls get a pre-computed result from the buffer. Once the buffer is empty, the operation is repeated, until all tuples at the bottom of the tree have been fetched. The close method is unaffected.
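
        A sketch of that scheme, again with made-up names: the first next() pays for processing a whole batch, and the next b calls are answered from the buffer of pre-computed results.

            import java.util.ArrayDeque;
            import java.util.Deque;
            import java.util.Iterator;

            // Illustrative sketch (hypothetical names): next() serves tuples from
            // a buffer of already-processed results; when the buffer runs dry, one
            // cascaded fetch pulls and processes a whole batch of b input tuples.
            public class BufferedOperator {
                private static final int BATCH_SIZE = 1024;   // the factor b
                private final Iterator<String> child;         // operator below us in the tree
                private final Deque<String> buffer = new ArrayDeque<String>();

                public BufferedOperator(Iterator<String> child) {
                    this.child = child;
                }

                // Returns the next output tuple, or null once all input is consumed.
                public String next() {
                    if (buffer.isEmpty()) {
                        refill();
                    }
                    return buffer.poll();   // null when input is exhausted
                }

                private void refill() {
                    for (int i = 0; i < BATCH_SIZE && child.hasNext(); i++) {
                        buffer.add(process(child.next()));
                    }
                }

                private String process(String tuple) {
                    return tuple.toUpperCase();   // stand-in for the operator's real work
                }
            }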

        Jeff Zhang added a comment -

        Dmitriy, I think I get your meaning. You mean creating a buffer in each operator that stores that operator's output; each time next() is called, it first checks this buffer: if the buffer is not empty, it polls a tuple from it, and if it is empty, it calls next() on its parents (cascading down) to refill it. But there is a problem: you cannot poll data from the RecordReader; Hadoop actually pushes records to the map class, which conflicts with the operators' pull model.

        My idea is to create a tuple buffer in PigMapBase and store the tuples from the RecordReader in it. The buffer is the input to the root of the map plan, and runPipeLine() is not run until the buffer is full. When the buffer is full, the whole bag (buffer) is passed through the tree. In the close() method, I check the buffer and, if it is not empty, run runPipeLine() to process the remaining tuples.
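
        A sketch of this design, with hypothetical names and signatures (the runPipeline() below is a stand-in, not the actual PigMapBase method):

            import java.util.ArrayList;
            import java.util.List;

            // Illustrative sketch: the push-style map() calls accumulate tuples in
            // a buffer, the pipeline runs only on full batches, and close() flushes
            // whatever remains at the end of the task.
            public class BufferingMapSketch {
                private static final int BUFFER_SIZE = 1024;
                private final List<String> buffer = new ArrayList<String>(BUFFER_SIZE);

                // Called once per input record, as Hadoop pushes records to the mapper.
                public void map(String tuple) {
                    buffer.add(tuple);
                    if (buffer.size() >= BUFFER_SIZE) {
                        runPipeline(buffer);   // pass the full bag through the map plan
                        buffer.clear();
                    }
                }

                // Called once at the end of the task: drain the partially filled buffer.
                public void close() {
                    if (!buffer.isEmpty()) {
                        runPipeline(buffer);
                        buffer.clear();
                    }
                }

                // Stand-in for pushing one batch through the operator tree of the map plan.
                private void runPipeline(List<String> batch) {
                    for (String t : batch) {
                        System.out.println(t);   // real code would emit via the plan
                    }
                }
            }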

        Jeff Zhang added a comment -

        Actually, it is MapRunner that pushes records to the Map class.


          People

          • Assignee: Unassigned
          • Reporter: Thejas M Nair
          • Votes: 0
          • Watchers: 0
