TAJO-292: Too many intermediate partition files

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.2-incubating
    • Fix Version/s: 0.8.0
    • Component/s: Data Shuffle
    • Labels:
      None

      Description

      Unlike before, the number of partitions is currently determined by the volume size and the number of distinct keys. This can cause unnecessary overhead. We need to improve the partition number determiner so that it also considers the number of cluster nodes.
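
      For illustration only, a minimal sketch of the kind of determiner the description asks for: derive a partition count from the input volume, then cap it by the cluster's capacity. The class name, the volume-per-partition constant, and the slot-count parameter are assumptions, not the committed implementation.

          /** Illustrative sketch: derive the partition count from the input volume,
           *  then cap it by the cluster's capacity instead of the distinct-key count. */
          public class PartitionNumSketch {

            // assumed target volume per partition (128 MB); Tajo's actual value may differ
            static final long VOLUME_PER_PARTITION = 128L * 1024 * 1024;

            static int determinePartitionNum(long inputVolumeBytes, int clusterWorkerSlots) {
              int byVolume = (int) Math.ceil((double) inputVolumeBytes / VOLUME_PER_PARTITION);
              // never exceed what the cluster can run concurrently, and never go below 1
              return Math.max(1, Math.min(byVolume, clusterWorkerSlots));
            }

            public static void main(String[] args) {
              // 10 GB of intermediate data on a cluster with 32 worker slots -> 32 partitions
              System.out.println(determinePartitionNum(10L * 1024 * 1024 * 1024, 32));
            }
          }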

      Attachments

      1. TAJO-292_3.patch (24 kB, Jinho Kim)
      2. TAJO-292_2.patch (22 kB, Jinho Kim)
      3. TAJO-292.patch (23 kB, Jinho Kim)

          Activity

          Jihoon Son added a comment -

          +1 for this issue.
          This will improve the performance significantly.

          Jinho Kim added a comment -

          In addition, I've added cleanup of the temporary directories.

          • Clean up all temporary directories at cluster startup.
            • tajo.worker.start.cleanup [default=false]
          • Automated cleanup of the query's intermediate files.
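
          As a rough sketch only: what a startup cleanup guarded by tajo.worker.start.cleanup could look like. The flag name comes from the comment above; the class, method, and directory path are illustrative assumptions, not the patch itself.

          import java.io.IOException;
          import java.nio.file.*;
          import java.util.Comparator;
          import java.util.stream.Stream;

          /** Illustrative only: wipe the worker's temporary directory at startup
           *  when tajo.worker.start.cleanup (default=false) is enabled. */
          public class StartupCleanupSketch {

            static void cleanupOnStart(boolean cleanupEnabled, Path tmpDir) throws IOException {
              if (!cleanupEnabled || !Files.exists(tmpDir)) {
                return;  // default=false: keep existing intermediate files
              }
              try (Stream<Path> walk = Files.walk(tmpDir)) {
                // delete children first (deepest paths first), keep the directory itself
                walk.sorted(Comparator.reverseOrder())
                    .filter(p -> !p.equals(tmpDir))
                    .forEach(p -> p.toFile().delete());
              }
            }

            public static void main(String[] args) throws IOException {
              // the boolean would come from the tajo.worker.start.cleanup setting
              cleanupOnStart(true, Paths.get("/tmp/tajo-worker"));
            }
          }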

          Jihoon Son added a comment -

          When the size of the intermediate data is sufficiently large, the number of tasks appears to be the number of worker slots.
          In my opinion, since the number of tasks is then fixed regardless of the size of the intermediate data, the task-failure overhead will increase as the intermediate data grows.
          How about limiting the maximum task size?

          Jihoon Son added a comment -

          I'm sorry, I misunderstood this issue.
          The main purpose of this issue is to get the proper number of partitions.
          Because each task processes one partition, I made the suggestion above.
          However, the task size should be handled elsewhere, where each task is created.
          So, I agree with this implementation.
          I'll review the remaining part of the patch.

          Keuntae Park added a comment -

          I appreciate your effort, Jinho.
          I've even run out of free inodes on disks due to too many intermediate files.
          For now, I periodically delete intermediate files manually.

          So, your patch will greatly help me.
          Thank you so much.

          Jinho Kim added a comment -

          Jihoon,
          You're right. If the cluster size is small, the intermediate data size increases.
          I will compress the intermediate data.
          Thank you for the review.
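
          Purely to illustrate the "compress the intermediate data" idea above, and not Tajo's actual shuffle code: wrapping the partition output stream in a standard deflater. The class, method, and file names are hypothetical; a real engine would use a configurable codec.

          import java.io.*;
          import java.util.zip.DeflaterOutputStream;

          /** Illustrative only: write an intermediate partition file through a
           *  compression stream so the on-disk shuffle data is smaller. */
          public class CompressedPartitionSketch {

            static OutputStream openPartitionOutput(File partitionFile) throws IOException {
              // wrap the raw file stream with a deflater; a configurable codec
              // (e.g. Snappy or LZ4) would be the realistic choice
              return new DeflaterOutputStream(
                  new BufferedOutputStream(new FileOutputStream(partitionFile)));
            }

            public static void main(String[] args) throws IOException {
              try (OutputStream out = openPartitionOutput(new File("part-0001.deflate"))) {
                out.write("some intermediate rows".getBytes("UTF-8"));
              }
            }
          }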

          Jihoon Son added a comment -

          In my opinion, reducing the size of the intermediate data should also be handled as a separate problem from deciding the task size.
          Anyway, may I review your patch tonight?
          If this issue is urgent, it would be better for someone else to review it.

          Jinho Kim added a comment -

          Sure, I will wait for your review.

          Jihoon Son added a comment -

          +1.
          The patch looks great to me.
          Please remove an added empty line in TajoWorkerResourceManager.java before commit.

          Hyunsik Choi added a comment -

          I'm reviewing the patch. Please wait for a while.

          Hyunsik Choi added a comment -

          To me, this is a good workaround for the problem. Here are my comments on your patch.

          • It would be better to rename tajo.worker.start.cleanup to tajo.worker.tmpdir.cleanup-at-startup, because the config applies to tajo.worker.tmpdir. It looks more consistent.
          • The code below should be inserted at the end of WorkerManagerService::cleanup(). In addition, cleanup's return type needs to be BoolProto.
            done.run(TajoWorker.TRUE_PROTO);
            
            • Async RPC internally keeps a callback sequence id in a concurrent map until the callback returns, so done.run must be called exactly once.
          • For the same reason, line 184 in QueryMaster should be changed to
            tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
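
          To make the callback point concrete, here is a minimal self-contained sketch of that contract: the service completes the callback exactly once, even if the cleanup work fails. The types are simplified stand-ins for illustration, not Tajo's actual protobuf service classes.

          /** Illustrative only: complete the async-RPC callback exactly once so the
           *  client's pending-callback map entry (keyed by sequence id) is released. */
          public class CleanupCallbackSketch {

            interface RpcCallback<T> { void run(T value); }   // stand-in for the protobuf callback

            static final Boolean TRUE_PROTO = Boolean.TRUE;   // stand-in for TajoWorker.TRUE_PROTO

            static void cleanup(String queryId, RpcCallback<Boolean> done) {
              try {
                // ... delete the query's intermediate directories here ...
              } finally {
                // invoked exactly once, even if the cleanup itself throws
                done.run(TRUE_PROTO);
              }
            }

            public static void main(String[] args) {
              cleanup("q_0001", ok -> System.out.println("cleanup acked: " + ok));
            }
          }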
            
          Jihoon Son added a comment -

          Thanks, Hyunsik.
          I missed them.

          Jinho Kim added a comment -

          Thanks, guys, for the review.
          I've uploaded a second patch that reflects the suggestions.

          Hyunsik Choi added a comment -

          One thing is missing: this patch should also handle the symmetric repartition join. The approach is very similar to your group-by work.

          For that, please take a look at the code below. This code chooses the smaller table and derives the proper number of partitions from it. After this patch is applied, the maximum number of partitions is limited to the number of worker slots. So, you need to choose the larger table as the base table for calculating the number of tasks.

          Line 502 in SubQuery.java
          // for inner
                  ExecutionBlock inner = childs.get(1);
                  long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
                  LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
                  LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
          
                  long smaller = Math.min(outerVolume, innerVolume);
          
                  int mb = (int) Math.ceil((double)smaller / 1048576);
                  LOG.info("Smaller Table's volume is approximately " + mb + " MB");
                  // determine the number of task
                  int taskNum = (int) Math.ceil((double)mb /
                      conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
                  LOG.info("The determined number of join partitions is " + taskNum);
                  return taskNum;
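
          A sketch, under stated assumptions, of the change being suggested above: base the join task count on the larger input once the partition count is capped by worker slots. The per-task volume constant, the slot cap, and all names here are illustrative, not the committed patch.

          /** Illustrative sketch of the suggestion above: size the join task count by the
           *  LARGER input rather than the smaller one, and cap it by the worker slots. */
          public class JoinTaskNumSketch {

            // stands in for conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME), in MB
            static final int JOIN_PARTITION_VOLUME_MB = 128;

            static int determineJoinTaskNum(long outerVolume, long innerVolume, int workerSlots) {
              long larger = Math.max(outerVolume, innerVolume);    // the quoted code used Math.min(...)
              int mb = (int) Math.ceil((double) larger / 1048576);
              int taskNum = (int) Math.ceil((double) mb / JOIN_PARTITION_VOLUME_MB);
              return Math.min(Math.max(1, taskNum), workerSlots);  // capped by the worker slots
            }

            public static void main(String[] args) {
              // 4 GB outer, 1 GB inner, 16 worker slots -> min(ceil(4096/128) = 32, 16) = 16 tasks
              System.out.println(determineJoinTaskNum(4L << 30, 1L << 30, 16));
            }
          }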
          
          Jinho Kim added a comment - edited

          I've uploaded a patch that reflects your comments.
          I verified TPC-H queries 1, 3, and 10. Please review it. Thanks.

          Hyunsik Choi added a comment -

          +1

          Jinho Kim added a comment -

          Thanks guys for the review.
          I've just committed it.

          Keuntae Park added a comment -

          Great job!!


            People

            • Assignee: Jinho Kim
            • Reporter: Hyunsik Choi
            • Votes: 0
            • Watchers: 4
