Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.5.0
    • Component/s: Query Processor
    • Labels:
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Adds support for launching multiple map-reduce tasks that are not dependent on each other in parallel. Examples of affected queries include those containing UNION ALLs and trees of join operators.

Description

      In a massively parallel database system, it would be awesome to also parallelize some of the mapreduce phases that our data needs to go through.

      One example that just occurred to me is UNION ALL: when you union two SELECT statements, effectively you could run those statements in parallel. There's no situation (that I can think of, but I don't have a formal proof) in which the left statement would rely on the right statement, or vice versa. So, they could be run at the same time...and perhaps they should be. Or, perhaps there should be a way to make this happen...PARALLEL UNION ALL? PUNION ALL?

      1. HIVE549-v7.patch
        199 kB
        Chaitanya Mishra


          Activity

          Ashutosh Chauhan added a comment -

          Never mind, there is already HIVE-1033. I will update the patch there soon.

          Ashutosh Chauhan added a comment -

          This should be turned on by default. Asim Jalis, would you like to create a JIRA to change the default value for this config?

          Asim Jalis added a comment -

          I was curious why you decided to go with false by default?

          Could you summarize the offline discussion?

          If setting it to true is safe then it seems like it would always speed up the query. The only downside I can see is that a query might use up more processor resources on each node.

          chinna added a comment -

          dd

          Namit Jain added a comment -

          Committed. Thanks Chaitanya

          Chaitanya Mishra added a comment -

          The only difference from the previous patch is that hive.exec.parallel is now set to false by default, on Namit's suggestion, based on an offline discussion we had.

          Zheng Shao added a comment -

          We cannot use JobControl because not all Hive tasks are map-reduce jobs.

          In some sense, Driver is reimplementing JobControl to accommodate all the different types of Hive tasks. The alternative is to extend JobControl.
          I think implementing Driver from scratch (as we already did) is more flexible at this point in time.

          This is good information though. At some point we should revisit Driver and JobControl to see how we can refactor the code better.

          Raghotham Murthy added a comment -

          Sorry about jumping into this late. But have you considered using hadoop's JobControl? http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/jobcontrol/JobControl.html I believe PIG uses it as well.

          Namit Jain added a comment -

          I will take a look

          Chaitanya Mishra added a comment -

          Attached patch fixes the three broken unit tests, and passed all unit tests on my computer.

          Namit Jain added a comment -

          Yes, can you do the following:

          input41.q:

          insert into a temporary table and then select from that table with an order by

          input42.q/input_part9.q: just add an order by

          You can do something similar for the input* tests - just remember that there are some special optimizations for map-only union queries which we are testing here.
          We want to keep those tests - so instead of adding an order by at the end of the union, just insert into a temporary table and then select from that table with an order by.

          Chaitanya Mishra added a comment -

          The best suggestion I have is to add an order by clause to these tests.

          Namit Jain added a comment -

          I agree there may be some non-deterministic tests, but we should make them deterministic instead of deleting them.
          I will go over the tests above and get back to you

          Chaitanya Mishra added a comment -

          The patch fails the following unit tests

          TestCliDriver: input41.q, input42.q and input_part9.q

          For input41.q the query involves a union-all, and the failure is because the threads can execute either part of the union-all data as they see fit.

          Other similar queries are: input25.q, input26.q, nullgroup5.q, semijoin.q and union_script.q. We need to rewrite these test cases.

          For input42.q, and input_part9.q the problem is that the base table has 2 partitions, and Hive can technically read the partitions in any order it sees fit.

          In fact I checked out the latest version of Hive, and ran the unit test for input_part9.q, and it failed, because the data was generated in the opposite order. I think these two tests should be deprecated.

          Chaitanya Mishra added a comment -

          This patch differs from the previous patch in the following ways:

          (a) mapredWork is not stored as a ThreadLocal variable. Instead we maintain a map from job name -> MapredWork (see the sketch after this list). This was essential since Hive can launch tasks in LocalJobRunner mode. This is pretty much like Zheng's original suggestion.

          (b) There is additional code to ensure that a job always has a randomly generated name, to ensure that the code doesn't break.

          (c) There is also code to ensure that the distributed cache has a unique handle for plan information. Originally it was always stored as HIVE_PLAN.

          (d) SessionState was a ThreadLocal variable, so new code to initialize SessionState for new threads has been put in.

          (e) Only map-reduce tasks are launched using new threads. Non map-reduce tasks are launched within the same driver thread. This is to ensure that simple tasks like describe function don't pay the cost of thread launching + sleeping and polling for threads.

          (f) At most maxthreads=8 threads are launched.
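
          For point (a), a minimal sketch of what such a registry might look like (the class and method names here are illustrative, not the actual patch code; Object stands in for MapredWork):

            import java.util.Map;
            import java.util.concurrent.ConcurrentHashMap;

            public class PlanRegistry {
              // jobName -> plan; a concurrent map so task threads running in the same JVM
              // (e.g. under LocalJobRunner) can register and look up their own plan.
              private static final Map<String, Object> gWorkMap = new ConcurrentHashMap<String, Object>();

              public static void setMapRedWork(String jobName, Object plan) {
                gWorkMap.put(jobName, plan);
              }

              public static Object getMapRedWork(String jobName) {
                return gWorkMap.get(jobName);
              }

              public static void clearMapRedWork(String jobName) {
                gWorkMap.remove(jobName);
              }
            }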

          Chaitanya Mishra added a comment -

          New patch, significantly different from the previous one. It doesn't pass all unit tests; we might need to rewrite some. Will elaborate on this in the next comments.

          Namit Jain added a comment -

          Sure, I will commit it once you make these changes

          Zheng Shao added a comment -

          Hi Chaitanya, the code looks good to me except for a small nit: can you change the logic of "shouldLaunch()"? Currently it means "shouldNotLaunch()".

          After you have done that, please test again with "ant test -logfile log.txt" and make sure all tests are passing.

          Then we can commit it.

          Chaitanya Mishra added a comment -

          No comments for about a week? If this looks fine, do I formally submit this as a patch?

          Chaitanya Mishra added a comment -

          Thanks for the comments.

          • Renamed variable to hive.exec.parallel
          • Renamed function to shouldLaunch
          • Added javadocs for a few functions
          • The task information remains of the form (Stage-x) as before.
          • However, for each new Map-reduce job launched, I print a sentence of the form "Launching job 3 out of 5". Note that this is only for map-reduce jobs.

          I don't have a good answer for the exception question, since I don't quite know what Hive did before.

          Comments?

          Ning Zhang added a comment -

          Just curious: how do we handle exceptions if tasks are executed in parallel? I am asking this because we probably need to set up a global configuration at some point during the query execution (e.g., change the replication factor of a table in the map-side join), and change it back when the whole query is finished or an exception is caught at the end.

          It seems to me that the parallel execution paradigm can be applied only if there is a child task waiting on all the parallel tasks at some point. The child is responsible for rolling back if any exception is caught. An example I am thinking of that cannot use the parallel paradigm is the multi-insert case, where each insert is a branch and they don't "meet" at the bottom of the plan.

          Namit Jain added a comment -

          I agree - this can be done in a follow-up.
          Do you want to enable this feature by default, and disable it in a specific hive-site.xml if need be?

          Zheng Shao added a comment -

          We have a task called ConditionalTask which may or may not submit a map-reduce task at runtime. It's true that map-reduce jobs usually take longer, but given ConditionalTask it seems like a better idea to treat all tasks the same when reporting progress.

          I agree "1/7, 4/7 and 2/7" on the job tracker may seem bad, but the user should see on the command line all 7 tasks starting and finishing.
          We can print out task starting/finishing information on the command line for all tasks.
          For example, every time a task starts/finishes, we can print out "Stage-3 started. Total: 7, Pending: 3, Running: 2, Finished: 2", or "Stage-3 finished. Total: 7, Pending: 3, Running: 2, Finished: 2"

          What do you think?

          By the way, I am OK with finishing this issue first, and then do better progress information in a separate transaction if you want.

          Chaitanya Mishra added a comment -

          Just saw Zheng's comment.

          The problem with TaskID / total tasks is:

          • We might first launch task 1, then task 4, and then task 2. It'll then look like 1/7, 4/7 and 2/7
          • Also, we currently aren't displaying non-MR tasks in the log. So total tasks could be 7, but we display information about only the 3 map-reduce ones.
          Chaitanya Mishra added a comment -

          We are still printing the total number of MR jobs.
          However, we are changing job names to the task-id (i.e., Stage-x) format instead of 3/5.

          I can switch back to the 3/5 kind of format, which depended on the curJobNo variable, but the issue there is that the order in which tasks are launched might change across runs, and therefore the job name can change.

          Let me know which one sounds better.

          Zheng Shao added a comment -

          Agree. We should have "taskId()/totalTasks" in the job name.

          Namit Jain added a comment -

          Are we changing the job name to taskId() instead of 3/5?
          Also, are we not printing the total number of map-reduce jobs?

          The above naming convention makes it easier for debugging sometimes.
          So, it might be good to maintain that.

          Zheng Shao added a comment -

          Please use "hive.exec.parallel" instead, to make it consistent with other "hive.exec" parameters.

          Zheng Shao added a comment -

          THIVE-549-v3.patch:
          Can you add javadoc comments to all public methods (except getters and setters)? I know we were not following that all the time historically, but we want to enforce that on all new patches.

          It seems to me that "shouldLaunch()" is a better name than "checkLaunch()", but it's up to you whether you would like to change it.

          Can we change the name of "hive.optimize.par" to "hive.execution.parallel"?
          "optimize" was meant for plan optimization. This is an execution time option. Also "par" does not reflect to "parallel" in my mind.

          Chaitanya Mishra added a comment -

          There was a race condition in the previous patch of the following form.

          Task 3 is a child of Tasks 1 and 2.
          1 and 2 finish.
          Main thread checks child of 1 (Task 3) if it has started and is runnable. Calls initialize for task 3. Launches new thread for task 3.
          Main thread checks child of 2 (also task 3) if it has started and is runnable. By this point the separate thread for 3 has not actually done any execution. Hence started evaluates to false, and main therefore launches a new instance of Task 3.

          Problem: There are 2 instances of Task 3.

          Solution: Added a new variable initialized to Task.java. initialized is set to true as soon as the function initialize() is called. Similarly, a task is launched only if it isRunnable() and is not initialized. Since only the main thread invokes tsk.initialize() and checkLaunch(), there is no race condition here.
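
          As a rough illustration of the fix (the names here are simplified and hypothetical, not copied from the patch):

            // The 'initialized' flag is set by the single driver thread before any
            // runner thread is started, so a child with two parents is launched at most once.
            public class SketchTask {
              private boolean initialized = false;

              /** Called only from the driver thread, exactly once per task. */
              public void initialize() {
                initialized = true;
                // ... set up jobconf, plan, etc.
              }

              public boolean isInitialized() {
                return initialized;
              }
            }

            class SketchDriver {
              /** Launch a child only if it is runnable and has not been initialized yet. */
              void maybeLaunch(final SketchTask child, boolean runnable) {
                if (runnable && !child.isInitialized()) {
                  child.initialize();               // marked before any new thread starts
                  new Thread(new Runnable() {
                    public void run() {
                      // child.execute() would run here
                    }
                  }).start();
                }
              }
            }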

          Uploading this patch, and deleting previous two patches.

          Chaitanya Mishra added a comment -

          For some reason Apache saw the previous patch as a binary file.

          Chaitanya Mishra added a comment -

          New patch, reflecting Zheng's comments and an offline discussion we had.

          • gWorkContainer stays as it is, a ThreadLocal variable. A better solution would probably be creating a map from job to mapredWork, but we are deferring it for now.
          • job-ids now correspond to Task-ids. Therefore, they will remain the same across runs.
          • The Task-id is displayed in the progress information, since multiple jobs corresponding to the same query might be running at the same time.
          • No copy of "conf" is needed since it is used only in the initialize function, to set the jobconf and mapredWork. Only one task is being initialized at a time.
          • Options: Added a new confvar hive.optimize.par with default value true. If set to false, tasks are launched sequentially in the same thread.
          • Cleanup/Failure: The taskCleanup function now simply calls System.exit(9). All tasks executing within the process are killed. The map-reduce processes are killed through the runningJobKillURIs hashmap, which is set up as a shutdown hook in ExecDriver.java. Access to this hashmap is now controlled through a synchronized interface, since multiple threads might be launching at the same time (a rough sketch follows below).

          Thanks.
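
          As a rough illustration of the cleanup point above (a sketch only, with hypothetical names; the real code lives in ExecDriver.java and hits the job tracker's kill URL):

            import java.util.HashMap;
            import java.util.Map;

            public class RunningJobRegistry {
              // jobId -> kill URI; all access goes through synchronized methods because
              // several task threads may be launching and finishing jobs concurrently.
              private static final Map<String, String> killURIs = new HashMap<String, String>();

              public static synchronized void register(String jobId, String killURI) {
                killURIs.put(jobId, killURI);
              }

              public static synchronized void deregister(String jobId) {
                killURIs.remove(jobId);
              }

              static {
                // Shutdown hook: on JVM exit, kill whatever jobs are still registered.
                Runtime.getRuntime().addShutdownHook(new Thread() {
                  public void run() {
                    synchronized (RunningJobRegistry.class) {
                      for (String uri : killURIs.values()) {
                        System.err.println("Would kill job via " + uri);
                      }
                    }
                  }
                });
              }
            }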

          Chaitanya Mishra added a comment -

          My session timed out in my previous try, so I'm uploading the patch now, and will add comments next.

          Zheng Shao added a comment -

          Hive-549.patch: Overall, the structure of the changes looks good to me.

          Utilities.java:

          • gWorkContainer: this variable needs to be heavily commented - why do we need it, why it's ThreadLocal, any alternatives (and optionally, why this is better than alternatives).
          • Where do we serialize data to "HIVE_PLAN"? Is that thread-safe?
            InputStream in = new FileInputStream("HIVE_PLAN");

          HiveInputFormat.java:

          • Unnecessary white-space changes only.

          Driver.java:

          • It's a good idea to assign the job numbers statically - if the jobNo of the same job can be different in different runs, it will be harder to debug.
          • Shall we immediately stop all other running jobs if one of them has failed?
            + console.printError(errorMessage);
            + taskCleanup(runnable);
          • I guess you mean "if the child has already started, or is NOT runnable"
            + // Check if the child has already started, or is runnable
            + if(checkLaunch(child)) {
          • There are 2 places where we do "curJobNo++" in this function:
            + public int launchTask(Task<? extends Serializable> tsk, String queryId,
          • Do we need to get a copy of "conf" object since we are modifying it in launchTask?
            + tsk.initialize(conf, plan);

          Can we add an option for the user to choose "sequential" or "parallel" execution? The change could be simple - we just need to check the option in launchTask to decide whether we should call TaskRunner.start() or TaskRunner.run(). Please add the new option to HiveConf class, and conf/hive-default.xml.
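
          Such an option check could look roughly like this (an illustrative sketch; LaunchSketch and its parameters are hypothetical, and the flag would be read from the new HiveConf option):

            // Start the runner as a new thread when parallel execution is enabled,
            // otherwise call run() directly so the task executes in the driver thread.
            class LaunchSketch {
              static void launch(Thread taskRunner, boolean parallelEnabled) {
                if (parallelEnabled) {
                  taskRunner.start();  // asynchronous: a new thread executes run()
                } else {
                  taskRunner.run();    // synchronous: runs in the calling thread
                }
              }
            }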

          Chaitanya Mishra added a comment -

          Attaching a patch for this. I hope it's the right format.

          Summary of changes:

          • Created TaskRunner.java, which launches new tasks as threads.
          • Created TaskResult.java, which encapsulates the return value of the thread.
          • Modified execute() function of ql/Driver.java to launch tasks as soon as they are runnable.
          • Also, modified the Utilities.gWork variable to be ThreadLocal, so that the state of multiple threads is kept independently.

          The end result of this patch is that a task (which is a part of a query plan) is launched as soon as it is runnable, instead of waiting in a queue. (A rough sketch of the two new classes follows below.)

          Comments?
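
          Roughly, the two new classes can be pictured like this (a simplified, illustrative sketch; Runnable stands in for Hive's Task, and the real classes carry more plumbing):

            // TaskResult holds the exit code of a task executed on another thread;
            // TaskRunner extends Thread and records the result when the task finishes.
            class TaskResult {
              private volatile int exitVal = -1;
              private volatile boolean done = false;

              public void setExitVal(int exitVal) {
                this.exitVal = exitVal;
                this.done = true;
              }

              public boolean isRunning() { return !done; }
              public int getExitVal() { return exitVal; }
            }

            class TaskRunner extends Thread {
              private final Runnable task;        // stand-in for Task<? extends Serializable>
              private final TaskResult result;

              TaskRunner(Runnable task, TaskResult result) {
                this.task = task;
                this.result = result;
              }

              public void run() {
                int exitVal = 0;
                try {
                  task.run();                     // stand-in for task.execute()
                } catch (Throwable t) {
                  exitVal = 1;
                }
                result.setExitVal(exitVal);
              }
            }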

          He Yongqiang added a comment -

          Thanks for clarifying. Your logic looks good. I did not know there was already an isRunnable interface; the reason I was adding the notification stuff was to avoid a lot of the checking work that needs to be done, but obviously it does not work well.

          Chaitanya Mishra added a comment -

          Typo in previous outline:

          If task finishes correctly

          • Check its child tasks if isRunnable() is satisfied. If so, launch a TaskRunner for each child.
          Chaitanya Mishra added a comment -

          Thanks for your comments, He.

          Just to clarify: the way I'm thinking about it is that a TaskRunner will be launched by the execute function in ql/Driver.java only when it satisfies the isRunnable() check which is currently implemented. Therefore, the workflow will be as follows (a condensed sketch appears at the end of this comment).

          Task

          • Scan through list of tasks. Launch taskrunners for each task that satisfies the isRunnable call.
          • Wait for any task to finish or fail.

          If task finishes correctly

          • Check its child tasks if isRunnable() is satisfied. If so, launch TaskRunners for the taskrunners.

          If task fails

          • Kill all existing taskrunners, making sure to cleanup.

          TaskRunner (extends thread)

          • Takes a task, executes it in a separate thread, and returns either 0 or 1, depending on success or failure.

          The difference from your design (as I understand it) is that a TaskRunner will be launched only when a task is runnable. All the notifications 2.1, 2.2 and 2.3 aren't necessary then. The existing logic in the execute() function should handle this.
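
          A condensed sketch of that workflow (illustrative only; SketchTask is a hypothetical stand-in for Hive's Task, and the real execute() in Driver.java also handles conf, progress reporting, and failure cleanup):

            import java.util.ArrayList;
            import java.util.List;
            import java.util.concurrent.BlockingQueue;
            import java.util.concurrent.LinkedBlockingQueue;

            class DriverLoopSketch {

              /** Minimal stand-in for a task node with dependency tracking. */
              static class SketchTask {
                final List<SketchTask> children = new ArrayList<SketchTask>();
                int unfinishedParents = 0;     // touched only by the driver thread
                boolean launched = false;

                void work() { /* stand-in for Task.execute() */ }
              }

              /** Launch runnable tasks on threads; as each finishes, release its children. */
              static void execute(List<SketchTask> allTasks) throws InterruptedException {
                final BlockingQueue<SketchTask> finished = new LinkedBlockingQueue<SketchTask>();
                int remaining = allTasks.size();

                while (remaining > 0) {
                  // launch every task whose parents have all finished (the isRunnable check)
                  for (final SketchTask t : allTasks) {
                    if (!t.launched && t.unfinishedParents == 0) {
                      t.launched = true;
                      new Thread(new Runnable() {
                        public void run() { t.work(); finished.add(t); }
                      }).start();
                    }
                  }
                  // wait for some task to finish, then unblock its children
                  SketchTask done = finished.take();
                  for (SketchTask child : done.children) {
                    child.unfinishedParents--;
                  }
                  remaining--;
                  // on a failed task, the real driver would instead stop launching
                  // and clean up all running task runners
                }
              }
            }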

          He Yongqiang added a comment -

          In 3), besides notifying its children, the task runner also needs to record its own state.

          He Yongqiang added a comment -

          Agree on adding a new TaskRunner which extends Thread.
          I think we can define several states for each TaskRunner.
          A simple example can be:
          1. Iterate the task queue and launch a new TaskRunner for each task.
          2. For a TaskRunner serving a particular task, first check the states of all of its parents:
          2.1 If any of its parents has failed, fail itself.
          2.2 If all parents are successful, launch itself and mark itself as running.
          2.3 If any of its parents is still running or has not yet finished/failed, mark itself as pending, and also add itself to the waiting queue of its unfinished parent.
          3. If a task successfully finished or failed, notify all children in its waiting queue of whether it succeeded or failed.
          Obviously, this simple algorithm has one problem: each task gets one dedicated thread allocated to it. It would be better if we could limit the number of threads.

          Chaitanya Mishra added a comment -

          Just to clarify the previous comment: TaskRunner will call Task, which then proceeds as before.

          The execute() function in ql/Driver.java will now wait for tasks to finish, and launch child tasks (through a taskrunner) as soon as they become runnable.

          Chaitanya Mishra added a comment -

          Zheng and I had an email discussion on this. To launch multiple tasks from the same driver, we'll need to launch each task as a separate thread.

          The simplest solution is to extend Task to implement the Runnable interface, but this might affect other components... Not completely sure about this, since an element that implements Runnable can still be called in a sequential fashion.

          Zheng instead proposes adding a new class TaskRunner, with functions:

          run(): asynchronous call
          start(),
          wait(long timeoutMilli),
          and stop()

          Most likely, if we get taskrunner to extend Thread, we get these functions; if not we can implement runnable.

          So, do we get Task to implement runnable, or add a layer of indirection as TaskRunner?

          JArod Wen added a comment -

          Intuitively, in the query DAG the sub-branches of a given node can be parallelized. Since the DAG can be generated in several ways, there must be some of them which may be much more efficient than others considering the possible parallel tasks, right? So some changes in the optimizer at compile time would be cool. Anyway, I agree with Namit about the first step on the scheduler.

          Namit Jain added a comment -

          done

          He Yongqiang added a comment -

          Should we edit this issue to "parallel execution mechanism"? Or we can open a new JIRA and link this one to it.

          Namit Jain added a comment -

          I agree completely - What I was trying to get at is
          1. It is not specific to union all.
          2. It does not need a change in compiler - just the scheduler change should be good enough.

          He Yongqiang added a comment -

          Since the generated task plan/task stages form a DAG, the user can get results more quickly if we submit tasks in parallel (two jobs can be submitted in parallel if they/their parents do not rely on each other). We can let the Hadoop scheduler decide how to run the jobs for better cluster usage, etc.

          Namit Jain added a comment -

          This is not specific to UNION ALL - Hive maintains the complete task dependency tree and can execute tasks in parallel.
          It will help latency substantially. It should not help overall cluster usage, but it will be especially useful for benchmarks.

          Can't think of any argument against parallelizing. I don't see any reason to change the plan - while walking the task tree, execute all tasks whose dependencies have been executed.

          He Yongqiang added a comment -

          Will Hive consider replacing its queued execution with parallel execution?


            People

            • Assignee:
              Chaitanya Mishra
              Reporter:
              Adam Kramer
            • Votes:
              0
              Watchers:
              12

              Dates

              • Created:
                Updated:
                Resolved:
