Details

      Description

      Currently BookKeeperAdmin copies the entries in parallel.

      This may create more load on the servers. To avoid that, we can refactor the BKAdmin code to copy the entries sequentially.
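      As a rough illustration of the idea (the EntryCopier and CopyCallback types below are hypothetical, not BookKeeper API): copying sequentially means the next entry copy is started only from the completion callback of the previous one, so at most one copy is in flight at a time.

      // Hypothetical sketch, not BookKeeper API: copy a range of entries one at a
      // time, starting the next copy only from the completion callback of the
      // previous one instead of issuing all copies in parallel.
      interface CopyCallback {
          void copyComplete(int returnCode, long entryId);
      }

      interface EntryCopier {
          // Hypothetical async primitive: copy a single entry and invoke cb when done.
          void copyEntryAsync(long entryId, CopyCallback cb);
      }

      class SequentialCopier {
          static final int OK = 0;

          // Copy entries [firstEntryId, lastEntryId] sequentially.
          static void copySequentially(final EntryCopier copier, final long firstEntryId,
                                       final long lastEntryId, final CopyCallback finished) {
              if (firstEntryId > lastEntryId) {
                  finished.copyComplete(OK, lastEntryId);
                  return;
              }
              copier.copyEntryAsync(firstEntryId, new CopyCallback() {
                  public void copyComplete(int rc, long entryId) {
                      if (rc != OK) {
                          finished.copyComplete(rc, entryId);   // abort on the first failure
                      } else {
                          // Only now start the next entry, so at most one copy is in flight.
                          copySequentially(copier, firstEntryId + 1, lastEntryId, finished);
                      }
                  }
              });
          }
      }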

      Attachments

      1. BOOKKEEPER-315.patch (22 kB), Uma Maheswara Rao G
      2. BOOKKEEPER-315.patch (27 kB), Uma Maheswara Rao G
      3. BOOKKEEPER-315.patch (27 kB), Uma Maheswara Rao G
      4. BOOKKEEPER-315.patch (29 kB), Uma Maheswara Rao G

        Issue Links

          Activity

          Hudson added a comment -

          Integrated in bookkeeper-trunk #742 (See https://builds.apache.org/job/bookkeeper-trunk/742/)
          BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank) (Revision 1394542)

          Result = UNSTABLE
          ivank :
          Files :

          • /zookeeper/bookkeeper/trunk/CHANGES.txt
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
          • /zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
          Ivan Kelly added a comment -

          Committed as r1394542. Good work, Uma.

          Uma Maheswara Rao G added a comment -

          Thanks a lot, Ivan, for the review!
          I have attached a patch that addresses your comments.

          Ivan Kelly added a comment -

          Sorry about taking so long to look at this.

          One clarification: are you suggesting something like the following?

          private void replicateBatch(LedgerHandle lh, LedgerFragment lf,
                      InetSocketAddress targetBookieAddress) throws InterruptedException {
                  final SyncCounter syncCounter = new SyncCounter();
                  ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
                  syncCounter.inc();
                  replicate(lh, lf, resultCallBack, targetBookieAddress);
                  syncCounter.block(0);
              }
          

          Are replicateBatch and replicate such that one will be a sync call and the other async?

          Or are you suggesting that replicateBatch itself can be async?

          In fact, it would be better to have a LedgerFragmentReplicator#replicateNextBatch that works on an iterator of fragments, rather than replicateBatch.

          There's no need for a sync #replicate method, though the current #replicate implementation should be moved to #replicateFragmentInternal, and the other methods should look something like the following.

          // Splits the fragment into batch-sized sub-fragments and kicks off
          // replication of the first one; the rest are driven from the callback.
          void replicate(final LedgerHandle lh, final LedgerFragment ledgerFragment,
                         final VoidCallback ledgerFragmentCb, final InetSocketAddress addr) {
              Set<LedgerFragment> fragments = splitIntoSubFragment(ledgerFragment, lh, batchSize);
              replicateNextBatch(lh, fragments.iterator(), ledgerFragmentCb, addr);
          }

          // Replicates one sub-fragment at a time; the next one is started only
          // from the completion callback of the current one.
          void replicateNextBatch(final LedgerHandle lh, final Iterator<LedgerFragment> fragments,
                                  final VoidCallback finishedCallback, final InetSocketAddress addr) {
              if (fragments.hasNext()) {
                  replicateFragmentInternal(lh, fragments.next(), addr,
                       new VoidCallback() {
                           public void operationComplete(int rc, Void v, Object ctx) {
                               if (rc != OK) {
                                   finishedCallback.operationComplete(rc, null, null);
                               } else {
                                   replicateNextBatch(lh, fragments, finishedCallback, addr);
                               }
                           }
                       });
              } else {
                  finishedCallback.operationComplete(OK, null, null);
              }
          }

          private void replicateFragmentInternal(LedgerHandle lh, LedgerFragment fragment,
                                                 InetSocketAddress addr, VoidCallback cb) {
              ....
          }
          

          Also, the order of the parameters in splitIntoSubFragment is inconsistent with replicate(); they should be the same.

          Uma Maheswara Rao G added a comment -

          Anyway, I addressed your comments; please take a look at the new patch.
          Thanks for the reviews!

          Uma Maheswara Rao G added a comment -

          Thanks a lot, Ivan, for the comments.

          One clarification: are you suggesting something like the following?

          private void replicateBatch(LedgerHandle lh, LedgerFragment lf,
                      InetSocketAddress targetBookieAddress) throws InterruptedException {
                  final SyncCounter syncCounter = new SyncCounter();
                  ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
                  syncCounter.inc();
                  replicate(lh, lf, resultCallBack, targetBookieAddress);
                  syncCounter.block(0);
              }
          

          Are replicateBatch and replicate such that one will be a sync call and the other async?

          Or are you suggesting that replicateBatch itself can be async?

          Could you please clarify this?

          Thanks,
          Uma

          Ivan Kelly added a comment -

          Hi Uma,
          In ReplicationWorker, the setEntries... call is redundant, as the value should already have been copied by the constructor on the first line.

          +        ClientConfiguration clientConf = new ClientConfiguration(conf);
          +        clientConf.setEntriesPerSubFragmentInRereplication(this.conf
          +                .getEntriesPerSubFragmentInRereplication());
          
          ENTRIES_PER_SUB_FRAGMENT_IN_REREPLICATION = "entriesPerSubFragmentInRereplication";

          is too long/wordy.

          REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize"

          is clearer. Also, we don't want to expose the concept of fragments/sub-fragments to users; the fewer high-level concepts we expose, the easier it is for them to reason about what BookKeeper is doing.

          Also, there's an inconsistency as to when splitting occurs. Splitting should happen for every fragment above the batch size. So the only entry point to LedgerFragmentReplicator should be #replicate, and it should do both the splitting and the replicating. You can pretty much do this by renaming #splitAndReplicateFragment to #replicate, and #replicate to #replicateBatch; #replicate should still be an async method.
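          For illustration only, accessors for such a setting might look roughly like the following sketch (the property name, the default value of 10, and the use of a Commons Configuration base class as a stand-in for AbstractConfiguration are assumptions, not the committed API):

          import org.apache.commons.configuration.CompositeConfiguration;

          // Sketch only: possible accessors for the rereplication batch size. The actual
          // property name, default value and class placement in BookKeeper may differ.
          class RereplicationConfSketch extends CompositeConfiguration {
              protected static final String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";

              /** Number of entries to rereplicate in one batch (assumed default: 10). */
              public long getRereplicationEntryBatchSize() {
                  return getLong(REREPLICATION_ENTRY_BATCH_SIZE, 10L);
              }

              public void setRereplicationEntryBatchSize(long batchSize) {
                  setProperty(REREPLICATION_ENTRY_BATCH_SIZE, batchSize);
              }
          }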

          Uma Maheswara Rao G added a comment -

          Hi Ivan, could you please take a look at this latest patch?

          Uma Maheswara Rao G added a comment -

          Hi Ivan,

          + long numberOfEntriesToReplicate = lastEntryId - firstEntryId;
          should be (lastEntryId - firstEntryId) + 1

          You are right; the entries at both ends have to be counted. Thanks a lot for noticing it.

          In general, I think this splitting should be in LedgerFragmentReplicator, rather than BookKeeperAdmin. This way you could avoid the need to expose the update of ledger metadata. It will make control flow a little more complex as you'll have to initiate replication on the next subfragment from the callback of the current, but I think it would encapsulate things a bit better.

          Yes, in general all the split logic should be in LFR. I had pushed that code into BKAdmin because I didn't want to disturb the admin's key flows, which go through LFR, too much.
          Anyway, I have now moved that logic without disturbing the BKAdmin key flows in any way.

          Attached a patch, which addresses the comments.

          Also, to give more confidence in the split-fragment scenarios, I have added multiple boundary cases for FragmentSplit.

          Thanks a lot, Ivan, for your nice reviews!

          Ivan Kelly added a comment -

          splitToSubFragments is incorrect.
          + long numberOfEntriesToReplicate = lastEntryId - firstEntryId;
          should be (lastEntryId - firstEntryId) + 1

          There's a typo in maxAllowedEntriesInLogicalFrgament, and the name is too long anyhow; entriesPerSubFragment is better.

          getMaxAllowedEtriesPerFragmentInRereplication has a typo and is too long. Change it to getReplicationBatchSize().

          In general, I think this splitting should be in LedgerFragmentReplicator, rather than BookKeeperAdmin. This way you could avoid the need to expose the update of ledger metadata. It will make control flow a little more complex as you'll have to initiate replication on the next subfragment from the callback of the current, but I think it would encapsulate things a bit better.
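
          To make the off-by-one concrete, here is a standalone sketch of the split calculation using the inclusive count (a hypothetical helper for illustration, not the actual splitToSubFragments):

          import java.util.ArrayList;
          import java.util.List;

          // Standalone sketch: split an inclusive entry range [firstEntryId, lastEntryId]
          // into batches of at most batchSize entries. Shown only to illustrate the
          // (lastEntryId - firstEntryId) + 1 count; not the real splitToSubFragments.
          class RangeSplitSketch {
              static List<long[]> split(long firstEntryId, long lastEntryId, long batchSize) {
                  List<long[]> ranges = new ArrayList<long[]>();
                  long numberOfEntriesToReplicate = (lastEntryId - firstEntryId) + 1; // inclusive on both ends
                  long splits = (numberOfEntriesToReplicate + batchSize - 1) / batchSize;
                  for (long i = 0; i < splits; i++) {
                      long first = firstEntryId + i * batchSize;
                      long last = Math.min(lastEntryId, first + batchSize - 1);
                      ranges.add(new long[] { first, last });
                  }
                  return ranges;
              }
          }

          For example, split(0, 9, 4) yields the ranges [0, 3], [4, 7] and [8, 9].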

          Uma Maheswara Rao G added a comment -

          Attached a patch with the batching approach, without modifying the older BKAdmin functionality much.
          I also did a whole refactoring, but it turned into a huge change. So I felt that should not be the target now, since we are mainly concerned with the auto-replication part, which runs automatically and should not put extra load on other servers due to replication work.

          Also added a config item maxAllowedEntriesPerFragmentInRereplication (feel free to suggest a better name).
          Added 2 tests to check that these changes do not affect any functionality.

          Please take a look and give me your opinion on this.

          Thanks
          Uma

          Uma Maheswara Rao G added a comment -

          I will make this work for the auto-replication part.
          What I am thinking is that, as Ivan suggested above, if a ledger fragment has many entries in it, I can partition that fragment into smaller fragments, say each containing 100 (or the configured number of) entries, and then replicate all of those fragments. Once all such fragments are replicated, I update the ensemble info to point to the new bookie. A rough sketch of that flow is below.
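
          For illustration only (splitIntoBatches, replicateRange and updateEnsembleToNewBookie are hypothetical helpers, not existing BookKeeper methods):

          import java.util.List;

          // Rough sketch of the plan: split the fragment into ranges of a configured
          // size, replicate them one after another, and only then point the ensemble
          // at the new bookie. All helper methods here are hypothetical.
          class SequentialFragmentReplicationSketch {
              interface Helper {
                  List<long[]> splitIntoBatches(long firstEntryId, long lastEntryId, long batchSize);
                  void replicateRange(long firstEntryId, long lastEntryId); // blocking copy of one range
                  void updateEnsembleToNewBookie();                         // rewrite ledger metadata
              }

              static void replicateFragment(Helper helper, long firstEntryId, long lastEntryId,
                                            long batchSize) {
                  for (long[] range : helper.splitIntoBatches(firstEntryId, lastEntryId, batchSize)) {
                      helper.replicateRange(range[0], range[1]);  // one sub-fragment at a time
                  }
                  helper.updateEnsembleToNewBookie();             // only after every sub-fragment is copied
              }
          }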

          Uma Maheswara Rao G added a comment -

          I got your concern: even though we are replicating fragment by fragment, a ledger may have a single fragment that contains many millions of entries. Let me take a look at ensuring this at least for the auto-replication part, since it is most important there.

          Ivan Kelly added a comment -

          Currently, when we recover the entries from a bookie, we basically send all the read requests to the other bookies at the same time. While this is fine for small bookies, I think this could fall down for ledgers with many millions of entries. So really, we should only read in batches of limited size.

          Uma Maheswara Rao G added a comment -

          Once we see real overhead on the replication worker nodes, we will start work on this.

          Uma Maheswara Rao G added a comment -

          It seems that, for this JIRA, the required changes are in BookKeeperAdmin and have nothing to do with the auto-recovery code.
          Marking it as a top-level issue.

          Uma Maheswara Rao G added a comment -

          Actually this point is a review comment from Ivan in BOOKKEEPER-299.
          See my analysis comment there. What I am currently thinking is that the callbacks can form a chain from ledger to fragment and from fragment to entry. At the entry stage we can consider copying the entries one by one.

          Flavio Junqueira added a comment -

          Hi Uma, Sequentially as in a chain? Or sequentially meaning one at a time?


            People

            • Assignee: Uma Maheswara Rao G
            • Reporter: Uma Maheswara Rao G
            • Votes: 0
            • Watchers: 6
