Apache Tez / TEZ-3865

A new vertex manager to partition data for STORE


Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      Restricting the number of files in the output is a very common use case. In Pig, users currently add an ORDER BY, GROUP BY or DISTINCT with the required parallelism before STORE to achieve it. All of these operations add unnecessary processing overhead. It would be ideal if the STORE clause supported the PARALLEL statement and the partitioning of the data was handled in a simpler, more efficient manner.

      Partitioning of the data can be achieved using an efficient vertex manager as described below. I am going to call it PartitionVertexManager (PVM) for now until someone proposes a better name. I will explain using Pig examples, but the logic is the same for Hive as well.

      There are multiple cases to consider when storing:
      1) No partitions
      • Data is stored into a single directory using FileOutputFormat implementations.
      2) Partitions
      • Data is stored into multiple partitions, i.e. static or dynamic partitioning with HCat.
      3) HBase
      • Storing into multiple regions. I have forgotten exactly what my thoughts were on this; will update once I remember.

      Let us consider the script below with pig.exec.bytes.per.reducer set to 1G (with ShuffleVertexManager, this setting is usually translated to tez.shuffle-vertex-manager.desired-task-input-size).

      A = LOAD 'data' ...;
      B = GROUP A BY $0 PARALLEL 1000;
      C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ...;
      STORE C INTO 'output' USING SomeStoreFunc() PARALLEL 20;

      The implementation will have 3 vertices.
      v1 - LOAD vertex
      v2 - GROUP BY vertex
      v3 - STORE vertex

      PVM will be used on v3. It is going to be similar to ShuffleVertexManager but with some differences. The main difference is that the source vertex does not care about the parallelism of the destination vertex, and the number of partitioned outputs it produces does not depend on it.
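      To make the wiring concrete, below is a minimal sketch, assuming the standard Tez DAG API, of how the proposed PartitionVertexManager could be attached to the STORE vertex v3. PartitionVertexManager does not exist yet, and the processor and edge input/output class names are placeholders.

      import org.apache.tez.dag.api.DAG;
      import org.apache.tez.dag.api.Edge;
      import org.apache.tez.dag.api.EdgeProperty;
      import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
      import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
      import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
      import org.apache.tez.dag.api.InputDescriptor;
      import org.apache.tez.dag.api.OutputDescriptor;
      import org.apache.tez.dag.api.ProcessorDescriptor;
      import org.apache.tez.dag.api.Vertex;
      import org.apache.tez.dag.api.VertexManagerPluginDescriptor;

      public class PvmDagSketch {
        public static DAG buildDag() {
          // v1 (LOAD), v2 (GROUP BY) and v3 (STORE); processor class names are placeholders.
          Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("example.LoadProcessor"), 100);
          Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("example.GroupByProcessor"), 1000);
          // v3 starts at the PARALLEL value (20); the PVM may lower this at runtime.
          Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("example.StoreProcessor"), 20);

          // Attach the proposed (hypothetical) PartitionVertexManager to the STORE vertex.
          v3.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
              "org.apache.tez.dag.library.vertexmanager.PartitionVertexManager"));

          // Scatter-gather edge from v2 to v3; edge source/destination classes are placeholders.
          EdgeProperty v2ToV3 = EdgeProperty.create(
              DataMovementType.SCATTER_GATHER,
              DataSourceType.PERSISTED,
              SchedulingType.SEQUENTIAL,
              OutputDescriptor.create("example.PartitionedKVOutput"),
              InputDescriptor.create("example.ShuffledKVInput"));

          DAG dag = DAG.create("StoreWithParallel");
          dag.addVertex(v1).addVertex(v2).addVertex(v3);
          // The v1 -> v2 edge is omitted for brevity; only the edge managed by the PVM is shown.
          dag.addEdge(Edge.create(v2, v3, v2ToV3));
          return dag;
        }
      }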

      1) Case of no partitions
      Each task in vertex v2 will produce a single partition output (no Partitioner is required). The PVM will bucket this single-partition data from the 1000 source tasks into multiple destination tasks of v3, trying to keep 1G per task with a maximum of 20 tasks (auto parallelism).
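      As an illustration of that auto parallelism decision, here is a minimal sketch in plain Java (not the actual Tez API): given the output size reported by each completed v2 task, pack the single-partition outputs into destination tasks of roughly the desired size, capped at the store parallelism. The method and parameter names are made up for the example.

      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;

      /** Sketch: bucket single-partition outputs of source tasks into destination tasks. */
      public class NoPartitionGrouping {

        /** Returns, for each destination task, the list of source task indexes it reads from. */
        static List<List<Integer>> group(long[] srcOutputBytes, long desiredBytesPerTask, int maxTasks) {
          List<List<Integer>> destTasks = new ArrayList<>();
          List<Integer> current = new ArrayList<>();
          long currentBytes = 0;

          for (int src = 0; src < srcOutputBytes.length; src++) {
            // Start a new destination task when the current one reaches the desired size,
            // unless we have already hit the maximum allowed parallelism.
            if (!current.isEmpty()
                && currentBytes + srcOutputBytes[src] > desiredBytesPerTask
                && destTasks.size() + 1 < maxTasks) {
              destTasks.add(current);
              current = new ArrayList<>();
              currentBytes = 0;
            }
            current.add(src);
            currentBytes += srcOutputBytes[src];
          }
          if (!current.isEmpty()) {
            destTasks.add(current);
          }
          return destTasks;
        }

        public static void main(String[] args) {
          // 1000 source tasks of ~50MB each => ~50GB total; desired 1GB per task, max 20 tasks.
          long[] sizes = new long[1000];
          Arrays.fill(sizes, 50L << 20);
          List<List<Integer>> groups = group(sizes, 1L << 30, 20);
          // The 1GB target alone would need ~50 tasks, so the cap of 20 wins and the
          // last task in this greedy sketch absorbs the remainder.
          System.out.println("destination tasks: " + groups.size());
        }
      }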

      2) Partitions
      Let us say the table has 2 partition keys (dt and region). Since there could be any number of regions for a given date, we will use the store parallelism as the upper limit on the number of partitions, i.e. a HashPartitioner with numReduceTasks of 20 and (dt, region) as the partition key. If there are only 5 regions, then each task of v2 will produce 5 non-empty partitions (with the remaining 15 being empty), assuming there are no hash collisions. If there are 30 regions, then each task of v2 will produce 20 partitions.
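      For illustration, here is a minimal sketch of that partition-count reasoning in plain Java (hashing the composite key with Objects.hash is an assumption): with the store parallelism of 20 as numReduceTasks, 5 distinct (dt, region) keys touch at most 5 of the 20 partitions, while 30 distinct keys fold into at most 20.

      import java.util.HashSet;
      import java.util.Objects;
      import java.util.Set;

      /** Sketch: HashPartitioner-style partitioning on the (dt, region) key. */
      public class DynamicPartitionSketch {

        /** Same formula as Hadoop's HashPartitioner: (hash & MAX_VALUE) % numReduceTasks. */
        static int partition(String dt, String region, int numReduceTasks) {
          int hash = Objects.hash(dt, region);
          return (hash & Integer.MAX_VALUE) % numReduceTasks;
        }

        public static void main(String[] args) {
          int numReduceTasks = 20; // the STORE ... PARALLEL 20 value

          // 5 regions for one date: at most 5 of the 20 partitions are non-empty
          // (fewer if two keys happen to collide on the same partition).
          Set<Integer> used = new HashSet<>();
          for (int r = 0; r < 5; r++) {
            used.add(partition("2017-10-01", "region-" + r, numReduceTasks));
          }
          System.out.println("non-empty partitions with 5 regions: " + used.size());

          // 30 regions: the 30 keys fold into at most 20 partitions.
          used.clear();
          for (int r = 0; r < 30; r++) {
            used.add(partition("2017-10-01", "region-" + r, numReduceTasks));
          }
          System.out.println("non-empty partitions with 30 regions: " + used.size());
        }
      }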

      When grouping, the PVM will try to place all segments of the same partition (e.g. Partition0) from the different source tasks into a single v3 task as much as possible. Based on skew, a partition could end up split across more tasks, i.e. there is no restriction that one partition must go to a single reducer task. Doing this avoids having to open multiple ORC files in one task when doing dynamic partitioning, reduces namespace usage even further, and keeps file sizes more uniform.
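      Below is a minimal sketch of that grouping idea in plain Java (the Segment class and the size threshold are assumptions): segments with the same partition index from different source tasks are kept together in one v3 task, and only a skewed partition that exceeds the desired size spills into additional tasks.

      import java.util.ArrayList;
      import java.util.List;

      /** Sketch: group segments of the same partition into v3 tasks, splitting only on skew. */
      public class PartitionGroupingSketch {

        /** A segment is the data for one partition produced by one source task. */
        static class Segment {
          final int srcTask;
          final int partition;
          final long bytes;
          Segment(int srcTask, int partition, long bytes) {
            this.srcTask = srcTask;
            this.partition = partition;
            this.bytes = bytes;
          }
        }

        /** For each v3 task, the segments it should fetch; all segments in a task share a partition. */
        static List<List<Segment>> group(List<Segment> segments, int numPartitions, long desiredBytesPerTask) {
          List<List<Segment>> tasks = new ArrayList<>();
          for (int p = 0; p < numPartitions; p++) {
            List<Segment> current = new ArrayList<>();
            long currentBytes = 0;
            for (Segment s : segments) {
              if (s.partition != p) continue;
              // Split a skewed partition into another task once it exceeds the desired size.
              if (!current.isEmpty() && currentBytes + s.bytes > desiredBytesPerTask) {
                tasks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
              }
              current.add(s);
              currentBytes += s.bytes;
            }
            if (!current.isEmpty()) {
              tasks.add(current); // one task per partition unless the partition was skewed
            }
          }
          return tasks;
        }

        public static void main(String[] args) {
          List<Segment> segs = new ArrayList<>();
          // 100 source tasks, 5 partitions each; partition 0 is skewed (50MB vs 5MB segments).
          for (int src = 0; src < 100; src++) {
            for (int p = 0; p < 5; p++) {
              segs.add(new Segment(src, p, p == 0 ? 50L << 20 : 5L << 20));
            }
          }
          // With ~1GB per v3 task, partition 0 splits into several tasks
          // while partitions 1-4 each fit into a single task.
          System.out.println("v3 tasks: " + group(segs, 5, 1L << 30).size());
        }
      }

      Keeping each v3 task on a single partition index means a dynamic partitioning store opens only one writer (e.g. one ORC file) per task, which is what reduces the number of output files and keeps their sizes more uniform.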

            People

              Assignee: Unassigned
              Reporter: Rohini Palaniswamy