Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:
      None

      Description

      Once a mapper in the MR index job succeeds, it should not need to be re-done in the event of the failure of one of the other mappers. The initial population of an index is based on a snapshot in time, so new rows added after the index build has started and/or failed do not impact it.

      Also, there's a 1:1 correspondence between index rows and table rows, so there's really no need to dedup. However, the index rows will have a different row key than the data table, so I'm not sure how the HFiles are split. Will they potentially overlap and is this an issue?

      Attachments

      1. IndexTool.java
        13 kB
        James Taylor
      2. PHOENIX-2154-_HBase_Frontdoor_API_v1.patch
        35 kB
        Ravi Kishore Valeti
      3. PHOENIX-2154-_HBase_Frontdoor_API_v2.patch
        40 kB
        Ravi Kishore Valeti
      4. PHOENIX-2154-_HBase_Frontdoor_API_WIP.patch
        26 kB
        Ravi Kishore Valeti
      5. PHOENIX-2154-WIP.patch
        20 kB
        ravi

        Activity

        jamestaylor James Taylor added a comment - - edited

        Comment from ravi:
        Right now, the default reducer we use is KeyValueSortReducer. This aggregates the mapper output and writes it as HFiles into a directory that the user has passed as an argument to the job. After the MapReduce job completes, the LoadIncrementalHFiles class moves the HFiles onto the HBase table.

        A couple of possibilities can arise in the above workflow:
        a) If all mappers succeed:
        The happy path. HFiles get loaded into the HTable and we then update the index state to ACTIVE.

        b) If some mappers fail:
        The job is marked as a failure. There can be instances where some mappers have succeeded, but since one or more have failed, the whole job is marked as a failure. In that case, the output produced by the successful mappers is discarded as part of the MapReduce cleanup process, and the entire job has to be re-run.

        It'd be good to make the mapper task an independent chunk of work which, once complete, would not need to be run again if the job is restarted.
        Are you referring to the second case above? If so, I am not sure of an easy way to have the mapper output persist beyond the job. We could address this by pushing the output KeyValue pairs onto a persistent store like HDFS rather than writing to local filesystem directories. However, I see a challenge when the job is run again: we would have some records (in no particular order in the primary table) already processed and written to the persistent store, and if we do not want them to be processed again in the mapper, there would need to be some flag or signal to skip them. Mappers usually follow region boundaries, so if the job is run again with the same region boundaries, we could hack into the framework to run mappers only on those regions where the earlier mappers failed.

        jamestaylor James Taylor added a comment -

        The main thing to keep in mind is that it's difficult to guarantee that the client that kicked off the job will be around long enough to complete it. Same goes for all the mappers. For a multi-billion row table, it'll take maybe 10-12 hours. If, on the other hand, the incremental work can be kept, that would be ideal.

        If need be, we may want to use the regular HBase APIs (rather than building HFiles), and keep track of which data row key we're at for each mapper. Then, if something goes wrong, the Mapper can start up where it left off. Even if this is slower overall, it's still a win, as we know the job will complete.
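        For illustration, a minimal sketch of that resume idea, in the spirit of the discussion rather than taken from any patch: each mapper persists the last data-table row key it indexed, and a re-run starts its region scan just past that checkpoint. Where the checkpoint is stored is left open.

            import org.apache.hadoop.hbase.client.Scan;
            import org.apache.hadoop.hbase.util.Bytes;

            // Sketch only: resume a mapper's region scan just past the last row key it
            // successfully indexed. Where that checkpoint lives (ZooKeeper, an HBase table,
            // HDFS) is deliberately left open and is not part of the actual patch.
            public class ResumableScanSketch {

                static Scan buildScan(byte[] regionStartKey, byte[] regionEndKey, byte[] checkpointRowKey) {
                    Scan scan = new Scan();
                    // If a checkpoint exists, start at the next possible row key after it;
                    // otherwise scan the region from its start key.
                    byte[] startRow = (checkpointRowKey != null)
                            ? Bytes.add(checkpointRowKey, new byte[] { 0x00 })
                            : regionStartKey;
                    scan.setStartRow(startRow);
                    scan.setStopRow(regionEndKey);
                    return scan;
                }
            }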

        Would be interested in hearing what you think too, Jan Fernando, Eli Levine, ravi, Ravi Kishore Valeti, Thomas D'Silva

        jamestaylor James Taylor added a comment - - edited

        Would be interested in hearing what you think too, Jan Fernando, Eli Levine, ravi, Ravi Kishore Valeti, Thomas D'Silva.

        maghamravikiran ravi added a comment -

        Right now, the job does both the tasks of generating the HFiles and loading them onto the target table. Should we try to break it into two processes, where
        1) The job runs the HFile generation code. We run it with a submit() rather than waitForCompletion(), so that the client returns immediately (see the sketch below).
        2) Once the job finishes, the client runs the org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles job to load the HFiles onto the table.
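        As a rough sketch of the two submission styles (illustrative only, not taken from the patch; the Job is assumed to be fully configured):

            import org.apache.hadoop.mapreduce.Job;

            // waitForCompletion() blocks the submitting client for the life of the job;
            // submit() returns as soon as the job is accepted, so the client can exit and
            // drive the LoadIncrementalHFiles step separately once the job finishes.
            public class JobSubmissionSketch {

                static boolean runInline(Job job) throws Exception {
                    return job.waitForCompletion(true); // client must stay up until the job ends
                }

                static void runInBackground(Job job) throws Exception {
                    job.submit(); // fire and forget; track progress via the job id
                    System.out.println("Submitted " + job.getJobID());
                }
            }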

        Currently, the mapper output runs through KeyValueSortReducer, a Reducer class that is responsible for writing the output in HFile format. To keep state across jobs (when failures happen), we would have to write the map output to HDFS and then run a subsequent job that loads the previous map output and writes it to HFiles through the KeyValueSortReducer. Not sure if we want to travel this path.

        elilevine Eli Levine added a comment -

        At a high level I would go for resiliency over speed for async index builds. The probability of a single mapper failure increases with data size. Background index builds are meant for large data sizes, so it seems natural to prioritize incremental restart over speed IMHO.

        jfernando_sfdc Jan Fernando added a comment -

        +1 to Eli Levine's comments. Resiliency over speed is the right trade off IMHO.

        jamestaylor James Taylor added a comment -

        Turns out it's pretty easy to switch an MR job to use the front-door APIs instead of loading HFiles. See the attached new version of IndexTool for how to do that (thanks to Lars Hofhansl). Depending on what we figure out, we may want to make this configurable between loading HFiles and using regular HBase APIs. A good first step would be to perf test this new mapper-only solution and compare it with the HFile-building one. Thomas D'Silva - would you mind attaching the perf numbers that you've collected so far?

        One nice thing about using the front-door APIs is that it'd be pretty easy to track where we are with the index build and start off again where we left off (using the previously known successful row key as the start row of the scan). One thing we still need to figure out is how to mark the index as active once all mappers have completed (hopefully without having to hold the client open the entire time). Is there some kind of callback mechanism we can rely on?

        IMHO, we need to ensure we’re meeting our design goal of making the index build resilient. It must be incremental and restartable (picking up more or less where it left off without requiring the client that started it to be up for the entire build), and it must be monitorable.

        ravi, Ravi Kishore Valeti.

        gabriel.reid Gabriel Reid added a comment -

        One thing to keep in mind about the trade-off between writing HFiles vs writing directly to HBase is the flush and compaction overhead that can come from such high-throughput writing. In the case of writing index entries this might not be too big a problem, but I've definitely seen large flush and compaction queues build up due to writing lots of heavy-weight rows via an MR job (with the resolution always being to switch to writing HFiles).

        jamestaylor James Taylor added a comment -

        Good point, Gabriel Reid. What do you think the trade-off would be between 1) the work the reducer needs to do to make the HFiles non-overlapping (as there's no correlation between the index row keys and the data table row keys) versus 2) the extra flushes/compactions that'll be necessary if we go through the front-door APIs? For (1), with a new index we don't know the split points, so it'll be a single reducer, unfortunately. Is it possible in MR to set the split points after the map phase but before the reduce phase? If it's not possible in MR, can this be done in Spark?

        Also, Gabriel Reid - do you know if there's a mechanism to get some kind of callback once all the mappers are done? Or is the only choice to have the client that submitted the MR job wait until it's complete?

        jamestaylor James Taylor added a comment -

        One simple way that Samarth Jain came up with for marking the index as active once all the mappers are complete is to do it in the reduce phase. Since we don't need a reducer at all when we're using the regular HBase APIs, we can configure our MR index builder job to have a single reducer that simply marks the index as active.
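        As a rough illustration of that idea (not the committed code; key/value types and the state-update helper are placeholders):

            import java.io.IOException;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Reducer;

            // A single reducer whose only job is to flip the index state to ACTIVE. The
            // reduce() call only runs after every map task has finished, so reaching it
            // means the index rows have already been written via the HBase APIs.
            public class MarkIndexActiveReducerSketch extends Reducer<Text, Text, Text, Text> {

                @Override
                protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
                    updateIndexState(context.getConfiguration(), "ACTIVE");
                }

                private void updateIndexState(Configuration conf, String state) {
                    // Placeholder: open a Phoenix connection and update the index metadata here.
                }
            }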

        Does that make sense, ravi, Gabriel Reid, Thomas D'Silva?

        tdsilva Thomas D'Silva added a comment -

        +1 setting the status to active in a single reducer makes sense to me.

        tdsilva Thomas D'Silva added a comment -

        Perf results for an 8-node cluster:

        narrow table
        100 million rows
        MR : 16mins, 49sec
        Regular : 9mins, 53sec
        1 billion rows
        MR : 2hrs, 40mins, 56sec
        Regular : 1hr, 15mins, 42 seconds

        wide table
        1 billion rows
        MR : 22hrs, 54mins, 48sec *
        Regular : 8hrs, 30mins, 47sec

        * Job fails while trying to load HFiles
        maghamravikiran@gmail.com maghamravikiran added a comment -

        [~James Taylor]
        Right now, we call waitForCompletion() on the job instance, which we should change to a simple submit() so it returns immediately. I don't see that in the attached IndexTool.java. Also, to update the state, I don't see any alternative other than adding a reducer to the mix.
        I believe we should provide the option for the end user to choose between the bulk load and direct put approaches, as I share the opinion [~Gabriel Reid] has mentioned.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        Thomas D'Silva
        How many reducers were running during these job executions? From your earlier mail on the mailing list, it looked like there was a single reducer writing the mapper output into HFiles. The current implementation of HFileOutputFormat seems to determine the number of reducers based on the target table region count. If we can increase the region count of the target index table, we might see some speedup.

        samarthjain Samarth Jain added a comment -

        Until and unless we have a pre-built histogram/distribution of the index row keys, it will always be difficult to come up with the right split points/region boundaries for the index table. A somewhat decent alternative is to have the initial number of splits/regions/reducers equal to a small multiple of region servers in the cluster. Of course, there is no guarantee that the initial region splits would result in even distribution, but hopefully over time the regions will eventually balance out.

        tdsilva Thomas D'Silva added a comment -

        The tests ran with a single reducer.

        jamestaylor James Taylor added a comment -

        It's a single reducer because the index table is a new/empty table and thus has a single region. We can't know the split points in advance, as there's no correlation between the data table split points and the index split points. It'd still be good to make the HFile path configurable: for an index rebuild we would have multiple existing regions, so perf should improve. We should perf test the rebuild scenario too.

        jamestaylor James Taylor added a comment -

        Perf results for an 8-node cluster:

                       100M narrow table (min)   1B narrow table (min)   1B wide table (min)
        Non MR         10                        76                      511
        HFile MR       17                        161                     1,375

        We'll add the HBase API MR numbers here soon.

        As noted by Thomas, the 1B wide table fails to load the HFiles (perhaps because hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily needs to be increased), so we're not accounting for that time in the above.
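        For reference, a small sketch of bumping that limit for the job; the property name is the one mentioned above, and the value 128 is purely illustrative (the HBase default is 32):

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.hbase.HBaseConfiguration;

            public class BulkLoadLimitSketch {
                public static Configuration withHigherHFileLimit() {
                    Configuration conf = HBaseConfiguration.create();
                    // Raise the per-region, per-family HFile cap checked by LoadIncrementalHFiles.
                    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 128);
                    return conf;
                }
            }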

        jamestaylor James Taylor added a comment -

        Changing the submit to return immediately, implementing the single reducer for the non-HFile path, and making the MR job configurable between HFile and regular HBase APIs are not in that patch. If you have the cycles, that'd be a good contribution, ravi.

        gabriel.reid Gabriel Reid added a comment -

        I think that writing to HFiles with a single reducer basically eliminates any performance advantage that you can get via writing to HFiles, as it essentially turns the job into a serial process. Thomas D'Silva's performance numbers reflect this.

        Note that the work that the reducer does when writing to HFiles isn't simply deduplication – it also ensures ordering per region. At a high level it does the same thing as a compaction in a region server.

        Seeing as having a decently-split table doesn't sound like it's too feasible, the idea of writing via the "normal" HBase API and then having a single reducer whose only task is to mark the index as active sounds like a great idea.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        From the comments above, I see the tasks below:

        a) Provide an option for the end user to switch between the bulk load and direct API approaches.
        b) If the direct API approach is chosen,
        i) change the job submission from a wait-until-completion to a submit that returns immediately;
        ii) have a new Reducer whose task is to update the status of the index table.

        I will work on this today.
        Please feel free to add any task I am missing.

        jamestaylor James Taylor added a comment -

        +1 maghamravikiran - I think that captures it. I've assigned the JIRA to you. Thanks in advance.

        maghamravikiran ravi added a comment -

        This is a WIP patch. Tests need to be added. Parking it here for early feedback.

        jamestaylor James Taylor added a comment -

        Thanks for the patch, ravi. It's my understanding (from Lars Hofhansl) that if you use this command:

                    TableMapReduceUtil.initTableReducerJob(logicalIndexTable, null, job);
        

        that the same context.write(outputKey, kv) we do will work, but the MR framework will issue the required batched mutation for the KVs we write through direct HBase calls. Is that not the case?

        If that works, then I think the code changes will be much smaller. Not sure what controls the amount of batching the HBase calls will do.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        True, James Taylor - Lars's point is valid, but it doesn't fit our use case, as we need a reducer to update the state of the index table once the mapper output is committed to HBase. That's the reason I am forcing autoCommit in the mapper: by the time the reducer begins execution, we know the data is in the index table and we are left with just updating the state.

        Lars Hofhansl please correct me if I am wrong.

        jamestaylor James Taylor added a comment - - edited

        Did you try it, Ravi? In theory, the mapper would call the HBase APIs, so by the time the reducer runs, all the mapper tasks have completed. We don't want Phoenix to be the one sending the writes to HBase, and we definitely don't want auto-commit on, as that would cause an RPC for every row.

        samarthjain Samarth Jain added a comment -

        Reducers can start running before all the mappers are complete. However, the reduce step in a reducer is not executed until all the mappers are done. Ravi, it looks like you are updating the index state in the setUp method. Do you know when setUp is executed? Is it executed before the shuffle phase of a reducer? If so, it probably makes sense to move the code you have in setUp to the reduce method instead.

        rvaleti Ravi Kishore Valeti added a comment -

        maghamravikiran

        I think the major task for the subject line, "Failure of one mapper should not affect other mappers in MR index build", is missing from the task list.

        Even a single map task's eventual failure (after retries) will mark the job as "failed", and I think there is no way to resume ONLY a specific task in the next run (unless we write custom logic?). The next run will be a fresh job execution.

        James Taylor Lars Hofhansl please correct me if I'm wrong.

        rvaleti Ravi Kishore Valeti added a comment -

        maghamravikiran

        Just to understand: in the "configureSubmittableJobUsingDirectApi" mode, does it mean we are writing to both the Phoenix table (via autocommit true) and HDFS (as mapper output KeyValues)?

        maghamravikiran@gmail.com maghamravikiran added a comment -

        True, James Taylor. I will correct that code.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        My bad. Will move it to the reduce method to be absolutely sure.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        Out of turn, but I agree. I believe we need to break this task up broadly into two parts: 1) resiliency, 2) quick execution.

        Right now, with the direct API approach, we achieve quicker job execution, as we have seen the direct API perform far better than the HFiles route. This can be partially attributed to the fact that there was only one reducer shuffling the mapper output. Here, if one mapper fails, there is a possibility that some successful mappers have already committed data onto the index table. Is this ok?

        For resiliency, I believe the bulk load approach is good, as it's an all-or-nothing job. We don't copy HFiles onto the index table until all the mappers and the reducer have completed.

        In both approaches, the important task we need to address is finding options to keep successful mappers from being re-run. Is this possible, and what are the best means to address it?

        maghamravikiran@gmail.com maghamravikiran added a comment -

        In this mode, we write to HBase alone.

        rvaleti Ravi Kishore Valeti added a comment -

        My bad. I overlooked the null writable reducer in this mode. Nm, thanks

        lhofhansl Lars Hofhansl added a comment -

        Sorry... a bit late. The specific problem for index builds is that (a) the data goes into an initially empty table, and (b) we can't know ahead of time how to pre-split the table (unless we do a first pass or sample).
        With that, the index build will go through a single reducer producing a single, possibly humongous, region (because we'll have a table with a single region), which HBase then needs to split potentially multiple times.

        I agree using the HBase front door is not ideal either, but at least we're writing in memstore-size chunks and HBase will split as we go.

        lhofhansl Lars Hofhansl added a comment -

        Sure... with that we'll need to add a reducer class.
        I'm not an expert in this: can we configure a reducer that does nothing and also avoid the shuffle phase?
        If M/R does the shuffle just so that the reducer can ignore all the data, we haven't gained anything.

        rvaleti Ravi Kishore Valeti added a comment - - edited

        I just uploaded another variant using HBase Frontdoor API. I tested both modes (existing & direct API mode) and both worked.

        James Taylor Lars Hofhansl maghamravikiran please review.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        Ravi Kishore Valeti, I believe you missed the PhoenixTableOutputFormat class in the patch. I am assuming you are updating the index table state in the PhoenixTableOutputFormat class?

        rvaleti Ravi Kishore Valeti added a comment - - edited

        Apologies. I added the missing file now.
        Yes. Index state update is the last step of PhoenixTableOutputFormat class.

        maghamravikiran@gmail.com maghamravikiran added a comment -

        Thanks, Ravi Kishore Valeti, for the patch. Can you add a few test cases?

        rvaleti Ravi Kishore Valeti added a comment -

        Sure. I will add some tests today and will also see if mutation batching can be applied to speed up the process.

        jamestaylor James Taylor added a comment -

        Thanks, Ravi Kishore Valeti. Don't bother perf testing without batching as perf will be bad.

        rvaleti Ravi Kishore Valeti added a comment - - edited

        While writing test cases, I realized that the "-direct" switch cannot be deterministically tested. The reason is that we submit the job and return immediately, so the test does not know how long to wait before asserting.

        I would like to add a new switch, "-runbg", that applies on top of the "-direct" switch: run in the background (job.submit) when it is set, or in-line (job.waitForCompletion) otherwise. With this, we can write a deterministic test for the "-direct" switch.

        James Taylor maghamravikiran Thoughts?

        rvaleti Ravi Kishore Valeti added a comment -

        Rajeshbabu Chintaguntla

        While working on IndexTool.java for this JIRA, I had a question about LOCAL indexes in the context of IndexTool.java.

        In the MR index build job using IndexTool.java, how are we making sure LOCAL index builds maintain region co-location with their corresponding data table regions?
        Code-wise, it looks like we are NOT maintaining this property for LOCAL indexes, which makes GLOBAL and LOCAL indexes no different from each other when indexes are created as "Async".

        Correct me if my understanding is incorrect.

        rajeshbabu Rajeshbabu Chintaguntla added a comment -

        In MR Index Build job using IndexTool.java, how are we making sure LOCAL index builds maintain region co-location with their corresponding Data Table's region?

        The local index table regions are colocated with the data table regions during index table creation itself. When preparing index updates, we ensure the updates go to the index region colocated with the data region. That's why we are not seeing any difference for global/local indexes.

        rvaleti Ravi Kishore Valeti added a comment -

        "The local index table region's colocated with data table regions during index table creation itself."

        Does this hold true even for an index declared as "Async" in the CREATE INDEX statement?

        jamestaylor James Taylor added a comment -

        In theory, the MR index build for local indexes should be very quick. We know the split points of the index table based on the data table and we should be able to build one HFile per data table region in our Mapper and just hand them off to HBase. Rajeshbabu Chintaguntla - are we taking advantage of knowing the split points? No reduce phase should be necessary, so we can take the same approach as we're looking at now for front-door-HBase-APIs MR build: make it asynchronous and set the index state to active in the reduce phase. However, it seems like there'd be corner cases in which the data table may split while the index is being built - it's unclear to me how this scenario would be handled.

        Also, it seems there are problems with IndexTool for local indexes, as there are scenarios where the MR job completes yet a scan over the Phoenix tables says there are 0 rows. Will follow up on this in a separate JIRA, Rajeshbabu Chintaguntla. Note that the code path for the MR index build is different from the standard build mechanism in Phoenix. I think we need to increase our testing in this area.

        rajeshbabu Rajeshbabu Chintaguntla added a comment -

        are we taking advantage of knowing the split points? No reduce phase should be necessary, so we can take the same approach as we're looking at now for front-door-HBase-APIs MR build:

        I have not fully checked IndexTool code. Will check and get back to you.

        it seems like there'd be corner cases in which the data table may split while the index is being built - it's unclear to me how this scenario would be handled.

        Basically, in normal cases, when there is a region split during the MapReduce phase, the HFile data will be separated into the child regions before loading. This is done in HBase using a half store file reader, but the reader is not configurable in LoadIncrementalHFiles. Using the default implementation of HalfStoreFileReader may spoil local index data, e.g. all the data may go to the first child only. Fixing this would need a code change in HBase. One thing we can do is, before loading local index data, check whether the split keys are the same or not, fail if any split keys have changed, and guide the user to re-run the job.

        it seems there are problems with IndexTool for local indexes, as there are scenarios where the MR completes yet a scan over the Phoenix tables says there are 0 rows.

        Need to check this.

        rvaleti Ravi Kishore Valeti added a comment -

        James Taylor maghamravikiran

        Just uploaded - PHOENIX-2154-_HBase_Frontdoor_API_v1.patch

        Patch contains

        • Changes to build using HBase Front door APIs
        • Tests added in IndexToolIT to test the HBase Front door API functionality
        jamestaylor James Taylor added a comment -

        Nice work, Ravi Kishore Valeti. One question: when is the call to TableRecordWriter.close(TaskAttemptContext context) made? After all mappers have completed, or after each mapper completes? If the former, then we're good.

        Regarding the need for a run-foreground option, I'm -0 on it. It's easy enough to have a while/sleep loop that checks whether the index state has been updated to active (a rough sketch follows below). I just want to make sure our unit tests exercise the real code path that would be used in production.

        For IndexToolIT.testSecondaryIndex(), we should check that the secondary index is valid with respect to the data table. Thomas D'Silva has some code that does that - you can basically join the index to the data table and confirm that all the values match.
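        A rough sketch of such a wait loop, for illustration only (connection URL, names, timeout, and the exact SYSTEM.CATALOG query are assumptions, not code from the patch):

            import java.sql.Connection;
            import java.sql.DriverManager;
            import java.sql.PreparedStatement;
            import java.sql.ResultSet;
            import java.util.concurrent.TimeUnit;

            // Poll until the index state flips to ACTIVE ('a' in SYSTEM.CATALOG's INDEX_STATE
            // column), with a timeout so a stuck build doesn't hang the caller forever.
            public class WaitForIndexActiveSketch {
                public static void main(String[] args) throws Exception {
                    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                        long deadline = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
                        while (!isActive(conn, "MY_INDEX")) {
                            if (System.currentTimeMillis() > deadline) {
                                throw new IllegalStateException("Timed out waiting for ACTIVE index");
                            }
                            Thread.sleep(TimeUnit.SECONDS.toMillis(10));
                        }
                    }
                }

                private static boolean isActive(Connection conn, String indexName) throws Exception {
                    String sql = "SELECT INDEX_STATE FROM SYSTEM.CATALOG "
                               + "WHERE TABLE_NAME = ? AND INDEX_STATE IS NOT NULL";
                    try (PreparedStatement ps = conn.prepareStatement(sql)) {
                        ps.setString(1, indexName);
                        try (ResultSet rs = ps.executeQuery()) {
                            return rs.next() && "a".equals(rs.getString(1));
                        }
                    }
                }
            }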

        rvaleti Ravi Kishore Valeti added a comment -
        • Yes, TableRecordWriter.close gets called only after all maps are complete.
        • My first take was to do a wait loop until the index state becomes active, but I decided to go the "-runfg" route for the reasons below:
          i) Foreground will stand as an option for users who want to watch the progress online.
          ii) Testing a foreground execution is equivalent to testing a background job; we do not need to test the MR job submission phase.
          iii) Tests do not need extra logic for waiting for the index state to become active.
        • Yes, one thing I observed in IndexToolIT is that none of the tests actually check for data validity; they only check that the index is built. I will talk to Thomas D'Silva and add that check for all IndexToolIT tests.
        jamestaylor James Taylor added a comment -

        +1. Thomas D'Silva - please commit this to master and 4.x branches. If we can perf test this next, in parallel with improving the unit tests, that'd be good.

        Please file the HBase bug to make the member variable you need protected (or have an accessor for it) if you think that would improve things, Ravi Kishore Valeti.

        rvaleti Ravi Kishore Valeti added a comment -

        By the way, the patch I uploaded was on top of the 4.5-HBase-0.98 branch. I hope this will not result in conflicts on the master branch. If it does, I can upload a patch for master.

        rvaleti Ravi Kishore Valeti added a comment -

        James Taylor,
        Looks like TableRecordWriter.close(TaskAttemptContext context) will be called per map task. In my earlier testing I only had one mapper, so I did not realize that. Just confirmed with Maddineni Sukumar on this.

        Thomas D'Silva, please do not check this in. I will upload a new version, modifying the mapper and moving the index state update to the reducer.

        rvaleti Ravi Kishore Valeti added a comment -

        Uploaded v2 - moved the index state change step to the reducer.
        Each map task will just write a single dummy key-value to avoid outputting all records to the reducer.

        jamestaylor James Taylor added a comment -

        If we don't write the dummy key value in the mapper.cleanup() method, what happens? Will the reducer run over all the KeyValues we generated during the map phase? Does writing that dummy key value prevent this? Seems somewhat weird, but I'm +1 if it works (and is necessary).

        Thomas D'Silva - would you mind reviewing too?

        rvaleti Ravi Kishore Valeti added a comment -

        Example: we have a single map task with 1000 records.

        If we do context.write() in the map() method, the number of input records for the reducer = 1000.
        If we do context.write() in the cleanup() method (which is called once all records are processed by map()), writing some dummy key-value, then the number of input records to reduce = number of map tasks = 1.

        I tested the cases below (an illustrative mapper sketch follows the results):

        • context.write(key,value) in map() method
          Result: map input records = 1000
          map output records = 1000
          Reducer input records = 1000
        • context.write(key,value) in cleanup() method
          Result: map input records = 1000
          map output records = 1
          Reducer input records = 1
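        An illustrative mapper sketch of the cleanup()-only write (simplified key/value types and class name; the real mapper in the patch writes index rows to HBase and uses Phoenix writables):

            import java.io.IOException;
            import org.apache.hadoop.io.LongWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Mapper;

            public class DirectImportMapperSketch extends Mapper<LongWritable, Text, Text, Text> {

                @Override
                protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                    // Build the index row and write it to HBase here (batched); nothing is
                    // emitted to the reducer per input record.
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                    // Emit a single dummy record per map task, so the reducer sees one input
                    // record per mapper instead of one per table row.
                    context.write(new Text("done"), new Text(""));
                }
            }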
        jamestaylor James Taylor added a comment -

        I see. LGTM. Nice work, Ravi Kishore Valeti. Let's get this committed and then perf test it.

        tdsilva Thomas D'Silva added a comment -

        +1 I will get this committed.

        hudson Hudson added a comment -

        SUCCESS: Integrated in Phoenix-master #883 (See https://builds.apache.org/job/Phoenix-master/883/)
        PHOENIX-2154 Failure of one mapper should not affect other mappers in MR index build (Ravi Kishore Valeti) (tdsilva: rev 16fcdf9e1c116758027b79a24f9ec701cb63496f)

        • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
        • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
        • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
        • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
        • phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
        • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
        rvaleti Ravi Kishore Valeti added a comment -

        Direct HBase APIs   24   84   1,450   (minutes; same columns as the perf table above)

        jamestaylor James Taylor added a comment -

        Thanks, Ravi Kishore Valeti. I created PHOENIX-2292 for further performance work. I think this JIRA can be closed.


          People

          • Assignee:
            rvaleti Ravi Kishore Valeti
            Reporter:
            jamestaylor James Taylor
          • Votes:
            0
            Watchers:
            12

            Dates

            • Created:
              Updated:
              Resolved:

              Development