Uploaded image for project: 'Bookkeeper'
  1. Bookkeeper
  2. BOOKKEEPER-439

No more messages delivered after deleted consumed ledgers.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 4.1.0, 4.2.0
    • 4.2.0
    • hedwig-server
    • None

    Description

      We encountered exception as below:

      2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
      2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
      org.apache.bookkeeper.client.BKException$BKReadException
      at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
      at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
      at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
      at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
      at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
      at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
      at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
      at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
      at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
      at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
      at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
      at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
      at java.lang.Thread.run(Thread.java:662)

      topic TOPIC has 2 ledgers L1, L2, each ledger has 100 entries.

      1) all the 100 entries in L1 has been delivered and consumed.
      2) 100 entries have been wrote to L2 but not delivered.
      3) L1 is deleted since all its entries have been consumed.
      4) hub server shuts down
      5) TOPIC recovered L2 and started delivering from 101.

      TOPIC was expected to issue a read [0-3] from L2, but a read [100-103] was issued from the exception log, so no entries would be expected to read from L2 at [100-103].

      The problem of this issue is that we used 0 and 1 for the start of message id and ledger id even we had some consumed ledgers deleted.

              void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
                  Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
                  TopicInfo topicInfo = new TopicInfo();
      
                  long startOfLedger = 1;
      
                  while (lrIterator.hasNext()) {
                      LedgerRange range = lrIterator.next();
      
                      if (range.hasEndSeqIdIncluded()) {
                          // this means it was a valid and completely closed ledger
                          long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
                          topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range,           startOfLedger));                             startOfLedger = endOfLedger + 1;
                          continue;
                      }        
      
                      // If it doesn't have a valid end, it must be the last ledger
                      if (lrIterator.hasNext()) {
                          String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.       toStringUtf8()                                            + " is not the last one but still does not have an end seq-id";
                          logger.error(msg);
                          cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
                          return;                }
      
                      // The last ledger does not have a valid seq-id, lets try to
                      // find it out
                      recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
                      return;
                  }
      
                                  long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.   ledgerRanges
                                                       .lastKey();
                                  LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
                                                   .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
                                  topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
                                          new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
      

      Attachments

        1. TopicMetadataAddStartSeqId.patch
          45 kB
          Yixue Zhu
        2. BOOKKEEPER-439.diff
          47 kB
          Sijie Guo
        3. BOOKKEEPER-439.diff
          62 kB
          Sijie Guo
        4. BOOKKEEPER-439.diff
          73 kB
          Sijie Guo
        5. BOOKKEEPER-439.diff
          73 kB
          Sijie Guo

        Issue Links

          Activity

            People

              hustlmsp Sijie Guo
              hustlmsp Sijie Guo
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: