Lucene - Core
LUCENE-1336

Distributed Lucene using Hadoop RPC based RMI with dynamic classloading

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: modules/other
    • Labels:
      None
    • Lucene Fields:
      New, Patch Available

      Description

      Hadoop RPC based RMI system for use with Lucene Searchable. It keeps the application logic on the client side, removing the need to deploy application logic to the Lucene servers and to provision new code to potentially hundreds of servers for every application logic change.

      The use case is any deployment requiring Lucene on many servers. The system provides the added advantage of allowing custom Query and Filter classes (or other classes) to be defined on, for example, a development machine and executed on the server without first deploying the custom classes to the servers. This can save a lot of time and effort in provisioning and restarting processes. In the future this patch will include an IndexWriterService interface to enable document indexing, which will allow subclasses of Analyzer to be dynamically loaded onto a server as documents are added by the client.

      Hadoop RPC is more scalable than Sun's RMI implementation because it uses non-blocking sockets. Hadoop RPC is also far easier to understand and customize if needed, as it is embodied in two main classes, org.apache.hadoop.ipc.Client and org.apache.hadoop.ipc.Server.

      Features include automatic dynamic classloading. The dynamic classloading enables newly compiled client classes inheriting core objects such as Query or Filter to be used to query the server without first deploying the code to the server.

      RMI's own dynamic classloading is rarely used in practice because it is hard to set up: the new code must be packaged into JAR files served from a web server on the client side, and custom system properties as well as a Java security manager configuration are required.

      The dynamic classloading in Hadoop RMI for Lucene uses RMI itself to load the classes. Custom serialization and deserialization manage the classes and class versions on the server and client sides. New class files are automatically detected and loaded using ClassLoader.getResourceAsStream, so this system does not require creating a JAR file. The same networking channel used for remote method invocation is reused to load classes over the network. This removes the need for a separate web server dedicated to the task and makes deployment a few lines of code.
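      The class-transfer idea can be sketched roughly as follows. This is an illustrative sketch, not the patch's API: the class and method names are hypothetical, and modern try-with-resources is used for brevity. The client reads freshly compiled .class bytes straight from its classpath (no JAR needed), and the server defines them into a dedicated loader.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the no-JAR class transfer described above.
// The client reads raw .class bytes from its own classpath; the server
// side defines them into a per-connection ClassLoader.
public class ClassBytes {

    /** Reads a class's bytecode from the classpath; no JAR file needed. */
    public static byte[] read(Class<?> cls) throws IOException {
        String resource = "/" + cls.getName().replace('.', '/') + ".class";
        try (InputStream in = cls.getResourceAsStream(resource);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    /** Server-side loader that defines classes from bytes sent by a client. */
    public static class RemoteClassLoader extends ClassLoader {
        public Class<?> define(String name, byte[] bytes) {
            return defineClass(name, bytes, 0, bytes.length);
        }
    }
}
```

      The bytes read on the client would travel over the same Hadoop RPC connection as the method call itself, which is the point of the design: no side channel is needed.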

      1. lucene-1336.patch
        177 kB
        Jason Rutherglen
      2. lucene-1336.patch
        139 kB
        Jason Rutherglen
      3. lucene-1336.patch
        54 kB
        Jason Rutherglen

        Activity

        Jason Rutherglen added a comment -

        lucene-1336.patch

        Depends on commons-io-1.3.2.jar, hadoop-0.17.1-core.jar

        Note about the class loading. Core classes from Lucene are excluded from dynamic loading as they are assumed to exist on the server. These are defined in LuceneClasses.

        Tests require running TestRMIServer before TestRMIClient.

        The test case creates a Query subclass that returns a random string from Query.toString(String field). A TestService.search(Query) method is called on TestRMIServer, which returns the random string to the client. A second Query subclass with the same name as the first is then compiled with a different random string. The same call is made, and the returned strings differ, which means the method call worked and the new Query subclass was dynamically loaded.

        Todo: Add test case using Searchable, create IndexWriterService, improve remote exception handling.

        Jason Rutherglen added a comment -

        lucene-1336.patch

        • Added Lucene-specific classes that allow creating indexes, updating indexes, and searching
        • Distributed garbage collection of objects on the server using leases. Used with Searchables that are no longer
          referenced on the client. The server keeps track of references to the server object (if desired) and cleans up when the reference count reaches zero.
        • Analyzer is Serializable
        • TestLuceneServer, TestLuceneClient test cases

        Todo:

        • Add to LuceneClient, using distributed events (callbacks), notification of a new IndexReader on the server. This way all interested parties always have a remote reference to the latest index version.

        Some of the interfaces:

        public interface IndexService extends Remote {
          public static class IndexVersion implements Serializable {
            public static final long serialVersionUID = 1L;
            public long generation;
          }
          
          public static interface Operation {
          }
          
          public static class Add implements Serializable, Operation {
            public static final long serialVersionUID = 1L;
            public Analyzer analyzer;
            public Document document;
          }
          
          public static class Update implements Serializable, Operation {
            public static final long serialVersionUID = 1L;
            public Document document;
            public Term term;
            public Analyzer analyzer;
          }
          
          public static class Delete implements Serializable, Operation {
            public static final long serialVersionUID = 1L;
            public Query query;
            public Term term;
          }
          
          public SearchableService reopen() throws Exception;
          public void close() throws Exception;
          public IndexInfo getIndexInfo() throws Exception;
          
          /**
           * Executes a batch of index-changing operations (add, update, or delete).
           * @param operations the operations to apply
           * @throws Exception
           */
          public void execute(Operation[] operations) throws Exception;
          public void addDocument(Document document, Analyzer analyzer) throws Exception;
          public void updateDocument(Term term, Document document, Analyzer analyzer) throws Exception;
          public void deleteDocuments(Term term) throws Exception;
          public void deleteDocuments(Query query) throws Exception;
          public void flush() throws Exception;
        }
        
        public interface SearchableService extends Searchable {
          public IndexVersion getIndexVersion() throws Exception;
          public Document[] docs(int[] docs, FieldSelector fieldSelector) throws CorruptIndexException, IOException;
        }
        
        public interface IndexManagerService {
          
          public static class IndexInfo implements Serializable {
            public static final long serialVersionUID = 1L;
            public String name;
            public String serviceName;
            public long length;
            public IndexSettings indexSettings;
          }
          
          public static class IndexSettings implements Serializable {
            public static final long serialVersionUID = 1L;
            public Analyzer defaultAnalyzer;
            public int maxFieldLength;
            public Double ramBufferSizeMB;
          }
        
          public IndexService createIndex(String name, IndexSettings indexSettings) throws Exception;
          public IndexInfo[] getIndexInfos() throws Exception;
          public void deleteIndex(String name) throws Exception;
        }
        
        Jason Rutherglen added a comment -

        lucene-1336.patch

        • HMAC based security authentication between client and server. This was chosen because it is fairly simple to use and is more secure than a username/password scheme. Public/private key signing and encryption can also be used via the RMISecurity interface. SSL may be used at the socket layer as well, though that would require work in the Hadoop RPC NIO socket code.
        • LuceneMultiClient class that allows searching over multiple remote indexes via a MultiSearcher. The class also manages obtaining the latest Searchables via the registered IndexListener.
        • Distributed events for new Searchables on a remote LuceneServer reopen. LuceneClient always has the most up-to-date Searchable automatically. Added the IndexService.registerIndexListener method.
        • Apache License headers
        • IndexService.flushAndReopen method flushes index changes from the IndexWriter, reopens, and returns the latest Searchable.
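        The HMAC handshake itself is not shown in the comment; a minimal sketch of HMAC-based request authentication with the JDK's javax.crypto follows. The class and method names here are illustrative, not the patch's RMISecurity API, and HmacSHA256 is an assumed algorithm choice.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch of HMAC request authentication: client and server
// share a secret key; the client sends mac(payload) alongside the payload,
// and the server recomputes the MAC and compares in constant time.
public class HmacAuth {
    private final SecretKeySpec key;

    public HmacAuth(byte[] sharedSecret) {
        this.key = new SecretKeySpec(sharedSecret, "HmacSHA256");
    }

    public byte[] sign(byte[] payload) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(key);
        return mac.doFinal(payload);
    }

    public boolean verify(byte[] payload, byte[] signature) throws Exception {
        // Constant-time comparison avoids timing side channels.
        return MessageDigest.isEqual(sign(payload), signature);
    }
}
```

        In use, both sides would construct HmacAuth from the same shared secret; a request whose recomputed MAC does not match its signature is rejected before deserialization.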

        Future:

        • Facet interface with default Term and Query implementations.
        Jason Rutherglen added a comment -

        The dynamic classloading mechanism as it is currently implemented is flawed because it only takes into account the serialized classes and not the classes loaded by the serialized classes.

        To address this, the next release will do the class serialization differently. A bytecode library such as ASM will be used to look up all dependencies of a "top level object" on the client. A "top level object" in this context is a parameter class of a remote method, such as Searchable.search(Query query), where Query would be the "top level object". For each such class, a JAR file containing all class dependencies will be written automatically. On deserialization, the server will load the JAR file the first time it sees the "top level object" class.
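        The ASM-based scan is not part of this patch yet. As a rough illustration of the idea, the JDK-only sketch below approximates it with reflection; note that a real bytecode scan would also find classes referenced only inside method bodies, which reflection cannot see. All names here are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;

// Rough approximation of dependency discovery for a "top level object" class.
// A real bytecode scan (e.g. with ASM) would also see classes referenced only
// inside method bodies; reflection sees supertypes, fields, and signatures.
public class DependencyScanner {

    public static Set<Class<?>> dependenciesOf(Class<?> top) {
        Set<Class<?>> deps = new LinkedHashSet<>();
        collect(top, deps);
        return deps;
    }

    private static void collect(Class<?> cls, Set<Class<?>> deps) {
        if (cls == null || cls.isPrimitive()) {
            return;
        }
        if (cls.isArray()) {
            collect(cls.getComponentType(), deps);
            return;
        }
        // Core JDK classes are assumed present on the server, mirroring the
        // LuceneClasses exclusion list; also stop on classes already seen.
        if (cls.getName().startsWith("java.") || !deps.add(cls)) {
            return;
        }
        collect(cls.getSuperclass(), deps);
        for (Class<?> iface : cls.getInterfaces()) {
            collect(iface, deps);
        }
        for (Field f : cls.getDeclaredFields()) {
            collect(f.getType(), deps);
        }
        for (Method m : cls.getDeclaredMethods()) {
            collect(m.getReturnType(), deps);
            for (Class<?> p : m.getParameterTypes()) {
                collect(p, deps);
            }
        }
    }
}
```

        The discovered set would then be packed into the auto-generated JAR shipped to the server on first use of the "top level object" class.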

        The process described should be fine for Lucene, given the number of classes is usually quite small. For Ocean I plan on using the process to allow search related tasks to be submitted that interact directly with the core server APIs. This will allow things like highlighters, facet code, and other search-side tasks to be submitted to the server without the server already having the libraries required to execute the task. It also solves the problem of application changes to search code that would otherwise require provisioning new configuration files or application logic to the server. A good example is interaction with IndexReader methods not directly available via Searchable: one could write a task that interacts with IndexReader and returns the result to the client without manually installing a new library on the server to handle the desired logic. This type of system is obviously useful for Lucene on a large number of servers where advanced dynamic queries and other advanced logic are desired. Use cases that come to mind are SpanQueries and Payload queries; Payloads in particular, because they require code-level updates to Analyzers and Queries, and may need to be changed often.

        Jason Rutherglen added a comment -

        The classloading mechanism described above was also found to be unsuitable, because it would require a scan of all of the classes each time. Because of inheritance, it is impossible to accurately obtain all of the classes without a scan on each serialization, which impacts performance too much.

        In working on this problem I found what I think is a design flaw in Java, the fixing of which would solve many of the issues: a serialVersionUID is not compiled into classes automatically when they do not define one. The current design creates inconsistencies during deserialization in ObjectInputStream.resolveClass(ObjectStreamClass desc), where the ObjectStreamClass parameter returns a generated serialVersionUID that is inconsistent across VM implementations. Also, because this serialVersionUID is only available from the ObjectStreamClass, building a map of classes and class versions is difficult.

        The solution that is easiest, most reliable, and most efficient is a session-based classloading mechanism, where the session is between a client and the server. The client generates a unique session id every time the VM (or, in J2EE, the webapp) is loaded. This mostly guarantees the classes on the client will be consistent (it is the client's responsibility to restart the RMI object, which generates a new session id, if the client is dynamically loading classes). The server maintains a SessionClassLoader per client session that is used by the deserialization code to dynamically load classes from the client. The only limitation of this solution is the number of SessionClassLoaders a server can support; in most systems it will not be a factor. The SessionClassLoaders on the server will simply expire from the map after a period of not being used, rather than using remote referencing, which would increase network traffic unnecessarily.
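        A minimal sketch of the server-side session registry described above: one loader per session id, expiring after a period of disuse. The class name mirrors the comment's SessionClassLoader; the expiry policy and loader internals are assumptions.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-client-session classloader registry: one loader per
// session id, expiring after a period of disuse instead of relying on
// remote reference counting.
public class SessionClassLoaders {

    /** Per-session loader that defines classes sent by one client VM. */
    public static class SessionClassLoader extends ClassLoader {
        volatile long lastUsedMillis = System.currentTimeMillis();

        public Class<?> define(String name, byte[] bytes) {
            lastUsedMillis = System.currentTimeMillis();
            return defineClass(name, bytes, 0, bytes.length);
        }
    }

    private final Map<String, SessionClassLoader> loaders = new ConcurrentHashMap<>();
    private final long expiryMillis;

    public SessionClassLoaders(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    /** Returns the loader for a session id, creating it on first use. */
    public SessionClassLoader forSession(String sessionId) {
        SessionClassLoader loader =
            loaders.computeIfAbsent(sessionId, id -> new SessionClassLoader());
        loader.lastUsedMillis = System.currentTimeMillis();
        return loader;
    }

    /** Drops loaders not used within the expiry window; call periodically. */
    public int expire(long nowMillis) {
        int removed = 0;
        for (Iterator<SessionClassLoader> it = loaders.values().iterator(); it.hasNext();) {
            if (nowMillis - it.next().lastUsedMillis > expiryMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return loaders.size();
    }
}
```

        Because a client generates a fresh session id on restart, stale classes can never collide with new versions: they live in different SessionClassLoaders, and the old loader simply ages out of the map.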

        Jason Rutherglen added a comment -

        Won't be working on these and they're old


          People

          • Assignee:
            Unassigned
          • Reporter:
            Jason Rutherglen
          • Votes:
            1
          • Watchers:
            5

            Dates

            • Created:
              Updated:
              Resolved:
