Solr
  1. Solr
  2. SOLR-1143

Return partial results when a connection to a shard is refused

    Details

    • Type: Improvement Improvement
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: search
    • Labels:
      None

      Description

      If any shard is down in a distributed search, a ConnectException it thrown.

      Here's a little patch that change this behaviour: if we can't connect to a shard (ConnectException), we get partial results from the active shards. As for TimeOut parameter (https://issues.apache.org/jira/browse/SOLR-502), we set the parameter "partialResults" at true.

      This patch also adresses a problem expressed in the mailing list about a year ago (http://www.nabble.com/partialResults,-distributed-search---SOLR-502-td19002610.html)

      We have a use case that needs this behaviour and we would like to know your thougths about such a behaviour? Should it be the default behaviour for distributed search?

      1. SOLR-1143.patch
        4 kB
        Nicolas Dessaigne
      2. SOLR-1143-2.patch
        16 kB
        Martijn van Groningen
      3. SOLR-1143-3.patch
        15 kB
        Martijn van Groningen

        Issue Links

          Activity

          Hide
          Ryan McKinley added a comment -

          This can be solved with SOLR-3134

          Show
          Ryan McKinley added a comment - This can be solved with SOLR-3134
          Hide
          Ryan McKinley added a comment -

          I have not looked at this patch... but check if SOLR-3134 does what you need (that is included in trunk)

          If you add: &shards.tolerant=true to the request, it will not abort on errors (timeout or other)

          Show
          Ryan McKinley added a comment - I have not looked at this patch... but check if SOLR-3134 does what you need (that is included in trunk) If you add: &shards.tolerant=true to the request, it will not abort on errors (timeout or other)
          Hide
          Russell Black added a comment -

          It's been over two years since this patch was submitted. What's the status? I would like to see this in the next release.

          Show
          Russell Black added a comment - It's been over two years since this patch was submitted. What's the status? I would like to see this in the next release.
          Hide
          Robert Muir added a comment -

          3.4 -> 3.5

          Show
          Robert Muir added a comment - 3.4 -> 3.5
          Hide
          Robert Muir added a comment -

          Bulk move 3.2 -> 3.3

          Show
          Robert Muir added a comment - Bulk move 3.2 -> 3.3
          Hide
          Shawn Heisey added a comment -

          I tried this patch, then we put SOLR-1537 into production on 1.5-dev. When we finally abandoned our old search software and reclaimed its hardware, we decided to use 1.4.1 and a distributed+replicated architecture with a load balancer. IMHO, production setups require true fault tolerance where a single failure produces no service degradation beyond slightly slower queries. These patches are a band-aid.

          Show
          Shawn Heisey added a comment - I tried this patch, then we put SOLR-1537 into production on 1.5-dev. When we finally abandoned our old search software and reclaimed its hardware, we decided to use 1.4.1 and a distributed+replicated architecture with a load balancer. IMHO, production setups require true fault tolerance where a single failure produces no service degradation beyond slightly slower queries. These patches are a band-aid.
          Hide
          Gary Yngve added a comment -

          This issue has been superceded by https://issues.apache.org/jira/browse/SOLR-1537

          That patch looks like it is still in limbo too, but you could apply it yourself (I'm playing with it now myself).

          Show
          Gary Yngve added a comment - This issue has been superceded by https://issues.apache.org/jira/browse/SOLR-1537 That patch looks like it is still in limbo too, but you could apply it yourself (I'm playing with it now myself).
          Hide
          Rich Cariens added a comment -

          Looks like this issue's been starving for attention...

          What's the status on this patch? Are we waiting for SolrCloud?

          Show
          Rich Cariens added a comment - Looks like this issue's been starving for attention... What's the status on this patch? Are we waiting for SolrCloud?
          Hide
          Hoss Man added a comment -

          Bulk updating 240 Solr issues to set the Fix Version to "next" per the process outlined in this email...

          http://mail-archives.apache.org/mod_mbox/lucene-dev/201005.mbox/%3Calpine.DEB.1.10.1005251052040.24672@radix.cryptio.net%3E

          Selection criteria was "Unresolved" with a Fix Version of 1.5, 1.6, 3.1, or 4.0. email notifications were suppressed.

          A unique token for finding these 240 issues in the future: hossversioncleanup20100527

          Show
          Hoss Man added a comment - Bulk updating 240 Solr issues to set the Fix Version to "next" per the process outlined in this email... http://mail-archives.apache.org/mod_mbox/lucene-dev/201005.mbox/%3Calpine.DEB.1.10.1005251052040.24672@radix.cryptio.net%3E Selection criteria was "Unresolved" with a Fix Version of 1.5, 1.6, 3.1, or 4.0. email notifications were suppressed. A unique token for finding these 240 issues in the future: hossversioncleanup20100527
          Hide
          Peter Sturge added a comment -

          This is a cool patch - yes, very useful.

          I've found a couple of issues with it, though:

          1. When going through the 'waiting for shard replies' loop, because no exception is thrown on shard failure, the next block after the loop can throw a NullPointerException in SearchComponent.handleResponses() for any SearchComponent that checks shard responses. It could be that this doesn't always happen, but it certainly happens in FacetComponent when date_facets are turned on.

          2. There's a bit of code that sets partialResults=true if there's at least one failure, but it doesn't set it to false if everything's ok. In order for the patch to operate, this parameter must have already been present and true, otherwise the patch is essentially 'disabled' anyway (problem of using the same parameter as input and result).

          I've made some modifications to the patch for these and a couple of other things:

          1. FacetComponent modified to check for null shard reponse. Perhaps it would be better to check this in SearchHandler.handleResponses(), but then no SearchComponents would be contacted re failed shards, even if they don't care that it's failed (is that a good thing?).

          2. Added a new CommonParams parameter called FAILED_SHARDS.
          partialResults is now only an input parameter to enable the feature (Note: partialResults is referenced in RequestHandlerBase, but it's not from the patch - is this an existing parameter that is used for something else?! If so, perhaps the name should be changed to something like allowPartialResults to avoid b/w compat and other potential conflicts).
          The output parameter that goes in the response header is now: failedShards=shard0;shard1;shardn. If everything succeeds, there will be no failedShards in the response header, otherwise, a list of failed shards is given. This is very useful to alert someone/something that a server/network needs attention (e.g. a health checker thread could run empty disributed seaches solely for the purpose of checking status).

          3. Changed the detection of a shard request error to be any Exception, rather than just ConnectException. This way, any failure is caught and can be actioned. Possible TODO: it might be nice to include a short message (Exception class name?) in the FAILED_SHARDS parameter about what failed (e.g. ConnectException, IOException, etc.). If you like this idea, please say so, and I'll include it - i.e. something like:
          {{ failedShards=myshard:8983/solr/core0|ConnectException;myothershard:8983/solr/core0|IOException}}

          I'm currently testing these changes in our internal build. In the meantime, any comments are grealy appreciated. If there are no objections, I'll add a patch update when the dev test run is complete.

          Show
          Peter Sturge added a comment - This is a cool patch - yes, very useful. I've found a couple of issues with it, though: 1. When going through the 'waiting for shard replies' loop, because no exception is thrown on shard failure, the next block after the loop can throw a NullPointerException in SearchComponent.handleResponses() for any SearchComponent that checks shard responses. It could be that this doesn't always happen, but it certainly happens in FacetComponent when date_facets are turned on. 2. There's a bit of code that sets partialResults=true if there's at least one failure, but it doesn't set it to false if everything's ok. In order for the patch to operate, this parameter must have already been present and true, otherwise the patch is essentially 'disabled' anyway (problem of using the same parameter as input and result). I've made some modifications to the patch for these and a couple of other things: 1. FacetComponent modified to check for null shard reponse. Perhaps it would be better to check this in SearchHandler.handleResponses(), but then no SearchComponents would be contacted re failed shards, even if they don't care that it's failed (is that a good thing?). 2. Added a new CommonParams parameter called FAILED_SHARDS. partialResults is now only an input parameter to enable the feature (Note: partialResults is referenced in RequestHandlerBase, but it's not from the patch - is this an existing parameter that is used for something else?! If so, perhaps the name should be changed to something like allowPartialResults to avoid b/w compat and other potential conflicts). The output parameter that goes in the response header is now: failedShards=shard0;shard1;shardn . If everything succeeds, there will be no failedShards in the response header, otherwise, a list of failed shards is given. This is very useful to alert someone/something that a server/network needs attention (e.g. a health checker thread could run empty disributed seaches solely for the purpose of checking status). 3. Changed the detection of a shard request error to be any Exception, rather than just ConnectException. This way, any failure is caught and can be actioned. Possible TODO: it might be nice to include a short message (Exception class name?) in the FAILED_SHARDS parameter about what failed (e.g. ConnectException, IOException, etc.). If you like this idea, please say so, and I'll include it - i.e. something like: {{ failedShards=myshard:8983/solr/core0|ConnectException;myothershard:8983/solr/core0|IOException}} I'm currently testing these changes in our internal build. In the meantime, any comments are grealy appreciated. If there are no objections, I'll add a patch update when the dev test run is complete.
          Hide
          David Bowen added a comment -

          I've found this patch very useful. I recommend extending it to check for instanceof IOException rather than just java.net.ConnectException. This is useful in order to catch org.apache.commons.httpclient.ConnectTimeoutException and java.net.SocketTimeoutException.

          Show
          David Bowen added a comment - I've found this patch very useful. I recommend extending it to check for instanceof IOException rather than just java.net.ConnectException. This is useful in order to catch org.apache.commons.httpclient.ConnectTimeoutException and java.net.SocketTimeoutException.
          Hide
          Mike Anderson added a comment -

          What's the current state of this use case? I have a shard that is a slower than all the others and I'd rather just get partial (or no) results back from the slow shard instead of slowing down the whole operation.

          I've looked over SOLR-1143, SOLR-502, and SOLR-850 but I'm not exactly sure how they all tie together and what's available from trunk today.

          I tried setting timeAllowed to something really small like 5, but I still got back all of the results I got when timeAllowed wasn't set (I would have expected no results).

          -mike

          Show
          Mike Anderson added a comment - What's the current state of this use case? I have a shard that is a slower than all the others and I'd rather just get partial (or no) results back from the slow shard instead of slowing down the whole operation. I've looked over SOLR-1143 , SOLR-502 , and SOLR-850 but I'm not exactly sure how they all tie together and what's available from trunk today. I tried setting timeAllowed to something really small like 5, but I still got back all of the results I got when timeAllowed wasn't set (I would have expected no results). -mike
          Hide
          Martijn van Groningen added a comment -

          Sure, that is a good idea. I think that also other types of exceptions should result in a partial result (currently just a connection timeout will result in a partial result). I think that this behaviour should be enabled with a parameter in the request. Something like shards.requestTimeout=1500 (time is in ms).

          Show
          Martijn van Groningen added a comment - Sure, that is a good idea. I think that also other types of exceptions should result in a partial result (currently just a connection timeout will result in a partial result). I think that this behaviour should be enabled with a parameter in the request. Something like shards.requestTimeout=1500 (time is in ms).
          Hide
          Jason Rutherglen added a comment -

          The particular case not solved today that I'm running into is a
          Solr server that simply takes too long and slows down the entire
          distributed query. Maybe we need a patch to timeout an
          individual distributed shard request and return partial results
          and/or indicate which server is taking too long?

          Show
          Jason Rutherglen added a comment - The particular case not solved today that I'm running into is a Solr server that simply takes too long and slows down the entire distributed query. Maybe we need a patch to timeout an individual distributed shard request and return partial results and/or indicate which server is taking too long?
          Hide
          Grant Ingersoll added a comment -

          Given that there is likely going to be a whole lot more work on distributed search in 1.5 (see the ZooKeeper, Hadoop, etc.) I think it makes sense to defer this to 1.5.

          Show
          Grant Ingersoll added a comment - Given that there is likely going to be a whole lot more work on distributed search in 1.5 (see the ZooKeeper, Hadoop, etc.) I think it makes sense to defer this to 1.5.
          Hide
          Martijn van Groningen added a comment -

          I think we need to bird's-eye view at the partial results solution, so that we can hook in the partial results behaviour at the right place. This is quiet a long comment, but first I will describe how I think that distributed search works and then propose a solution. In think that this solutions is better than the current one in the patch.

          From my understanding the distributed search in the trunk currently works as follows:
          1) When it has been determined that the a search request is a multi shard request an instance of HttpCommComponent is created and outgoing and finished lists are initialised. Also the nextStage is set to zero.
          2) The ResponseBuilder's stage is set to the nextStage and the nextStage is set to stage done. The distributedProcess(...) method is invoked on each search component. Each search component can add ShardRequests to the outgoing list in the ResponseBuilder. Besides adding ShardRequests, a search component also returns a stage. The lowest stage from all search components will end up to be the next stage.

          // call all components
          for( SearchComponent c : components ) {
          	// the next stage is the minimum of what all components report
          	nextStage = Math.min(nextStage, c.distributedProcess(rb));
          }
          

          3a) Next step is to send all the ShardRequests from ResponseBuilder's output list to the shards. First a ShardRequest is taken and removed from the ResponseBuilder's output list, then the actual shards are determined for the current ShardRequest.

          ShardRequest sreq = rb.outgoing.remove(0);

          It checks if for the current overall search request the shards are specified and than use. If that is not the case the predefined shards become the actual shards.

          sreq.actualShards = sreq.shards;
          if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
          	sreq.actualShards = rb.shards;
          }
          

          3b)Now that the actual shards are known, a request can be sent to each individual shard. The actual sending of the request is done by the HttpCommComponent.submit(...) method. Before the request is sent, a new SolrParams is constructed based on the overall search request parameters. But with some parameters removed and some parameters added. Then the SolrParams is given to HttpCommComponent.submit(...) method as a argument and is used to create a QueryRequest. In the HttpCommComponent.submit(...) a Callable is instantiated to handle sending request to a shard and receiving a response in an asynchronised manner.

          In the takes's call() method the actual request (QueryRequest) is created, that will be send to a shard. Also in this method the response is received and if an exception occurred, it is set on the shard response. The callable is then submitted to the completionService's submit method. The submit methods returns a Future that is then added to a set of futures named pending.From my understanding this pending list of futures is only used to keep track of how many request were send and to cancel a request when an exception occurred.

          4) When the request are sent for a stage, the next step is to receive the response for each shard request that has been sent. The comm.takeCompletedOrError() returns a shard response. It first checks if an exception was set on the response, if so the search is aborted and the exception is re-thrown. If all went well, then the request of the shard response is added to a list of successful request named finished. After that, the SearchComponent's handleResponses(...) method is invoked that allows the search components to inspect the shard response and perhaps do something with it. The behaviour is repeated until comm.takeCompletedOrError() returns null, which means that all response for the current stage were retrieved.

          The comm.takeCompletedOrError() handles each response from the shards individually (sub ShardRequest). It uses the completionService's take() method that get a future and uses that to remove that same instance for the pending set. Then the method get is invoked on the future and the response is returned. If the response contains an exception then the response is immediately returned. When the response does not contain a exception it is added to the responses of the ShardRequest. When the number of responses in the ShardRequest is equal to the number of shards then the last response from the get() method of Future is returned (it contains the ShardRequest that contains all the responses).

          5) When all request were sent and response were received, on each search component the finishStage(...) method is executed. This allows components to execute some custom logic that is only possible if all shard requests are collected. When that is done it checks if the current stage is not equal to stage done. It then continues with step 2 till 5, until the stage finish is the current stage. That indicates that the distributed search is finished and the response can be written to the client.

          I think the best way to handle shard failures in my opinion is by not sending a request to a shard that has failed. I think the best way to implement that is by doing the following:
          1) Currently ShardRequest has a property actualShards that is a string array of shard host names. Let say we create a Shard data type that contains a string hostname and a boolean failed as properties. The actualShards property will be changed to this Shard data type.
          2) In phase 4 when we discover that a ShardRequest failed we need to mark a shard as failed. Therefore the take() or takeCompletedOrError() need store the shard hostname with the exception. In the handleRequestBody we then check if one or more exceptions / hostnames were set, if so we mark those hostnames in ShardRequest as failed.
          3) In phase 3b we only invoke HttpCommComponent.submit(...) on the shards that are not marked as failed.
          Something like this:

          for (Shard shard : sreq.actualShards) {
          	if (shard.hasFailed()) {
          		continue;
          	}
          	ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
          	params.remove(ShardParams.SHARDS);      // not a top-level request
          	params.remove("indent");
          	params.remove(CommonParams.HEADER_ECHO_PARAMS);
          	params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
          	String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
          	if (shardHandler == null) {
          		params.remove(CommonParams.QT);
          	else {
          		params.set(CommonParams.QT, shardHandler);
          	}
          	comm.submit(sreq, shard.getHostname(), params);
          }
          

          I think that this approach is much more efficient than the current approach, because no request is sent to the failed shard and thus HttpClient does not try to make a connection to a shard that would not response properly anyway. I think implementing this solution is not that much work. What are your thoughts about this approach?

          Show
          Martijn van Groningen added a comment - I think we need to bird's-eye view at the partial results solution, so that we can hook in the partial results behaviour at the right place. This is quiet a long comment, but first I will describe how I think that distributed search works and then propose a solution. In think that this solutions is better than the current one in the patch. From my understanding the distributed search in the trunk currently works as follows: 1) When it has been determined that the a search request is a multi shard request an instance of HttpCommComponent is created and outgoing and finished lists are initialised. Also the nextStage is set to zero. 2) The ResponseBuilder's stage is set to the nextStage and the nextStage is set to stage done. The distributedProcess(...) method is invoked on each search component. Each search component can add ShardRequests to the outgoing list in the ResponseBuilder. Besides adding ShardRequests, a search component also returns a stage. The lowest stage from all search components will end up to be the next stage. // call all components for ( SearchComponent c : components ) { // the next stage is the minimum of what all components report nextStage = Math .min(nextStage, c.distributedProcess(rb)); } 3a) Next step is to send all the ShardRequests from ResponseBuilder's output list to the shards. First a ShardRequest is taken and removed from the ResponseBuilder's output list, then the actual shards are determined for the current ShardRequest. ShardRequest sreq = rb.outgoing.remove(0); It checks if for the current overall search request the shards are specified and than use. If that is not the case the predefined shards become the actual shards. sreq.actualShards = sreq.shards; if (sreq.actualShards==ShardRequest.ALL_SHARDS) { sreq.actualShards = rb.shards; } 3b)Now that the actual shards are known, a request can be sent to each individual shard. The actual sending of the request is done by the HttpCommComponent.submit(...) method. Before the request is sent, a new SolrParams is constructed based on the overall search request parameters. But with some parameters removed and some parameters added. Then the SolrParams is given to HttpCommComponent.submit(...) method as a argument and is used to create a QueryRequest. In the HttpCommComponent.submit(...) a Callable is instantiated to handle sending request to a shard and receiving a response in an asynchronised manner. In the takes's call() method the actual request (QueryRequest) is created, that will be send to a shard. Also in this method the response is received and if an exception occurred, it is set on the shard response. The callable is then submitted to the completionService's submit method. The submit methods returns a Future that is then added to a set of futures named pending.From my understanding this pending list of futures is only used to keep track of how many request were send and to cancel a request when an exception occurred. 4) When the request are sent for a stage, the next step is to receive the response for each shard request that has been sent. The comm.takeCompletedOrError() returns a shard response. It first checks if an exception was set on the response, if so the search is aborted and the exception is re-thrown. If all went well, then the request of the shard response is added to a list of successful request named finished. After that, the SearchComponent's handleResponses(...) method is invoked that allows the search components to inspect the shard response and perhaps do something with it. The behaviour is repeated until comm.takeCompletedOrError() returns null, which means that all response for the current stage were retrieved. The comm.takeCompletedOrError() handles each response from the shards individually (sub ShardRequest). It uses the completionService's take() method that get a future and uses that to remove that same instance for the pending set. Then the method get is invoked on the future and the response is returned. If the response contains an exception then the response is immediately returned. When the response does not contain a exception it is added to the responses of the ShardRequest. When the number of responses in the ShardRequest is equal to the number of shards then the last response from the get() method of Future is returned (it contains the ShardRequest that contains all the responses). 5) When all request were sent and response were received, on each search component the finishStage(...) method is executed. This allows components to execute some custom logic that is only possible if all shard requests are collected. When that is done it checks if the current stage is not equal to stage done. It then continues with step 2 till 5, until the stage finish is the current stage. That indicates that the distributed search is finished and the response can be written to the client. I think the best way to handle shard failures in my opinion is by not sending a request to a shard that has failed. I think the best way to implement that is by doing the following: 1) Currently ShardRequest has a property actualShards that is a string array of shard host names. Let say we create a Shard data type that contains a string hostname and a boolean failed as properties. The actualShards property will be changed to this Shard data type. 2) In phase 4 when we discover that a ShardRequest failed we need to mark a shard as failed. Therefore the take() or takeCompletedOrError() need store the shard hostname with the exception. In the handleRequestBody we then check if one or more exceptions / hostnames were set, if so we mark those hostnames in ShardRequest as failed. 3) In phase 3b we only invoke HttpCommComponent.submit(...) on the shards that are not marked as failed. Something like this: for (Shard shard : sreq.actualShards) { if (shard.hasFailed()) { continue ; } ModifiableSolrParams params = new ModifiableSolrParams(sreq.params); params.remove(ShardParams.SHARDS); // not a top-level request params.remove( "indent" ); params.remove(CommonParams.HEADER_ECHO_PARAMS); params.set(ShardParams.IS_SHARD, true ); // a sub (shard) request String shardHandler = req.getParams().get(ShardParams.SHARDS_QT); if (shardHandler == null ) { params.remove(CommonParams.QT); else { params.set(CommonParams.QT, shardHandler); } comm.submit(sreq, shard.getHostname(), params); } I think that this approach is much more efficient than the current approach, because no request is sent to the failed shard and thus HttpClient does not try to make a connection to a shard that would not response properly anyway. I think implementing this solution is not that much work. What are your thoughts about this approach?
          Hide
          Grant Ingersoll added a comment -

          But, take or takeOrError() isn't the thing that cancels the other two responses, comm.cancelAll is, AIUI, and that is not called in the take methods. Also, Take deals with the Future callbacks, each of which are executed in separate threads via the Executor.

          Show
          Grant Ingersoll added a comment - But, take or takeOrError() isn't the thing that cancels the other two responses, comm.cancelAll is, AIUI, and that is not called in the take methods. Also, Take deals with the Future callbacks, each of which are executed in separate threads via the Executor.
          Hide
          Martijn van Groningen added a comment -

          Sorry for my confusing comment. I meant to say takeOrError() does return immediately when an exception occurs. To avoid more confusion I will sketch a situation from what I currently understand from the code to show that takeOrError() should not be used when returning partial result.

          For each stage a number of requests may be send to the shards and a number of responses may be returned from the shards for further processing.
          Lets say we have three shards and we send a shard request in a certain stage to all three shards. If the first response contains an error the current behaviour is to return the response immediately, without adding the two other responses (that did return without an error). Because of this the so called partial result might contain less data or even nothing. Therefore I think take() should be used there. I think takeOrError() is only suitable when not using partial result.

          ShardResponse takeCompletedOrError() {
              while (pending.size() > 0) {
                try {
                  Future<ShardResponse> future = completionService.take();
                  pending.remove(future);
                  ShardResponse rsp = future.get();
                  if (rsp.getException() != null) return rsp; // now we return and if there are more pending results, we lose them
                  ...............
                  rsp.getShardRequest().responses.add(rsp);
                  if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
                    return rsp;
                  }
                } catch (InterruptedException e) {
                ......
              }
              return null;
            }
          

          Again this what I understand from the code. What do you think about this?

          I also did some more thinking about how to improve shard failures. Currently if a shard fails in a early stage of the distributed search we keep sending requests to the shard, although we noticed in a previous stage that it was not responding. You think that it is a good idea to mark a shard as failed, so that it will not use the shard that is marked as failed for the current running search?

          Show
          Martijn van Groningen added a comment - Sorry for my confusing comment. I meant to say takeOrError() does return immediately when an exception occurs. To avoid more confusion I will sketch a situation from what I currently understand from the code to show that takeOrError() should not be used when returning partial result. For each stage a number of requests may be send to the shards and a number of responses may be returned from the shards for further processing. Lets say we have three shards and we send a shard request in a certain stage to all three shards. If the first response contains an error the current behaviour is to return the response immediately, without adding the two other responses (that did return without an error). Because of this the so called partial result might contain less data or even nothing. Therefore I think take() should be used there. I think takeOrError() is only suitable when not using partial result. ShardResponse takeCompletedOrError() { while (pending.size() > 0) { try { Future<ShardResponse> future = completionService.take(); pending.remove( future ); ShardResponse rsp = future .get(); if (rsp.getException() != null ) return rsp; // now we return and if there are more pending results, we lose them ............... rsp.getShardRequest().responses.add(rsp); if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { return rsp; } } catch (InterruptedException e) { ...... } return null ; } Again this what I understand from the code. What do you think about this? I also did some more thinking about how to improve shard failures. Currently if a shard fails in a early stage of the distributed search we keep sending requests to the shard, although we noticed in a previous stage that it was not responding. You think that it is a good idea to mark a shard as failed, so that it will not use the shard that is marked as failed for the current running search?
          Hide
          Grant Ingersoll added a comment -

          I don't follow. The only difference between the two methods is that takeOrError returns immediately if there was an error and doesn't put it in the response list, which is what you are checking for in your loop anyway. From what I can tell the while loop isn't going to break until all pending are accounted for, either by error or by valid results. I don't see how it is beneficial to examine every shard response every time and I don't see why that would prevent you from losing responses as it is independent of the request sent.

          Show
          Grant Ingersoll added a comment - I don't follow. The only difference between the two methods is that takeOrError returns immediately if there was an error and doesn't put it in the response list, which is what you are checking for in your loop anyway. From what I can tell the while loop isn't going to break until all pending are accounted for, either by error or by valid results. I don't see how it is beneficial to examine every shard response every time and I don't see why that would prevent you from losing responses as it is independent of the request sent.
          Hide
          Martijn van Groningen added a comment -

          Actually your approach makes more sense, because that is more efficient. But the takeCompletedOrError() method may then not directly return when a shard failure occurs, because then you might lose the response from the other shards. I initially tried to change take() to takeCompletedOrError(), but then I noticed this problem.

          Show
          Martijn van Groningen added a comment - Actually your approach makes more sense, because that is more efficient. But the takeCompletedOrError() method may then not directly return when a shard failure occurs, because then you might lose the response from the other shards. I initially tried to change take() to takeCompletedOrError(), but then I noticed this problem.
          Hide
          Grant Ingersoll added a comment -

          I'm not sure I'm following the changes in SearchHandler.

          AIUI, before, we check for ShardResponses via comm.takeCompletedOrError() and then process the error and check for an exception. If there is an exception, we throw it, essentially.

          In the new code, it is replaced by just take() which returns the response, null or an exception. We then iterate over the whole set of responses every time we enter the while (rb.outgoing...) loop.

          However, why wouldn't you just keep the existing takeCompletedOrError, check to see if that shard is an error and handle it. At the end of the loop it should be easy to determine if the number of requests sent equals the number received and then add the partial results indicator, and, potentially, indicate which shards failed.

          What am I not understanding? Basically, I don't get the need for:

          for (ShardResponse shardRsp : srsp.getShardRequest().responses)
          ...
          
          Show
          Grant Ingersoll added a comment - I'm not sure I'm following the changes in SearchHandler. AIUI, before, we check for ShardResponses via comm.takeCompletedOrError() and then process the error and check for an exception. If there is an exception, we throw it, essentially. In the new code, it is replaced by just take() which returns the response, null or an exception. We then iterate over the whole set of responses every time we enter the while (rb.outgoing...) loop. However, why wouldn't you just keep the existing takeCompletedOrError, check to see if that shard is an error and handle it. At the end of the loop it should be easy to determine if the number of requests sent equals the number received and then add the partial results indicator, and, potentially, indicate which shards failed. What am I not understanding? Basically, I don't get the need for: for (ShardResponse shardRsp : srsp.getShardRequest().responses) ...
          Hide
          Martijn van Groningen added a comment - - edited

          Hi Grant, thanks for mentioning that, I did not realized that.

          I have two other ideas about returning a partial result that might be usable in this patch:
          1) Currently when a partial result is returned the response does not tell you which shard has failed, it only tells you that it is a partial result. Wouldn't it be handy to include hostnames or ip addresses in the response of the shards that had a connection timeout?

          2) A partial result is only returned when when a connection exception occurs, is it practical to return a partial result when another type of exception occurs? Let say one shard has a corrupted index and therefore while searching only that shard throws an exception, I can imagine that in such situation it is also useful to return a partial result instead of only returning an error for the complete search.

          Show
          Martijn van Groningen added a comment - - edited Hi Grant, thanks for mentioning that, I did not realized that. I have two other ideas about returning a partial result that might be usable in this patch: 1) Currently when a partial result is returned the response does not tell you which shard has failed, it only tells you that it is a partial result. Wouldn't it be handy to include hostnames or ip addresses in the response of the shards that had a connection timeout? 2) A partial result is only returned when when a connection exception occurs, is it practical to return a partial result when another type of exception occurs? Let say one shard has a corrupted index and therefore while searching only that shard throws an exception, I can imagine that in such situation it is also useful to return a partial result instead of only returning an error for the complete search.
          Hide
          Grant Ingersoll added a comment -

          Small, FYI on patch submission: No need to name them XXXX-1, XXXX-2, etc. JIRA will actually version them automatically and gray out all but the most current one. Doing so makes it easier to see what is the current patch w/o reading every one.

          Show
          Grant Ingersoll added a comment - Small, FYI on patch submission: No need to name them XXXX-1, XXXX-2, etc. JIRA will actually version them automatically and gray out all but the most current one. Doing so makes it easier to see what is the current patch w/o reading every one.
          Hide
          Martijn van Groningen added a comment -

          You are right Grant. I guess I forgot about the request handler defaults when I was creating this patch...
          I have removed the possibility to configure partial results via the return-partial-results property in the latest patch. The request handler defaults can perfectly configure partial results to be enabled by default.

          Show
          Martijn van Groningen added a comment - You are right Grant. I guess I forgot about the request handler defaults when I was creating this patch... I have removed the possibility to configure partial results via the return-partial-results property in the latest patch. The request handler defaults can perfectly configure partial results to be enabled by default.
          Hide
          Grant Ingersoll added a comment - - edited

          In this patch you return a partial result when a shard has failed by setting partialResults to true in the request or if you want it to for all requests your can add <bool name="return-partial-results">true</bool> to your search handler in your solrconfig.xml. If both are not specified, partial results are disabled. Currently the partialResults parameter overrides the return-partial-results property in the search handler.

          I'm not sure about the need for the return-partial-results static boolean. This could just be handled through the RequestHandler defaults, right?

          Show
          Grant Ingersoll added a comment - - edited In this patch you return a partial result when a shard has failed by setting partialResults to true in the request or if you want it to for all requests your can add <bool name="return-partial-results">true</bool> to your search handler in your solrconfig.xml. If both are not specified, partial results are disabled. Currently the partialResults parameter overrides the return-partial-results property in the search handler. I'm not sure about the need for the return-partial-results static boolean. This could just be handled through the RequestHandler defaults, right?
          Hide
          Artem Russakovskii added a comment -

          Any idea when this will be approved for pushing into trunk?

          Show
          Artem Russakovskii added a comment - Any idea when this will be approved for pushing into trunk?
          Hide
          Martijn van Groningen added a comment -

          I have added a test in TestDistributedSearch class. This test sets up a cluster of shards and then kills one shard and then it expects that the search request as a whole to continue. The TestDistributedSearch class in general tests distributed search by having a non distributed instance and a cluster of shards both have the same documents. All results from the cluster are compared with results from the non distributed instance. Some things in the test I added like facets and maxScore could not be tested because one shard in the cluster is down (so part of the corpus is missing). Only the documents that are returned from the shards are compared against the documents in the non distributed instance.

          I have also included the option to disable / enable partial results as Lance described. I agree with Lance that ignoring a shard failure should not be enabled by default, if you do not know about this feature then finding the cause of the actual problem might be difficult.

          In this patch you return a partial result when a shard has failed by setting partialResults to true in the request or if you want it to for all requests your can add <bool name="return-partial-results">true</bool> to your search handler in your solrconfig.xml. If both are not specified, partial results are disabled. Currently the partialResults parameter overrides the return-partial-results property in the search handler.

          Show
          Martijn van Groningen added a comment - I have added a test in TestDistributedSearch class. This test sets up a cluster of shards and then kills one shard and then it expects that the search request as a whole to continue. The TestDistributedSearch class in general tests distributed search by having a non distributed instance and a cluster of shards both have the same documents. All results from the cluster are compared with results from the non distributed instance. Some things in the test I added like facets and maxScore could not be tested because one shard in the cluster is down (so part of the corpus is missing). Only the documents that are returned from the shards are compared against the documents in the non distributed instance. I have also included the option to disable / enable partial results as Lance described. I agree with Lance that ignoring a shard failure should not be enabled by default, if you do not know about this feature then finding the cause of the actual problem might be difficult. In this patch you return a partial result when a shard has failed by setting partialResults to true in the request or if you want it to for all requests your can add <bool name="return-partial-results">true</bool> to your search handler in your solrconfig.xml. If both are not specified, partial results are disabled. Currently the partialResults parameter overrides the return-partial-results property in the search handler.
          Hide
          Lance Norskog added a comment -

          If the search subsystem has a problem, the ops team wants to know about it and fix it. This just hides problems.

          An example: 2 servers with the same shard are behind a load balancer. One server fails. The load balancer notices this and directs all traffic to the other server.

          This is a production network which serves and outside API, where everything is supposed to work >from the viewpoint of the outside API<. When the load balancer gets a failure it usually returns an error on that one request, then marks the server down. So that one search request eventually returns with a "temporary error" condition.

          These search requests come from an app server which serves the API. The app server then has the option of retrying one or two times, or returning "service not happy" to the outside calling app.

          When I have a problem in my system, I want to find it and fix it. Ignoring shard errors is ok as an option, and should be there. But, please do not make it the default. Hiding failures should never be the default.

          Show
          Lance Norskog added a comment - If the search subsystem has a problem, the ops team wants to know about it and fix it. This just hides problems. An example: 2 servers with the same shard are behind a load balancer. One server fails. The load balancer notices this and directs all traffic to the other server. This is a production network which serves and outside API, where everything is supposed to work >from the viewpoint of the outside API<. When the load balancer gets a failure it usually returns an error on that one request, then marks the server down. So that one search request eventually returns with a "temporary error" condition. These search requests come from an app server which serves the API. The app server then has the option of retrying one or two times, or returning "service not happy" to the outside calling app. When I have a problem in my system, I want to find it and fix it. Ignoring shard errors is ok as an option, and should be there. But, please do not make it the default. Hiding failures should never be the default.
          Hide
          Jason Rutherglen added a comment -

          What happens today when a query times out?

          Show
          Jason Rutherglen added a comment - What happens today when a query times out?
          Hide
          Artem Russakovskii added a comment -

          +1 for importance of this feature. If I have 10 shards, I should be able to handle 1 of them going down without returning 0 results to the user.

          Show
          Artem Russakovskii added a comment - +1 for importance of this feature. If I have 10 shards, I should be able to handle 1 of them going down without returning 0 results to the user.
          Hide
          Grant Ingersoll added a comment -

          This needs tests.

          Show
          Grant Ingersoll added a comment - This needs tests.
          Hide
          Yonik Seeley added a comment -

          Seems like this is something we should consider for 1.4

          Show
          Yonik Seeley added a comment - Seems like this is something we should consider for 1.4

            People

            • Assignee:
              Unassigned
              Reporter:
              Nicolas Dessaigne
            • Votes:
              17 Vote for this issue
              Watchers:
              16 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development