Bookkeeper / BOOKKEEPER-256

Update remote component sequence IDs only for messages from the source region

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 4.0.0, 4.1.0
    • Fix Version/s: None
    • Component/s: hedwig-server
    • Labels: None

      Description

      In the current setup, remote components for a message being persisted don't give any meaningful information about the number of messages the hub has received for that topic from other regions. The provided patch fixes this by updating the remote component of the locally persisted message for a region X only if the message received by the RegionManager originates from region X.
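
      To make the intended behaviour concrete, here is a minimal sketch in Java. It is not the patch itself: remote components are modelled as a plain Map<String, Long> instead of Hedwig's MessageSeqId protobuf, and the names RemoteComponentsSketch, mergeAll and mergeSourceOnly are hypothetical. mergeAll takes the per-region maximum over every component, while mergeSourceOnly advances only the component of the region the message came from.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; the real code operates on MessageSeqId protobufs.
final class RemoteComponentsSketch {

    // Per-region maximum over all components of both vectors.
    static Map<String, Long> mergeAll(Map<String, Long> local, Map<String, Long> incoming) {
        Map<String, Long> merged = new HashMap<>(local);
        for (Map.Entry<String, Long> e : incoming.entrySet()) {
            merged.merge(e.getKey(), e.getValue(), Long::max);
        }
        return merged;
    }

    // Only the component for srcRegion is advanced; all others are kept as they are.
    static Map<String, Long> mergeSourceOnly(Map<String, Long> local,
                                             Map<String, Long> incoming,
                                             String srcRegion) {
        Map<String, Long> merged = new HashMap<>(local);
        Long fromSource = incoming.get(srcRegion);
        if (fromSource != null) {
            merged.merge(srcRegion, fromSource, Long::max);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("A", 3L);
        Map<String, Long> incoming = new HashMap<>();
        incoming.put("A", 5L);
        incoming.put("B", 7L);
        // The message originates from region B, so only B's component should move.
        System.out.println(mergeAll(local, incoming));
        System.out.println(mergeSourceOnly(local, incoming, "B"));
    }
}
```

      In this example the source-only merge leaves A at 3 even though the incoming message claims A is at 5, because that value was only relayed by region B and was not observed from region A directly.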

      Edit - You can take a look at the discussion at http://mail-archives.apache.org/mod_mbox/zookeeper-bookkeeper-dev/201205.mbox/%3cCAOLhyDTEm5=p8eMD8XmVCY_6ktB40RQx6dWWY50ARbAEbdgtsQ@mail.gmail.com%3e for context.

      Review board link : https://reviews.apache.org/r/6789/

      1. BK-256.patch
        6 kB
        Aniruddha
      2. BK-256-v1.patch
        6 kB
        Aniruddha
      3. BK-256-v2.patch
        6 kB
        Aniruddha
      4. BK-256.patch
        22 kB
        Aniruddha
      5. BOOKKEEPER-256.pdf
        70 kB
        Sijie Guo

        Activity

        Hyun Goo Kang added a comment -

        Couple comments:

        • In takeRegionSpecificMaximum(), it looks like we would take a component from id2 regardless of its region, if its corresponding component does not exist in id1. Is that what you want?
        • This is out of scope, but I guess we could update the message's seq-id after you merge the two seq-ids in LocalDBPersistenceManager::persistMessage.
        • This is out of scope again, but it looks like the new message builder that merges two ids is never used (RegionManager::doRemoteSubscribe, line 214).
        • It looks like your one-line fix in RegionManager.java would be sufficient for this issue. The existing 'takeRegionMaximum' should work just fine with the new sequence-ids that contain only the srcRegion component. Could you also add a comment to the line you changed in RegionManager.java to clarify why we would want to propagate only the srcRegion component?
        • 'takeRegionMaximum' doesn't seem to be used anywhere else. We could remove it if we decide to keep the 'takeRegionSpecificMaximum' method.
        Sijie Guo added a comment - - edited

        Just some comments:

        1) It seems that the line you changed in RegionManager is not used. So my question is: how do we propagate the seq id from the source region to the remote components list? takeRegionMaximum() might not work correctly. If I missed anything, please correct me.

        2) You just changed the behavior of takeRegionMaximum so that it takes the maximum id only for the source region, right?

        +    public static void takeRegionSpecificMaximum(MessageSeqId.Builder newIdBuilder, MessageSeqId id1, MessageSeqId id2, ByteString srcRegion) {
        +        Map<ByteString, RegionSpecificSeqId> id2Map = MessageIdUtils.inMapForm(id2);
        +        for (RegionSpecificSeqId rssid1 : id1.getRemoteComponentsList()) {
        +            if(rssid1.getRegion().equals(srcRegion)) {
        +                RegionSpecificSeqId rssid2 = id2Map.get(srcRegion);
        +                if (rssid2 != null) {
        +                    newIdBuilder.addRemoteComponents((rssid1.getSeqId() > rssid2.getSeqId() ? rssid1 : rssid2));
        +                } else {
        +                    newIdBuilder.addRemoteComponents(rssid1);
        +                }
        +            } else {
        +                newIdBuilder.addRemoteComponents(rssid1);
        +            }
        +            id2Map.remove(rssid1.getRegion());
        +        }
        +        for (RegionSpecificSeqId rssid2 : id2Map.values()) {
        +            if (rssid2.getRegion().equals(srcRegion)) {
        +                newIdBuilder.addRemoteComponents(rssid2);
        +            }
        +        }
        +    }
        

        The patch indicates that id2 is the srcRegion seq id. If so, we don't need to look up the region-specific component for the source region in id2. We could improve it as below, which would also fix the possible issue in my comment 1).

        Besides that, the parameter names of takeRegionSpecificMaximum could be improved, e.g. takeRegionSpecificMaximum(newBuilder, lastPushSeqId, srcRegionSeqId, srcRegion), to make it clearer. Also, if takeRegionMaximum is replaced by takeRegionSpecificMaximum, we could remove takeRegionMaximum.

        +        for (RegionSpecificSeqId rssid1 : id1.getRemoteComponentsList()) {
        +            if(rssid1.getRegion().equals(srcRegion)) {
                         RegionSpecificSeqId newRssid = rssid1;
                         if (rssid1.getSeqId() < id2.getLocalComponent()) {
                             newRssid = ... // build region specific seq id here.
                         }
                         newIdBuilder.addRemoteComponents(newRssid);
        +            } else {
        +                newIdBuilder.addRemoteComponents(rssid1);
        +            }
        +            id2Map.remove(rssid1.getRegion());
        +        }
        

        BTW, thanks for changing the order of the two seq ids in LocalDBPersistenceManager; it looks like that was a potential bug, right?

        Aniruddha added a comment -

        I have updated the patch to address Hyun's comments.
        1) Yes, seems like I had attached the wrong patch file. This is fixed.
        2) This should probably go in a new ticket.
        3) Changed this, I was under the impression that Message.newBuilder modified the argument.
        4) Done.
        5) Agreed.

        Aniruddha added a comment - - edited

        @Sijie, the code handles the condition when the message might not have its local component in sync with the original message. The RegionManager builds the message passed on to the persistence manager. While doing this, I believe it resets the message ID.

        The order of the Message sequence IDs would not matter if we used takeRegionMaximum, but it would matter when used with the new code.

        Also, we can remove takeRegionMaximum if there are no plans to use it somewhere else.

        Sijie Guo added a comment -

        @Aniruddha, thanks for the clarification.

        > The order of the Message sequence IDs would not matter if we used takeRegionMaximum, but it would matter when used with the new code.

        Yeah, you're right.

        How about the proposal in my previous comment? We remove the line in the region manager and improve takeRegionSpecificMaximum as I suggested.

        With your new patch, the message would be built twice: once in the region manager, then again in the persistence manager. With what I suggested, we don't need to build a new message in the region manager just to propagate the seq id to the remote components list. We could avoid that by building it in takeRegionSpecificMaximum, right?

        What do you think?

        Aniruddha added a comment -

        Sijie, I've updated the patch to incorporate your suggestions. Please take a look.

        Sijie Guo added a comment -

        Thanks, Aniruddha.

        The patch looks good. Just one comment about formatting: it would be better to fold long lines into two or more shorter lines.

        Besides that, I would like to ask Ivan to take a look at this patch, since he was involved in the discussion.

        Uma Maheswara Rao G added a comment -

        @Aniruddha, please take a look at this info. It will help you format the patch correctly as per the BK guidelines.

        Aniruddha added a comment -

        Shortened long lines.

        Sijie Guo added a comment -

        lgtm +1

        Ivan Kelly added a comment -

        The patch looks correct. It would be good to have a test case or two to verify that it is though. Could you add some checks in TestHedwigRegion?

        Ivan Kelly added a comment -

        Testing is a bit awkward for this, but what you could do is use something like jOOR to get access to PubSubServer.rm.clients and then, for each client, access the topicSubscriber2Channel in the subscribers to get at the channels, which can then be set to read only.

        This would effectively partition the PubSubServer from the other hubs in the test system.
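
        As a rough illustration of the field access described above, the sketch below reads nested private fields with plain java.lang.reflect, which is the mechanism that helpers such as jOOR wrap. FakeServer and FakeRegionManager are hypothetical stand-ins that only mimic a private rm field holding a private clients list; they are not the real PubSubServer or RegionManager, and making the channels read only is not shown.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PrivateFieldAccessSketch {

    // Reads a private field by name; reflection failures are rethrown unchecked.
    static Object readField(Object target, String name) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true);
            return f.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot read field " + name, e);
        }
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        // Walk server -> rm -> clients, the same path as in the suggestion.
        Object rm = readField(server, "rm");
        Object clients = readField(rm, "clients");
        System.out.println(clients);
    }
}

// Stand-in for the region manager; only the private-field layout matters here.
final class FakeRegionManager {
    private final List<String> clients = new ArrayList<>(Arrays.asList("hub-B", "hub-C"));
}

// Stand-in for the server that owns the region manager.
final class FakeServer {
    private final FakeRegionManager rm = new FakeRegionManager();
}
```

        A test would use the objects obtained this way to cut the hub off from its peers before publishing, and restore them afterwards.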

        Aniruddha added a comment -

        Thanks for reviewing. I'll take a look at what Ivan suggested to come up with unit tests.

        Flavio Junqueira added a comment -

        There hasn't been recent activity in this jira, so I propose to move it to 4.2.0. Is it ok?

        Aniruddha added a comment -

        Yes, moving to 4.2.0 is fine. Will update this Jira soon.

        Aniruddha added a comment -

        Sorry for the delay.

        Aniruddha added a comment -

        Could someone take a look at this?

        Sijie Guo added a comment -

        I will try to follow up today. Just one suggestion: when a patch is ready for review, could you also attach it to this jira and change the status to Patch Available? Reviewers sometimes miss the activity on a jira, but when they look through all the Patch Available jiras, they will find your patch to review.

        Sijie Guo added a comment -

        Canceled the patch until the comments on Review Board are addressed.

        Ivan Kelly added a comment -

        This patch looks like it's ready to go in, except for one question about one comment on Review Board. Aniruddha, could you take a look at the question, and then we could get this pushed in? (It may also need a rebase.)

        Tests look good btw

        Flavio Junqueira added a comment -

        Hi guys, What's the status here? Are we fixing this one for 4.2.0?

        Flavio Junqueira added a comment -

        Aniruddha, could you please have a look at Ivan's comment so that we can have this patch in for 4.2.0? Thanks.

        Yixue (Andrew) Zhu added a comment -

        Maybe I am missing something, but I do not think the protocol works. Consider the following sequence:

        1. A adds a message; let us denote it as [A1]. The state after [A1] has reached B, but not yet C:
             A  B  C
        A    1
        B    1  1
        C

        2. B adds a message; let us denote it as [A1, B2]. The state after [A1, B2] has reached C:
             A  B  C
        A    1
        B    1  2
        C       2  1

        [A1, B2] is persisted as (B2, C1)

        3. [A1] reaches C:
             A  B  C
        A    1
        B    1  2
        C    1  2  2

        [A1] is persisted as (A1, B2, C2)

        Now, when a client in region C reads from the topic, how does it interpret the order between [A1] and [A1, B2], since the former is persisted as (B2, C1), while the latter is persisted as (A1, B2, C2)?

        The problem can be addressed if we separate the region clock/version vector state from message persistence.

        Yixue (Andrew) Zhu added a comment -

        Correction -
        [A1] is persisted as (A1, B2, C1), while
        [A1, B2] is persisted as (B2, C2).

        The order seems reversed now.

        Sijie Guo added a comment -

        1. A publishes A1. A1 arrives at B, but is delayed at C.

        A: [A: 1, B: 0, C: 0]
        B: [A: 1, B: 0, C: 0]
        C: [A: 0, B: 0, C: 0]

        2. B publishes B1. B1 carries remote seq ids: [A: 1, B: 1, C: 0]. The last seq ids of A and C would be updated as:

        A: [A: 1, B: 1, C: 0]
        C: [A: 0, B: 1, C: 0]

        3. A1 arrives at C. C's last seq id would be updated as:

        C: [A: 1, B: 1, C: 0]

        C's subscriber would receive B1 as [A: 0, B: 1, C: 0] (localseqid=1) and A1 as [A: 1, B: 1, C: 0] (localseqid=2).

        The subscriber could be notified that B1 is the first message from region B, and A1 is the first message from region A. I think it works correctly.

        Yixue (Andrew) Zhu added a comment -

        setMsgId() is only called in the persistence code path. For local publishes, it looks like A:1 is not present in the vector. That makes sense, since srcRegion + localseqId cover it.

        Here is my understanding of the sequence and state:
        1. A publishes A1. A1 arrives at B, but is delayed at C.
        A: [A: 1, B: 0, C: 0]
        B: [A: 1, B: 1, C: 0]
        C: [A: 0, B: 0, C: 0]

        Note the B: 1 entry in B. From this point on, the state is as described in my drawing earlier.
        I will verify it on Monday.

        Sijie Guo added a comment -

        You are right. I reorganized the example into a document describing the difference between the original protocol and the protocol with this patch applied. I think the fix here is not reasonable. We might need a separate jira that uses a standard vector clock algorithm to improve it.
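
        For readers unfamiliar with the term, the sketch below shows the textbook vector clock rules in Java: merge by component-wise maximum, and compare two clocks as before, after, equal or concurrent. It is only an illustration of the general algorithm, not a design for Hedwig; the class name VectorClockSketch, the use of Map<String, Long> and the treatment of missing regions as 0 are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: regions are strings, counters are longs, missing entries count as 0.
final class VectorClockSketch {

    enum Order { BEFORE, AFTER, EQUAL, CONCURRENT }

    // Component-wise maximum of two clocks.
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> merged = new HashMap<>(a);
        b.forEach((region, seq) -> merged.merge(region, seq, Long::max));
        return merged;
    }

    // a is BEFORE b when no component of a exceeds b and at least one is smaller.
    static Order compare(Map<String, Long> a, Map<String, Long> b) {
        Set<String> regions = new HashSet<>(a.keySet());
        regions.addAll(b.keySet());
        boolean aSmaller = false;
        boolean bSmaller = false;
        for (String region : regions) {
            long av = a.getOrDefault(region, 0L);
            long bv = b.getOrDefault(region, 0L);
            if (av < bv) aSmaller = true;
            if (bv < av) bSmaller = true;
        }
        if (aSmaller && bSmaller) return Order.CONCURRENT;
        if (aSmaller) return Order.BEFORE;
        if (bSmaller) return Order.AFTER;
        return Order.EQUAL;
    }

    public static void main(String[] args) {
        Map<String, Long> a1 = new HashMap<>();
        a1.put("A", 1L);
        Map<String, Long> b1 = new HashMap<>(a1); // published at B after seeing A1
        b1.put("B", 1L);
        Map<String, Long> c1 = new HashMap<>();   // published at C without seeing A1
        c1.put("C", 1L);
        System.out.println(compare(a1, b1)); // BEFORE
        System.out.println(compare(a1, c1)); // CONCURRENT
    }
}
```

        With such a comparison, a subscriber can tell that a message published at B after A1 was received is causally later than A1, regardless of the order in which the two messages reach a third region.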

        Ivan Kelly added a comment -

        I'm moving this to 4.3.0 as there's no urgent need for this in 4.2.0.

        Sijie Guo added a comment -

        Removed from 4.3.0, as there is no urgent need for it in 4.3.0.


          People

          • Assignee: Aniruddha
          • Reporter: Aniruddha
          • Votes: 0
          • Watchers: 7
