  1. Apache AsterixDB
  2. ASTERIXDB-1917

FLUSH_LSN for disk components is not correctly set when a NC holds multiple partitions

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:
      None

      Description

      When we flush a memory component of an index, we assign an LSN to the resulting disk component. This LSN is that of the last operation that modified the memory component. Thus, for a given index, the FLUSH_LSNs of its flushed disk components should be increasing, i.e., components flushed later get larger LSNs.

      However, I observed a bug in which a disk component flushed later gets a smaller LSN, which breaks this property.
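The ordering property described above can be written as a small standalone check. This is only an illustrative sketch: the class and method names are hypothetical and not part of AsterixDB, and it treats equal LSNs as acceptable, matching the `prevLSN <= lsn` check used in the reproduction further down.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not AsterixDB code: checks FLUSH_LSN ordering for one index.
final class FlushLsnOrderCheck {
    /**
     * Returns the position of the first disk component whose LSN is smaller
     * than its predecessor's, or -1 if the sequence never decreases.
     * The list is assumed to be in flush order (oldest component first).
     */
    static int firstViolation(List<Long> flushLsnsInFlushOrder) {
        long prev = Long.MIN_VALUE;
        for (int i = 0; i < flushLsnsInFlushOrder.size(); i++) {
            long lsn = flushLsnsInFlushOrder.get(i);
            if (lsn < prev) {
                return i;
            }
            prev = lsn;
        }
        return -1;
    }

    public static void main(String[] args) {
        // A well-ordered sequence of FLUSH_LSNs.
        System.out.println(firstViolation(Arrays.asList(1L, 2L, 3L)));
        // A sequence where a later component has a smaller LSN.
        System.out.println(firstViolation(Arrays.asList(3L, 2L)));
    }
}
```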

      A brief explanation of this bug is as follows. Suppose we have one dataset D with two partitions on one NC, and D has only a primary index P. Let P1 and P2 be the partitioned indexes of P for the two partitions on this NC. P1 and P2 then share the same PrimaryIndexOperationTracker, and they are always flushed together.

      The LSN for flushed disk components is maintained by ILSMIOOperationCallback. Now suppose an index has two memory components. ILSMIOOperationCallback maintains an array mutableLastLSNs of length 2 to track the FLUSH_LSN of each of the two memory components. Before scheduling each flush operation, PrimaryIndexOperationTracker calls ILSMIOOperationCallback to set the FLUSH_LSN.

      Now consider the following scenario (which does happen, but very rarely). Initially,
      P1.mutableLastLSNs=[0,0]
      P2.mutableLastLSNs=[0,0]

      Suppose dataset D needs to be flushed, and mutableLastLSNs is set as follows:
      P1.mutableLastLSNs=[1, 0]
      P2.mutableLastLSNs=[1, 0]

      Then, suppose the flush operation of P2 is fast and produces a disk component P2.d1 (P2.d1.LSN = 1). Data continues to arrive in P2, and it needs to be flushed again, while P1 is still flushing its first memory component. Then mutableLastLSNs becomes:
      P1.mutableLastLSNs=[1, 2]
      P2.mutableLastLSNs=[1, 2].

      Surprisingly, the flush operation of P2 is again fast and produces a disk component P2.d2 (P2.d2.LSN = 2). Data still continues to arrive in P2, and it needs to be flushed again, but P1 is still flushing its first memory component. Then mutableLastLSNs becomes:
      P1.mutableLastLSNs=[3, 2]
      P2.mutableLastLSNs=[3, 2].

      At this point, P1 finishes its first flush operation and produces the disk component P1.d1 (P1.d1.LSN = 3). This is incorrect, since P1.d1.LSN should be 1, not 3. Its original value was overwritten by the flush request of P2!
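The interleaving above can be replayed deterministically in a few lines. The sketch below is a simplified model, not the AsterixDB implementation: `IndexCallback`, `requestFlush`, `writeIndex` and `completeFlush` are hypothetical names, and the only behavior assumed is what the scenario describes, namely that every flush request writes the new LSN into the next slot of mutableLastLSNs for both P1 and P2, and that an index reads its slot only when its own flush finishes.

```java
// Simplified, hypothetical model of the scenario; not AsterixDB code.
final class SharedTrackerLsnDemo {
    /** Stand-in for the per-index state: two LSN slots, one per memory component. */
    static final class IndexCallback {
        final long[] mutableLastLSNs = new long[2];
        int writeIndex = 0; // slot written by the next flush request
        int readIndex = 0;  // slot read when the next flush completes

        void setFlushLsn(long lsn) {
            mutableLastLSNs[writeIndex] = lsn;
            writeIndex = (writeIndex + 1) % mutableLastLSNs.length;
        }

        /** Returns the LSN assigned to the disk component produced by this flush. */
        long completeFlush() {
            long lsn = mutableLastLSNs[readIndex];
            readIndex = (readIndex + 1) % mutableLastLSNs.length;
            return lsn;
        }
    }

    /** The shared tracker sets the flush LSN on every index of the dataset. */
    static void requestFlush(long lsn, IndexCallback... indexes) {
        for (IndexCallback index : indexes) {
            index.setFlushLsn(lsn);
        }
    }

    /** Replays the scenario; returns { P1.d1.LSN, P2.d1.LSN, P2.d2.LSN }. */
    static long[] simulate() {
        IndexCallback p1 = new IndexCallback();
        IndexCallback p2 = new IndexCallback();

        requestFlush(1, p1, p2);          // both become [1, 0]
        long p2d1 = p2.completeFlush();   // P2 is fast: P2.d1.LSN = 1
        requestFlush(2, p1, p2);          // both become [1, 2]
        long p2d2 = p2.completeFlush();   // P2 is fast again: P2.d2.LSN = 2
        requestFlush(3, p1, p2);          // both become [3, 2]; P1's slot 0 is overwritten
        long p1d1 = p1.completeFlush();   // P1 finally finishes its first flush

        return new long[] { p1d1, p2d1, p2d2 };
    }

    public static void main(String[] args) {
        long[] lsns = simulate();
        System.out.println("P1.d1.LSN = " + lsns[0]);
        System.out.println("P2.d1.LSN = " + lsns[1]);
        System.out.println("P2.d2.LSN = " + lsns[2]);
    }
}
```

In this model P1.d1 ends up with LSN 3 rather than 1, because slot 0 was reused by the third flush request before P1 had read it.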

      To reproduce this bug, one needs to change the codebase slightly (specifically, LSMBTreeIOOperationCallback). I added a member variable

       private volatile long prevLSN = 0;
      

      and added one check in the getComponentLSN method:

          @Override
          public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
              if (diskComponents == null) {
                  // Implies a flush IO operation. --> moves the flush pointer
                  // Flush operation of an LSM index are executed sequentially.
                  synchronized (this) {
                      long lsn = mutableLastLSNs[readIndex];
                      if (!(prevLSN <= lsn)) {
                          throw new IllegalStateException();
                      }
                      prevLSN = lsn;
                      return lsn;
                  }
              }
              // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
              long maxLSN = -1L;
              for (ILSMComponent c : diskComponents) {
                  BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
                  maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btree), maxLSN);
              }
              return maxLSN;
          }
      

      Then, after starting AsterixDB using AsterixHyracksIntegrationUtil, you can ingest the data and reproduce the bug using the following queries (you need to replace path_to_sample_data with the path to the attached sample data file):

      drop dataverse twitter if exists;
      create dataverse twitter if not exists;
      use dataverse twitter
      create type typeUser if not exists as open {
          id: int64,
          name: string,
          screen_name : string,
          lang : string,
          location: string,
          create_at: date,
          description: string,
          followers_count: int32,
          friends_count: int32,
          statues_count: int64
      }
      create type typePlace if not exists as open{
          country : string,
          country_code : string,
          full_name : string,
          id : string,
          name : string,
          place_type : string,
          bounding_box : rectangle
      }
      create type typeGeoTag if not exists as open {
          stateID: int32,
          stateName: string,
          countyID: int32,
          countyName: string,
          cityID: int32?,
          cityName: string?
      }
      create type typeTweet if not exists as open{
          create_at : datetime,
          id: int64,
          "text": string,
          in_reply_to_status : int64,
          in_reply_to_user : int64,
          favorite_count : int64,
          coordinate: point?,
          retweet_count : int64,
          lang : string,
          is_retweet: boolean,
          hashtags : {{ string }} ?,
          user_mentions : {{ int64 }} ? ,
          user : typeUser,
          place : typePlace?,
          geo_tag: typeGeoTag
      }
      create dataset ds_tweet(typeTweet) if not exists primary key id 
      using compaction policy correlated-prefix (("max-mergable-component-size"="134217728"),("max-tolerance-component-count"="5")) with filter on create_at ;
      // with filter on create_at;
      //"using" "compaction" "policy" CompactionPolicy ( Configuration )? )?
      
      create feed TweetFeed using localfs
      (
          ("path"="localhost:///path_to_sample_data"),
          ("address-type"="nc"),
          ("type-name"="typeTweet"),
          ("format"="adm")
      );
      connect feed TweetFeed to dataset ds_tweet;
      start feed TweetFeed;
      

      I have attached the asterix-build-configuration-lsm.xml file and the sample data file below.

        Attachments

        1. asterix-build-configuration-lsm.xml
          4 kB
          Chen Luo
        2. sample.zip
          22.54 MB
          Chen Luo

            People

            • Assignee:
              luochen01 Chen Luo
              Reporter:
              luochen01 Chen Luo