Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 6.0
    • Component/s: modules/replicator
    • Labels: None
    • Lucene Fields: New

      Description

      Lucene's replication module makes it easy to incrementally sync index
      changes from a master index to any number of replicas, and it
      handles/abstracts all the underlying complexity of holding a
      time-expiring snapshot, finding which files need copying, syncing more
      than one index (e.g., taxo + index), etc.

      But today you must first commit on the master, and the copied files
      are then fsync'd again on the replica, because the code operates on
      commit points. This isn't technically necessary, and it conflates
      durability with fast turnaround time.

      Long ago we added near-real-time readers to Lucene, for the same
      reason: you shouldn't have to commit just to see the new index
      changes.

      I think we should do the same for replication: allow the new segments
      to be copied out to replica(s), and new NRT readers to be opened, to
      fully decouple committing from visibility. This way apps can then
      separately choose when to replicate (for freshness), and when to
      commit (for durability).
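
      For context, here is a minimal sketch of the existing single-node NRT
      pattern this would generalize, assuming an open IndexWriter (writer)
      and a Document (doc):

          import org.apache.lucene.index.DirectoryReader;

          // Visibility without durability: an NRT reader sees flushed
          // segments with no commit.
          DirectoryReader reader = DirectoryReader.open(writer);

          writer.addDocument(doc);

          // Refresh for freshness -- no commit needed to see the new doc.
          DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
          if (newReader != null) {
            reader.close();
            reader = newReader;
          }

          // Durability is a separate decision, on its own schedule.
          writer.commit();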

      I think for some apps this could be a compelling alternative to the
      "re-index all documents on each shard" approach that Solr Cloud /
      ElasticSearch implement today, and it may also mean that the
      transaction log can remain external to / above the cluster.

      Attachments

      1. LUCENE-5438.patch
        344 kB
        Michael McCandless
      2. LUCENE-5438.patch
        173 kB
        Michael McCandless
      3. LUCENE-5438.patch
        46 kB
        Michael McCandless
      4. LUCENE-5438.patch
        29 kB
        Michael McCandless

        Activity

        Michael McCandless added a comment -

        Initial, very exploratory patch; it just contains a test case
        (TestNRTReplication), showing how NRT replication could work. It's
        not yet at all integrated with the replication module's existing
        APIs... and I'm not sure how to do that.

        But the test doesn't cheat, i.e. all index changes are pushed via
        byte[] / file copy from master to replica, and it does pass... though
        the CheckIndex that MDW.close calls is very slow, I think because of
        the term vector / postings cross checking.

        Flushed segments are "immediately" pushed to the replica; merged
        segments are first "warmed" by pre-copying to the replica with lower
        priority. I also created a simple ReferenceManager<IS> that does the
        reopen from a provided SegmentInfos, which the app on the replica side
        would use to obtain fresh searchers. From that point it can use
        SearcherLifetimeManager "as usual" to track/expire past searchers.
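
        For readers following along, the replica-side pattern would look
        roughly like this (a sketch; mgr stands for the SegmentInfos-based
        ReferenceManager from the patch, and the rest is the existing
        SearcherLifetimeManager API):

            import org.apache.lucene.search.IndexSearcher;
            import org.apache.lucene.search.SearcherLifetimeManager;

            SearcherLifetimeManager lifetimes = new SearcherLifetimeManager();

            // After each NRT reopen on the replica, record the searcher so
            // follow-on requests (e.g. paging) can pin the same view.
            IndexSearcher searcher = mgr.acquire();
            long token;
            try {
              token = lifetimes.record(searcher);
              // ... run the initial query, return token to the client ...
            } finally {
              mgr.release(searcher);
            }

            // Later: re-acquire the exact same point-in-time view, or get
            // null back if it has been pruned/expired.
            IndexSearcher same = lifetimes.acquire(token);
            if (same != null) {
              try {
                // ... run the follow-on query ...
              } finally {
                lifetimes.release(same);
              }
            }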

        Mark Miller added a comment -

        Very interesting - can't wait to see how the performance works out.

        Trying to move Solr over to the replication module is something I've briefly thought about here and there - and then stopped, like touching an electric fence. It took so much work and effort to get the current replication code very stable with SolrCloud that I don't look forward to such a challenge in the near future.

        We would definitely like to have the ability to only index once. Of course, if you are sending documents to replicas async while indexing on the leader (we don't yet), I wonder how much benefit you get?

        Hopefully work like this gets some others interested in giving a replication overhaul a shot.

        Michael McCandless added a comment -

        New patch, improving the test case. I added some abstraction (Replica class), and the test now randomly commits/stops/starts replicas.

        Shai Erera added a comment -

        I reviewed the patch, and it looks like it can roughly be divided in two:

        • Infrastructure: IndexWriter.flushAndIncRef(), SIS.write/read(DataOutput/Input), SDR being public...
        • Replication: all in the test, in the form of Replica and the various threads

        What we need is to figure out how to tie the replication stuff with the Replicator API. I thought about it, discussed a bit with Mike, and here's a proposal:

        • Implement NRTRevision which will list the NRT files (flushed segments) and the SIS-as-byte[] that we get from IW.flushAndIncRef().
        • Implement NRTIndexUpdateHandler (replica side) to act on the special revision, and use the SIS.read() API and SDR().
          • We might also need the ReferenceManager that exists in the patch, which acts upon a SIS, rather than looking up commit points.

        That will get NRT segments to appear on the replicas easily, with some caveats:

        • The replicas will need to copy over merged segments, which can take a lot of time and hurt latency.
        • The replicas communicate w/ the primary node at their own leisure (well, a configured interval), and for NRT we might want to notify the replicas of new files, to reduce latency.

        As for merged segments, the patch already includes the foundation for this in the form of MergedSegmentWarmer, which holds on to the merged segment until all replicas have successfully copied it. That way replicas copy the big segments, and only after all of them are done is the merged segment exposed to the SIS on the primary side, where a new NRTRevision will list it. There are still issues to take care of, such as replicas failing etc., but I think those are impl details.

        To handle the push/pull issue we can offer another Replicator implementation - PushReplicator - which is given a list of replicas and every .publish() is communicated to all replicas so that they can start copying files immediately. Every replica will have an associated thread on the primary side to handle the replication logic and report on any failures that occurred with the replica. This will help w/ the bookkeeping that needs to be done on the primary side.
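
        A hypothetical skeleton of that idea (nothing here exists yet;
        PushReplicator and its handler interface are made-up names, and this
        sketch doesn't implement the full Replicator interface):

            import java.util.List;
            import org.apache.lucene.replicator.Revision;

            public class PushReplicator {

              /** Hypothetical per-replica transport hook; each handler runs
               *  its own thread on the primary side. */
              public interface ReplicaHandler {
                void onNewRevision(Revision revision);
              }

              private final List<ReplicaHandler> replicas;

              public PushReplicator(List<ReplicaHandler> replicas) {
                this.replicas = replicas;
              }

              public synchronized void publish(Revision revision) {
                for (ReplicaHandler replica : replicas) {
                  // Notify immediately so the replica starts copying files
                  // now, instead of waiting for its next poll interval.
                  replica.onNewRevision(revision);
                }
              }
            }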

        These are all just preliminary thoughts; I'm sure there are fun gotchas.

        ASF subversion and git services added a comment -

        Commit 1569488 from Michael McCandless in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1569488 ]

        LUCENE-5438: make branch

        ASF subversion and git services added a comment -

        Commit 1569489 from Michael McCandless in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1569489 ]

        LUCENE-5438: commit current state

        ASF subversion and git services added a comment -

        Commit 1569499 from Michael McCandless in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1569499 ]

        LUCENE-5438: fix javadoc

        Shai Erera added a comment -

        I committed NRTIndexRevision and a matching test:

        • NRTIndexRevision lists the files in the SIS obtained from IW.flushAndIncRef() and holds in memory a byte[] of that SIS (via SIS.write(DataOutput)).
        • The segments_N is listed as segments_nrt_N, where N is SIS.getVersion(); in fact this file isn't materialized on any Directory, it's just listed so the replica can request it.
        • There's a nocommit about how to handle commits, i.e. if you addDoc(), commit(), addDoc(), publish(new NRTRev()), then SIS.listFiles() contains segments_N as well
          • On one hand I think it's good to list that file too, so that the replica can replicate it and, if it crashes, recover to the last known commit
          • It also simplifies how the app should integrate its NRT and commit() w/ the replicator
          • But if we choose to pass that file as well, we should take care of it on the replica side, e.g. by fsync'ing it (which we don't do for the in-memory SIS).

        I think it will be good if the framework allows the app to publish IndexRevision (commits) and NRTIndexRevision (NRT) seamlessly, so the app can choose what to replicate. That way NRTIndexRevision doesn't need to know about SnapshotDeletionPolicy and its code path remains simple (IW.flushAndIncRef + IW.decRefDeleter). Once I add support on the replica side, I'll write a test which demonstrates how to mix commits/nrt w/ one replicator.
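
        A usage sketch of that seamless mixing (IndexRevision is the existing
        commit-based revision, which requires the writer's deletion policy to
        be a SnapshotDeletionPolicy; the NRTIndexRevision constructor shown
        is an assumed signature):

            import org.apache.lucene.replicator.IndexRevision;

            // Durability path: commit, then publish a commit-based revision.
            writer.commit();
            replicator.publish(new IndexRevision(writer));

            // Freshness path, between commits: publish the flushed
            // (uncommitted) segments; no SnapshotDeletionPolicy involved.
            replicator.publish(new NRTIndexRevision(writer));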

        ASF subversion and git services added a comment -

        Commit 1572653 from Michael McCandless in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1572653 ]

        LUCENE-5438: commit current [broken] state

        Michael McCandless added a comment -

        I've committed my current [broken] state here, but I'm gonna mothball this for now.

        I had made the test case more evil, by adding randomly shutting down a master and moving it to another node (promoting a replica to master). It turns out this is very hard to do properly, because in this case, file names can be re-used (Lucene is no longer write-once) and detecting that is tricky, unless we can rely on some external global reliable storage (e.g. something stored in Zookeeper maybe) to record the last segments gen / segment name that was written on any node ...

        ASF subversion and git services added a comment -

        Commit 1580522 from Michael McCandless in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1580522 ]

        LUCENE-5438: test seems to pass now, even when master is migrated to an 'old' replica

        ASF subversion and git services added a comment -

        Commit 1585682 from mikemccand@apache.org in branch 'dev/branches/lucene5438'
        [ https://svn.apache.org/r1585682 ]

        LUCENE-5438: checkpoint current [broken] state

        Michael McCandless added a comment -

        Here's a new patch based on trunk: recent progress with IDs and
        checksums made things much easier here because now I can reliably
        identify files across nodes.

        This is still just a proof-of-concept test case and still has many
        nocommits, but I believe the core idea is finally working.

        The test randomly starts N nodes, each with its own filesystem
        directory. One node is primary, and the rest replicas. An indexing
        thread adds docs to the primary, and periodically a new NRT point is
        flushed and replicated out.

        The test is quite evil: most of MDW's checks are left enabled.
        E.g. virus checker sometimes prevents deletion of files, unref'd files
        at close are caught, double-write to same filename is caught (except
        after crash). It uses RIW. The replicas have random rate limiters so
        some nodes are fast to copy and others are slow. Replicas are
        randomly crashed or gracefully closed, and restarted. Primary is
        randomly crashed/closed and a replica is promoted as new primary.

        A file is considered the "same" across primary and replica if its file
        name is the same, its full byte[] index header and footer are
        identical, and it's in a "known to be fsync'd" state (e.g. not an
        unref'd file on init of replica/primary).
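
        In code, that identity check is roughly the following (a sketch;
        remote is a hypothetical holder of the other node's metadata, and
        I'm assuming CodecUtil's readIndexHeader/readFooter helpers here):

            import java.util.Arrays;
            import org.apache.lucene.codecs.CodecUtil;
            import org.apache.lucene.store.IOContext;
            import org.apache.lucene.store.IndexInput;

            boolean sameFile;
            try (IndexInput in = dir.openInput(fileName, IOContext.READONCE)) {
              byte[] header = CodecUtil.readIndexHeader(in); // full index header bytes
              byte[] footer = CodecUtil.readFooter(in);      // full footer, incl. checksum
              sameFile = fileName.equals(remote.fileName)
                  && Arrays.equals(header, remote.header)
                  && Arrays.equals(footer, remote.footer);
              // ... and the local file must be in a "known to be fsync'd" state ...
            }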

        A searching thread asserts that each searcher version always has the
        same hit count across all nodes, and that there is no data loss.

        I also added a simplistic/restricted transaction log (sits logically
        outside of/above the cluster, not per-node) to show how NRT points can
        be correlated back to locations in the xlog and used to replay
        indexing events on primary crash so no indexed documents are lost.

        Versions are consistent across replicas, so at any time if you have a
        follow-on search needing a specific searcher version, you can use any
        replica that has that version and it's guaranteed to be searching the
        same point-in-time.

        I would love to get this into Jenkins soon, but one big problem here
        is I had to open up all sorts of ridiculous APIs in IW/SegmentInfos
        ... I have to think about how to minimize this.

        Michael McCandless added a comment -

        I got back to this issue and have been pushing changes here:

        https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;a=shortlog;h=refs/heads/jira/lucene-5438-nrt-replication

        Net/net I think it's in pretty good shape ... I'd like to add this for
        6.0 as an experimental feature, as an alternative to document based
        replication.

        I think there are complex tradeoffs between the two approaches to
        distributing Lucene, but I think it's important users at least have
        a choice.

        With this change, multiple nodes (primary and replicas) have
        essentially the same transactional semantics that a single Lucene
        IndexWriter + NRT readers offers. You have known point-in-time views
        that are consistent (the same long version) across nodes, you can
        commit from any node (primary or replica), rollback, etc. When
        things crash, on restart they are back to the last commit, etc.

        The test cases are quite evil: they spawn sub-process JVMs, and
        communicate over a naive (thread-per-connection) TCP protocol to copy
        files, index documents, search, etc. And they randomly crash (thank
        you TestIndexWriterOnJRECrash!), commit, close, flip bits during
        file copy, simulate slow networks, etc.

        I'll make an applyable patch from the current branch and post here.

        Michael McCandless added a comment -

        Here's the latest applyable patch from the branch. Tests pass, but
        precommit doesn't yet... I'll work on it.

        I tried to keep the core changes to a minimum (simplified vs previous
        iterations), but there were some additions that NRT replication needs,
        like asking IW to write deletes to disk on opening the NRT reader.
        The changes to SegmentInfos.java are not as scary as they look (just
        factoring out methods to read/write from IndexInput/Output too).

        I've marked the new APIs experimental or internal, and put all
        the new classes under o.a.l.replicator.nrt.

        The important classes are PrimaryNode (you create this on the JVM
        that will index documents) and ReplicaNode (you create that on
        other JVMs to receive newly flushed files). They are both abstract:
        you must subclass and implement methods that actually do the work of
        moving files, etc. The tests do this using a simple TCP socket
        server.

        Both PrimaryNode and ReplicaNode expose a SearcherManager,
        which you use for searching. They both have commit methods.
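
        Search-side usage is then the standard acquire/release dance
        (sketch; getSearcherManager() is an assumed accessor for the
        manager mentioned above):

            import org.apache.lucene.index.Term;
            import org.apache.lucene.search.IndexSearcher;
            import org.apache.lucene.search.SearcherManager;
            import org.apache.lucene.search.TermQuery;
            import org.apache.lucene.search.TopDocs;

            SearcherManager mgr = node.getSearcherManager(); // assumed accessor
            IndexSearcher searcher = mgr.acquire();
            try {
              TopDocs hits = searcher.search(
                  new TermQuery(new Term("body", "lucene")), 10);
              // ... render hits ...
            } finally {
              mgr.release(searcher);
            }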

        The primary node uses a merged segment warmer that pre-copies merged
        files before letting the local IW cutover. This way NRT latency isn't
        blocked by copying merged files out (normally). An alternative to
        this would be to have the replicas do their own merging, but I think
        that gets quite complex.
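
        For reference, the stock hook this builds on is the merged-segment
        warmer on IndexWriterConfig; a rough sketch of the pre-copy idea,
        where preCopyToReplicas is a hypothetical stand-in for the transport
        (the real PrimaryNode installs its own warmer internally):

            import java.io.IOException;
            import org.apache.lucene.index.IndexWriter;
            import org.apache.lucene.index.LeafReader;
            import org.apache.lucene.index.SegmentCommitInfo;
            import org.apache.lucene.index.SegmentReader;

            iwc.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
              @Override
              public void warm(LeafReader reader) throws IOException {
                // The warmer runs before IW exposes the merged segment, so
                // blocking here until replicas have the files delays only
                // the merge cutover, not NRT reopens.
                SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
                preCopyToReplicas(info.files()); // hypothetical: blocks until copies finish
              }
            });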

        Michael McCandless added a comment -

        Hmm, I failed to insert LUCENE-5438 into the merge commit. Here's the
        commit:
        https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;a=commit;h=12b8721a44dbd1fbc7878fa37186c16cf6045401
        ASF subversion and git services added a comment -

        Commit 3daa82c6595387fd1bc9d78c2d2d7660dfc8b4b6 in lucene-solr's branch refs/heads/branch_6x from Mike McCandless
        [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=3daa82c ]

        LUCENE-5438: make some APIs public so servers can actually use this feature

        ASF subversion and git services added a comment -

        Commit 2d07ffd97fb4aec2a11aeddab40490044f3c2b49 in lucene-solr's branch refs/heads/master from Mike McCandless
        [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=2d07ffd ]

        LUCENE-5438: make some APIs public so servers can actually use this feature


          People

          • Assignee: Michael McCandless
          • Reporter: Michael McCandless
          • Votes: 2
          • Watchers: 11
