Avro / AVRO-539

Allow asynchronous clients to specify a callback to be run when server processing completes

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.2
    • Component/s: java
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    1. AVRO-539.patch
      58 kB
      James Baldassari
    2. AVRO-539-v2.patch
      88 kB
      James Baldassari
    3. AVRO-539-v2-2.patch
      89 kB
      James Baldassari
    4. AVRO-539-v3.patch
      71 kB
      James Baldassari
    5. AVRO-539-v4.patch
      63 kB
      James Baldassari

        Activity

        James Baldassari added a comment -

        Patch to enable asynchronous RPCs in Java

        James Baldassari added a comment -

        Here's a high-level description of my changes copied from my e-mail to the users list:

        I just finished a second attempt at the asynchronous RPC implementation incorporating Philip's feedback and some other ideas that I had. I think it's easiest to explain how it works with an example. So here's a simple IDL and schema:

        IDL:
        protocol Calculator {
          int add(int arg1, int arg2);
        }

        Schema:
        {"protocol":"Calculator","messages":{
          "add":{
            "request":[
              {"name":"arg1","type":"int"},
              {"name":"arg2","type":"int"}
            ],
            "response":"int"}}}

        No changes are required to the IDL or schema to enable async RPCs. The Avro Java compiler will generate two interfaces instead of one. The first interface, Calculator, contains the standard synchronous methods. The second interface, CalculatorClient, extends Calculator and adds asynchronous methods for all two-way messages. The reason why the async methods are separated out into a separate interface is that the responder/server side doesn't need to know (and shouldn't know) about the client-side async methods. So the Responder/server implements Calculator, and the Requestor/client can either use Calculator or CalculatorClient to invoke the RPCs. For reference, here is what the two generated interfaces look like (without the PROTOCOL field and package names):

        public interface Calculator {
          int add(int arg1, int arg2) throws AvroRemoteException;
        }

        public interface CalculatorClient extends Calculator {
          CallFuture<Integer> addAsync(int arg1, int arg2) throws IOException;
          CallFuture<Integer> addAsync(int arg1, int arg2, Callback<Integer> callback) throws IOException;
        }

        The CalculatorClient interface is the only new component. It has two methods for each message, one that takes a Callback and one that does not. Both methods return a CallFuture so that the client has the option of using either the Future or the Callback to obtain the result of the RPC. Future.get() blocks until the RPC is complete, and either returns the result or throws an exception if one occurred during the RPC. The Callback interface has two methods, handleResult(T result) and handleError(Exception error). One or the other is always called depending on whether the RPC was successful or an Exception was thrown.
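
For illustration only, here is a minimal sketch of how a client might consume the two styles described above. It does not use Avro: the nested Callback interface and FakeCalculatorClient are hypothetical stand-ins that only mimic the shape of the generated addAsync method, and the checked IOException is omitted for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncUsageDemo {
  // Stand-in for the Callback interface described above (not Avro's own class).
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Exception error);
  }

  // Hypothetical in-process client; a real one would be generated by the Avro compiler.
  static class FakeCalculatorClient {
    // Returns a Future and, if a callback is supplied, also notifies it.
    Future<Integer> addAsync(int arg1, int arg2, Callback<Integer> callback) {
      CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> arg1 + arg2);
      if (callback != null) {
        future.whenComplete((result, error) -> {
          if (error != null) {
            callback.handleError(new Exception(error));
          } else {
            callback.handleResult(result);
          }
        });
      }
      return future;
    }
  }

  // Future style: block on get() until the result is available.
  static int viaFuture(int a, int b) {
    try {
      return new FakeCalculatorClient().addAsync(a, b, null).get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // Callback style: exactly one of handleResult/handleError is invoked.
  static int viaCallback(int a, int b) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicInteger sum = new AtomicInteger();
    new FakeCalculatorClient().addAsync(a, b, new Callback<Integer>() {
      public void handleResult(Integer result) { sum.set(result); done.countDown(); }
      public void handleError(Exception error) { done.countDown(); }
    });
    try {
      done.await(); // only so the demo can return the value; real callback code would not block
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return sum.get();
  }

  public static void main(String[] args) {
    System.out.println(viaFuture(1, 2));
    System.out.println(viaCallback(3, 4));
  }
}
```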

        In addition to the compiler changes, I had to make some changes in the avro-ipc project to get the async plumbing to work correctly. Most of these changes are in Requestor and NettyTransceiver. As part of the changes I had to make to Requestor I ended up replacing a couple of large synchronized blocks with finer-grained critical sections protected by reentrant locks. I think this change improved performance overall, at least in the case where multiple threads are using the same client. I implemented a rudimentary performance test that spins up a bunch of threads, executes the same RPC (Simple.hello(String)) repeatedly for a fixed amount of time, and then calculates the average number of RPCs completed per second. With Avro 1.5.1 I got 7,450 RPCs/sec, and with my modified version of trunk I got 19,050 RPCs/sec. That was a very simple test, but if there is a standard benchmark that the Avro team uses I'd be happy to rerun my tests using that.

        So that's basically it. All existing unit tests pass, and I wrote additional tests for all the new async functionality. I've documented all public interfaces, and I think the changes are ready to be reviewed whenever the committers have time to take a look. Please let me know if you have any comments/questions.

        James Baldassari added a comment -

        Patch is in ReviewBoard: https://reviews.apache.org/r/834/

        Doug Cutting added a comment -

        This is great stuff! A few comments on the patch:

        • Rather than calling the new APIs 'asynchronous' might we instead consider them 'Callback-based' and/or 'Future-based'? They'd be friendly to async implementations, but a synchronous implementation would be permitted. In particular, I think we can remove 'asynchronous' from the names of these methods.
        • Do we really need both Callback-based and Future-based APIs?
        • The name of the generated interface and methods should avoid potential collisions with user-defined interfaces and messages, perhaps by using '$'. Alternately, we might just generate a single interface and not use different method names, rather distinguishing by method signature. No user method should accept an org.apache.avro.ipc.Callback parameter, so we would not need to worry about method signature collisions.
        • Should we make generation of Callback/Future-based interfaces optional?
        • in Transceiver, can we implement the Callback/Future-based API synchronously in terms of the existing API, rather than throwing an exception?
        • in Requestor, can we implement the synchronous version in terms of the Callback-based API so that less logic is replicated?
        James Baldassari added a comment -

        Thanks for the comments, Doug! I'll try to address them all:

        Rather than calling the new APIs 'asynchronous' might we instead consider them 'Callback-based' and/or 'Future-based'? They'd be friendly to async implementations, but a synchronous implementation would be permitted. In particular, I think we can remove 'asynchronous' from the names of these methods.

        I see your point. The new API could theoretically be used in synchronous calls when the Transceiver doesn't support async I/O, so it isn't inherently asynchronous. I'll try to either eliminate the "Asynchronous" from the method names and simply overload them or, when necessary, change these new method names so that they end in "Callback" or "Future" as appropriate.

        Do we really need both Callback-based and Future-based APIs?

        I personally find Callbacks to be more practical than Futures. However, there may be use cases where Futures are more appropriate, so I just thought it would be better to give users the option to use either one. Since the CallFuture class was already being used in NettyTransceiver before I started making changes, I basically got the Future API for free. All I did was expose it in the public API. Users can always ignore the CallFuture that is returned by the async methods. However, I wouldn't be opposed to removing the Future API if you think it's just cluttering the interface.

        The name of the generated interface and methods should avoid potential collisions with user-defined interfaces and messages, perhaps by using '$'. Alternately, we might just generate a single interface and not use different method names, rather distinguishing by method signature. No user method should accept an org.apache.avro.ipc.Callback parameter, so we would not need to worry about method signature collisions.

        That's a good point. So one proposal is to generate methods like methodName$Callback(...) to eliminate naming conflicts. Regarding your second proposal, I think it would be great to overload the original methods with the additional Callback parameter rather than creating special method names. I'll see what I can do with that.

        Should we make generation of Callback/Future-based interfaces optional?

        I think that's a good idea. In my first attempt at this patch I made the interfaces optional by adding the 'async' keyword/property to the IDL and schema. Philip Zeyliger pointed out that this probably isn't the cleanest approach, and I agree. Can you think of a better way to make the new interfaces optional?

        in Transceiver, can we implement the Callback/Future-based API synchronously in terms of the existing API, rather than throwing an exception?

        I think you're referring to the fact that the non-Netty Transceivers throw UnsupportedOperationException. Yes, I think it should be possible to make these methods work in a synchronous way for non-Netty Transceivers. So the default implementation in Transceiver would be synchronous, and NettyTransceiver would override it to make it asynchronous. I'll work on that.
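
As a rough sketch of that idea (not the code in the patch), a base class can implement the callback-based method synchronously on top of the existing blocking call, and an asynchronous transport can override it. BaseTransport, EchoTransport and AsyncEchoTransport are hypothetical names invented for this example, and the thread pool merely stands in for non-blocking I/O.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class TransceiverSketch {
  // Stand-in for the Callback interface discussed in this issue.
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Exception error);
  }

  // Hypothetical base class, loosely modelled on the Transceiver idea.
  abstract static class BaseTransport {
    // Existing blocking API: send a request and wait for the response.
    abstract String transceive(String request) throws IOException;

    // Default callback-based API, implemented synchronously via the blocking call.
    void transceive(String request, Callback<String> callback) {
      try {
        callback.handleResult(transceive(request));
      } catch (Exception e) {
        callback.handleError(e);
      }
    }
  }

  // A transport with no async support simply inherits the synchronous default.
  static class EchoTransport extends BaseTransport {
    @Override
    String transceive(String request) {
      return "echo:" + request;
    }
  }

  // An async-capable transport overrides the callback method (assumption: a
  // single worker thread stands in for real asynchronous I/O).
  static class AsyncEchoTransport extends EchoTransport {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    @Override
    void transceive(String request, Callback<String> callback) {
      pool.execute(() -> super.transceive(request, callback));
    }

    void close() {
      pool.shutdown();
    }
  }

  // Helper that drives either transport through the callback API.
  static String call(BaseTransport transport, String request) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<String> out = new AtomicReference<>();
    transport.transceive(request, new Callback<String>() {
      public void handleResult(String result) { out.set(result); done.countDown(); }
      public void handleError(Exception error) { out.set("error:" + error); done.countDown(); }
    });
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return out.get();
  }

  public static void main(String[] args) {
    System.out.println(call(new EchoTransport(), "sync"));
    AsyncEchoTransport async = new AsyncEchoTransport();
    System.out.println(call(async, "async"));
    async.close();
  }
}
```

With this shape, callers written against the callback method work with either transport; only the thread on which the callback fires differs.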

        in Requestor, can we implement the synchronous version in terms of the Callback-based API so that less logic is replicated?

        I did quite a bit of refactoring in Requestor to prevent duplicating code between the sync and async interfaces. However, once I make the change from your previous comment to make non-Netty Transceivers work with the async APIs, I think the synchronous methods can be implemented using only the asynchronous API. So that will further reduce duplication. Note that this is one case where it would be more useful to use the Future API rather than the Callback API.

        Thanks again for the feedback. I think your suggestions are going to really improve this patch. I'll work on making these changes and post an update to the patch.

        Scott Carey added a comment -

        Can you think of a better way to make the new interfaces optional?

        Make the SpecificCompiler take it as an option, and ant/maven can pass in a flag to enable it? Can this option leverage templates?

        Note that this is one case where it would be more useful to use the Future API rather than the Callback API.

        Futures are very useful for using an asynchronous API in a synchronous fashion, or in hybrid use cases. Callbacks require more caution with concurrency issues, since the client does not control the thread that executes the callback.

        Doug Cutting added a comment -

        I wonder if we might only implement a Future-based API, if they are less error-prone and more general, as Scott indicates. James, can you elaborate on why you prefer Callbacks?

        Also, looking again at the patch, I wonder if we can use Future<T> in the public APIs rather than CallFuture<T>?

        Thanks!

        Philip Zeyliger added a comment -

        BTW, Google Guava has http://guava-libraries.googlecode.com/svn/trunk/javadoc/com/google/common/util/concurrent/ListenableFuture.html, which may be a working middle ground.

        Scott Carey added a comment -

        I wouldn't say Futures are more general. It's only a few lines of code to create a Future from a callback.

        A future is just a callback with a lock, that implements the callback by storing the result for access through the future API. We could write a class or take one from elsewhere that does this, and users could pass this in as the callback.

        Futures are more natural to use for imperative style asynchronous work:

        • Send a request, do some other work, then get or wait for the result.
        • Iterate through a list of asynchronous requests, then iterate over the future results (pipelined batch).
        • Often more useful if there is shared state managed by the calling thread.

        Callbacks are useful for event based / message based systems, or in general systems with a more functional style. Generally useful for high throughput 'server' applications or client applications with event based frameworks (often UI's).

        I think we need both, or an easy way for users to get one from the other.
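
To make the "pipelined batch" pattern from the list above concrete, here is a small sketch. It is an assumption-laden illustration: an ExecutorService stands in for an asynchronous RPC client, and addAll is a made-up helper, not anything in Avro.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PipelinedBatchDemo {
  // Issues every request first, then collects the results in request order.
  static List<Integer> addAll(int[][] pairs) {
    // Assumption: the pool plays the role of an async client with requests in flight.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<Integer>> pending = new ArrayList<>();
      for (int[] pair : pairs) {
        final int a = pair[0];
        final int b = pair[1];
        pending.add(pool.submit(() -> a + b)); // "send" without waiting
      }
      List<Integer> results = new ArrayList<>();
      for (Future<Integer> future : pending) {
        results.add(future.get()); // blocks only if this result is not ready yet
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(addAll(new int[][] {{1, 2}, {3, 4}, {5, 6}}));
  }
}
```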

        James Baldassari added a comment -

        I agree that exposing both a future and callback API is preferable because it gives users a lot of flexibility in how they implement their clients. I don't think there is a one-size-fits-all solution for asynchronous messaging.

        Doug, to answer a couple of your questions:

        • Yes, we can absolutely use Future<T> rather than CallFuture<T> in the generated interfaces as CallFuture implements Future, and the extra methods in CallFuture probably aren't useful to most clients anyway.
        • I prefer callbacks to futures in general because in a truly asynchronous programming style the callback pattern is easier to implement. With a Future you either have to wait on it immediately (which isn't asynchronous from the client's perspective) or you have to stick it in some collection and have other threads iterating over that collection processing and cleaning up completed Futures. Back to my original point, Futures are more convenient in some cases, which is why I think it's good to have both.

        One other thing users will have to be aware of when using the callback API is that their callback handlers should never perform long-running tasks because that will block the event handler thread in the Netty thread pool, potentially preventing future messages from being processed. They should either perform very quick tasks or pass off the callback result to a queue/thread pool for processing. We should explain some of these nuances on the wiki or in some quickstart guide.
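
One possible way to follow that advice is a wrapper that hands every callback invocation to an application-owned thread pool. The sketch below is hypothetical (OffloadingCallback is not part of Avro, and the nested Callback interface is a stand-in); it only shows the hand-off so that the I/O thread returns immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class OffloadingCallbackDemo {
  // Stand-in for the Callback interface discussed in this issue.
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Exception error);
  }

  // Runs the wrapped callback on the given executor instead of the calling thread.
  static final class OffloadingCallback<T> implements Callback<T> {
    private final Callback<T> delegate;
    private final Executor executor;

    OffloadingCallback(Callback<T> delegate, Executor executor) {
      this.delegate = delegate;
      this.executor = executor;
    }

    @Override
    public void handleResult(T result) {
      executor.execute(() -> delegate.handleResult(result));
    }

    @Override
    public void handleError(Exception error) {
      executor.execute(() -> delegate.handleError(error));
    }
  }

  // Returns the name of the thread that actually ran the user's handler.
  static String handlerThreadName() {
    ExecutorService workers =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<String> name = new AtomicReference<>();
    Callback<Integer> userHandler = new Callback<Integer>() {
      public void handleResult(Integer result) {
        name.set(Thread.currentThread().getName()); // long-running work would go here
        done.countDown();
      }
      public void handleError(Exception error) { done.countDown(); }
    };
    try {
      // The current thread simulates the I/O thread delivering a result.
      new OffloadingCallback<Integer>(userHandler, workers).handleResult(42);
      done.await();
      return name.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      workers.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(handlerThreadName());
  }
}
```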

        So unless anyone has any objections, I'm going to proceed with a new version of the patch that generates client-facing interfaces like this:

        CalculatorClient.java
        public interface CalculatorClient extends Calculator {
          public static final Protocol PROTOCOL = Calculator.PROTOCOL;
          Future<Integer> add(int arg1, int arg2, Callback<Integer> callback) throws IOException;
        }
        

        If the user wants to use only the Future and not the Callback, the user can pass in a null for the Callback parameter.

        Scott Carey added a comment - - edited

        One other option: don't return a future. Instead, a user can make one. This is more verbose, however:

         public void add(int arg1, int arg2, Callback<Integer> cb) throws IOException; 
        

        Use a future:

          ProtocolFuture<Integer> f = new ProtocolFuture<Integer>();
          add(1, 2, f);
          Integer result = f.get();
        

        ProtocolFuture<T> is relatively simple, with pseudo-code like:

        public class ProtocolFuture<T> implements Future<T>, Callback<T> {
          private T result = null;
          private CountDownLatch latch = new CountDownLatch(1);
        
          @Override
          public final void handleResult(T result) {
            this.result = result;
            latch.countDown();
          }
        
          @Override
          public T get() throws InterruptedException {
            latch.await();
            return result;
          }
        }
        

        Although the above class isn't so useful for Jetty, it could potentially be for other protocol implementations.

        For very high throughput, creating an unused Future with each call might be a small problem too.

        I'm not convinced I like this solution better; it is an interesting alternative, though.

        James Baldassari added a comment -

        That is an interesting idea, Scott. I think it's a good option for a couple of reasons. We don't have to return a Future with every RPC call, which is likely to be ignored in most cases. It also makes the generated interfaces cleaner and less confusing for someone trying to figure out how to use it for the first time.

        We would need to make a minor change to ProtocolFuture to override handleError(Exception e) and throw that Exception inside get() if an error is returned by the callback. Also, ProtocolFuture could simply wrap a CallFuture<T>. Something like this:

        public class ProtocolFuture<T> implements Future<T>, Callback<T> {
          private final CallFuture<T> future = new CallFuture<T>();
        
          @Override
          public final void handleResult(T result) {
            future.setResponse(result);
          }
        
          @Override
          public final void handleError(Exception e) {
            future.setError(e);
          }
        
          @Override
          public final T get() throws InterruptedException, ExecutionException {
            return future.get();
          }
        
          @Override
          public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return future.get(timeout, unit);
          }
        }
        

        After thinking about it for a bit, I really like this approach. The interface is clean, and you can still use either Callbacks or Futures. Any other opinions?

        Scott Carey added a comment - edited

        I think CallFuture needs to use a CountDownLatch and not a Semaphore so that two consecutive calls to get() don't cause it to block forever. Or, if there is a race condition between retrieval and setting the result, I think the semaphore may block.

        The Future interface defines get() as the following in the javadoc:

        V get()
              throws InterruptedException,
                     ExecutionException
        
            Waits if necessary for the computation to complete, and then retrieves its result.
        
            Returns:
                the computed result 
            Throws:
                CancellationException - if the computation was cancelled 
                ExecutionException - if the computation threw an exception 
                InterruptedException - if the current thread was interrupted while waiting
        
        

        I interpret that to mean that it only blocks if necessary – if the result has not yet been returned. That behavior can be achieved with a CountDownLatch, so that it blocks only until the count reaches 0. Additionally, isDone() will not return true if get() is called after the result is set in the current implementation, which does not comply with the Future interface semantics. Using a CountDownLatch also simplifies the code.

        Perhaps we should have CallFuture implement Callback<T>, replacing setResponse and setError with handleResult and handleError? Do we want to expose CallFuture in the public API?
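
As a rough illustration of the latch-based behavior described above, here is a minimal sketch of a Future that is also a callback. `SimpleCallback` and `LatchFuture` are hypothetical stand-ins invented for this example, not the actual Avro `Callback` or `CallFuture` classes; the point is only that `CountDownLatch.await()` returns immediately once the count is zero, so repeated `get()` calls do not block and `isDone()` stays true.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical callback interface, modeled loosely on the Callback<T> discussed here.
interface SimpleCallback<T> {
  void handleResult(T result);
  void handleError(Throwable error);
}

// Illustrative stand-in for CallFuture (an assumption, not the real class).
class LatchFuture<T> implements Future<T>, SimpleCallback<T> {
  private final CountDownLatch latch = new CountDownLatch(1);
  private T result;
  private Throwable error;

  @Override
  public void handleResult(T result) {
    this.result = result;
    latch.countDown(); // publishes the result to any thread returning from await()
  }

  @Override
  public void handleError(Throwable error) {
    this.error = error;
    latch.countDown();
  }

  @Override
  public T get() throws InterruptedException, ExecutionException {
    latch.await(); // blocks only while the count is still 1
    if (error != null) throw new ExecutionException(error);
    return result;
  }

  @Override
  public T get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    if (!latch.await(timeout, unit)) throw new TimeoutException();
    if (error != null) throw new ExecutionException(error);
    return result;
  }

  @Override
  public boolean isDone() { return latch.getCount() == 0; }

  // Cancellation is not supported in this sketch.
  @Override
  public boolean isCancelled() { return false; }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning) { return false; }
}
```

A semaphore released once would let only the first `get()` through; the latch never resets, which is why it fits the "waits if necessary" wording of the Future contract.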

        James Baldassari added a comment -

        Good catch, Scott. It looks like CallFuture has been that way from the beginning. I think you're right that it would be better to use a CountDownLatch in this case.

        I like your idea of having CallFuture implement Callback. That would be a good reuse of code, and it would make it easier to implement the synchronous API using the callback API. I'll try it out this weekend and see how it looks.

        James Baldassari added a comment -

        I'm just about ready to post a new version of the patch incorporating the ideas we've discussed here. I did run into one problem that I wanted to run by you guys. I was trying to verify that all the various handshake scenarios worked to make sure that I didn't introduce any regressions. I found that there was already a nice unit test for this, TestProtocolSpecific, in particular the testParamVariation() test. However, there didn't appear to be a version of this test for Netty, so I made one by trivially extending TestProtocolSpecific like this:

        public class TestProtocolNetty extends TestProtocolSpecific {
          @Override
          public Server createServer(Responder testResponder) throws Exception {
            return new NettyServer(responder, new InetSocketAddress(0));
          }
          @Override
          public Transceiver createTransceiver() throws Exception {
            return new NettyTransceiver(new InetSocketAddress(server.getPort()));
          } 
          @Override
          protected int getExpectedHandshakeCount() {
            return REPEATING;
          }
        }
        

        When I ran that unit test, the testParamVariation() case hung. I couldn't figure out what was wrong at first, so I decided to run that same test against a clean checkout of trunk, which didn't have any of my changes. It failed with an NPE when writing data to the Netty channel. So I started thinking this might be an existing bug. When I dug deeper I figured out basically what was going on. In NettyServer.NettyServerAvroHandler, around line 145, the server closes the Netty channel if the handshake is not complete:

        } finally {
          if(!connectionMetadata.isConnected()) {
            e.getChannel().close();
          }
        }
        

        So I believe what's happening is that the client performs the initial handshake, the server detects that the protocols aren't exactly the same, and then closes the channel. However, the client (NettyTransceiver) doesn't ever re-establish the connection after the server closes it. So the client attempts to write the second part of the handshake to the Netty channel, but the channel has been closed, so the server never receives the handshake and never sends its reply. This is why the test was hanging for me while attempting to read from the channel (the client was waiting for a reply that never came). When I commented out the "e.getChannel().close()" line in NettyServer the test passed both against a clean trunk and with my patch. I was hoping someone could shed some light on why the server is closing the channel here. Is this really necessary?

        James Baldassari added a comment -

        I just finished a new version of the patch that I think addresses all the issues we've been discussing here so far. Here is a summary of the changes since the first patch:

        • Removed "Asynchronous" from all method names. The new methods are just overloaded with an additional Callback parameter now.
        • Removed the CallFuture from the new generated client interfaces. These methods simply take an additional Callback<T> parameter and return void. For example:
          • void add(int arg1, int arg2, Callback<Integer> callback) throws IOException;
        • Modified CallFuture
          • Replaced the Semaphore with a CountDownLatch
          • Made CallFuture implement Callback, and replaced the set(Result|Error) methods with handle(Result|Error)
        • If clients want to use a Future-based API, just create a new CallFuture<T> and pass it in as the Callback<T> parameter, then wait on the CallFuture.
        • The SpecificCompiler takes a new boolean argument, called generateClientInterface, that controls whether the client-specific interfaces with the Callback methods will be generated. The Maven and Ant plugins also accept this parameter. The default value is currently set to true, but it's easy enough to default it to false if that is preferred.
        • The default implementation of Transceiver.transceive(List<ByteBuffer>, TransceiverCallback) now works synchronously rather than throwing UnsupportedOperationException. This allows the Callback-enabled interfaces to be used with non-Netty Transceivers.
        • Requestor.request(String, Object) is now implemented using the Callback API. This eliminates duplication of code and functionality between the synchronous and callback APIs.
        • New unit test, TestProtocolNetty, which runs all TestProtocolSpecific tests using NettyServer and NettyTransceiver. In particular, this verifies that the two-step protocol handshake works when using the Callback API (since the synchronous API is implemented using the Callback API)
        • Merged in changes from AVRO-832

        I think that's about it. Please take a look when you get a chance, and let me know what you think.
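
To make the Future-style usage in the summary above concrete, here is a small sketch under stated assumptions. `ResultCallback`, `BlockingResult`, `CalculatorWithCallbacks`, and `AsyncCalculator` are all hypothetical names invented for this example; they only mimic the shape of the generated callback overload and of a future that doubles as a callback. The sketch also shows the synchronous method being implemented on top of the callback method, as the summary describes for Requestor.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the Callback<T> type from the patch.
interface ResultCallback<T> {
  void handleResult(T result);
  void handleError(Throwable error);
}

// Hypothetical stand-in for a CallFuture-like object that is also a callback.
class BlockingResult<T> implements ResultCallback<T> {
  private final CountDownLatch latch = new CountDownLatch(1);
  private T result;
  private Throwable error;

  public void handleResult(T r) { result = r; latch.countDown(); }
  public void handleError(Throwable e) { error = e; latch.countDown(); }

  // Waits for completion; failures are wrapped unchecked to keep the sketch short.
  public T await() {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    if (error != null) throw new IllegalStateException(error);
    return result;
  }
}

// Hypothetical client-side interface: the sync method plus a callback overload.
interface CalculatorWithCallbacks {
  int add(int arg1, int arg2);
  void add(int arg1, int arg2, ResultCallback<Integer> callback);
}

// Toy in-process implementation; a real client would go over a Transceiver.
class AsyncCalculator implements CalculatorWithCallbacks {
  // The synchronous call is built on the callback call, so there is one code path.
  public int add(int arg1, int arg2) {
    BlockingResult<Integer> pending = new BlockingResult<Integer>();
    add(arg1, arg2, pending);
    return pending.await();
  }

  // Completes the callback on another thread to imitate an asynchronous reply.
  public void add(final int arg1, final int arg2, final ResultCallback<Integer> callback) {
    new Thread(() -> callback.handleResult(arg1 + arg2)).start();
  }
}
```

A caller who wants a Future-like flow would create a `BlockingResult<Integer>`, pass it as the callback, and call `await()` later; a caller who wants pure callbacks passes their own `ResultCallback` instead.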

        James Baldassari added a comment -

        Version 2 of the 539 patch

        James Baldassari added a comment -

        Please disregard AVRO-539-v2.patch. I accidentally made that one from the lang/java directory instead of the trunk root. This one should apply cleanly to trunk.

        Doug Cutting added a comment -

        Looking good! A few comments and questions:

        • the flag might better be named 'generateCallbacks' than 'generateClientInterface'.
        • can we make this flag optional? it would be nice if we don't incompatibly change so many public methods and the command line interface.
        • maybe instead of generating another file, we could generate a nested interface? e.g.,
          public interface Foo {
            int x();
            public interface Callbacks extends Foo {
              void x(Callback<Integer> callback);
            }
          }
        • i don't think we need guards around log.debug statements that are passed a constant string. and if the log strings are being constructed, then it's efficient with slf4j to use a format string, e.g., log.debug("Error in x: {}", e);
        • can Request, Response and RequestorTransceiverCallback be private or package-private?

        Also, I will be mostly offline for the next week and may not be able to review patches very quickly.

        James Baldassari added a comment -

        Thanks for looking it over!

        the flag might better be named 'generateCallbacks' than 'generateClientInterface'.

        OK, easy change.

        can we make this flag optional?

        I believe it's optional in the mojo and in the Ant plugin. It should default to true if not specified. Just in case anyone is calling SpecificCompiler directly, I'll add overloaded methods that will match the old signatures (without the generateCallbacks boolean) and will default generateCallbacks to true. This way SpecificCompiler should be totally backwards compatible.

        maybe instead of generating another file, we could generate a nested interface?

        Sure. I'll make that change. It's probably a little cleaner with just one generated file per protocol.

        i don't think we need guards around log.debug statements that are passed a constant string

        No problem. The isDebugEnabled checks were in there so that the full stack traces would be printed when debug mode is enabled; otherwise only the exception message would be printed, without the stack trace. It's just a pattern I've gotten used to from working with high-throughput messaging, in which a lot of errors can be written in a short amount of time. I'll change these statements to be consistent with the Avro logging conventions.

        can Request, Response and RequestorTransceiverCallback be private or package-private?

        Request and Response certainly can, so I'll change them. TransceiverCallback can be made package-private, but then we should also make Transceiver.transceive(List<ByteBuffer>, TransceiverCallback) package-private. Since Transceiver.transceive(List<ByteBuffer>) is public, I thought the one that takes a Callback should also be public. Then again, I don't think any of those transceiver methods are called from outside the package. Your call.

        Also, I will be mostly offline for the next week and may not be able to review patches very quickly.

        That's fine. I'm swamped at work right now, so I don't think I'll have time to work on these changes until the weekend anyway.
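
The backward-compatible overload idea mentioned in this reply can be sketched roughly as follows. `CompilerFacade` and `interfacesFor` are hypothetical names made up for illustration and do not reflect the real SpecificCompiler signatures; the sketch only shows an old-style method delegating to a new one with the flag defaulted to true.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical facade; names and signatures are assumptions for this sketch only.
class CompilerFacade {
  // Old signature kept so existing callers compile unchanged; the flag defaults to true.
  static List<String> interfacesFor(String protocol) {
    return interfacesFor(protocol, true);
  }

  // New signature with the explicit flag.
  static List<String> interfacesFor(String protocol, boolean generateCallbacks) {
    List<String> names = new ArrayList<String>();
    names.add(protocol);
    if (generateCallbacks) {
      names.add(protocol + ".Callback"); // the callback-enabled interface
    }
    return names;
  }
}
```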

        Doug Cutting added a comment -

        I'm now leaning towards simply eliminating the flag for callback API generation and always generating the nested interface that includes this. Would anyone object to that?

        As for TransceiverCallback, you're right, it probably should remain public, or perhaps just be replaced with Callback<List<ByteBuffer>>.

        James Baldassari added a comment -

        If there are no objections to always generating the Callback API, I'll revert my changes to the Maven and Ant plugins.

        TransceiverCallback doesn't really add anything, so it's probably easier to just change it to Callback<List<ByteBuffer>>.
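
For readers following along, here is a hedged sketch of what a callback-taking transceive with a synchronous default might look like once the dedicated callback type is replaced by a generic callback over `List<ByteBuffer>`. `BufferCallback`, `SketchTransceiver`, and `EchoTransceiver` are hypothetical names for this example and are not the real Avro classes; the error-handling choice (routing IOException to the callback) is also an assumption.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical generic callback, standing in for Callback<T>.
interface BufferCallback<T> {
  void handleResult(T result);
  void handleError(Throwable error);
}

// Illustrative base class; not the real Transceiver.
abstract class SketchTransceiver {
  // Blocking request/response exchange supplied by each transport.
  abstract List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;

  // Default callback variant: performs the blocking call and then notifies the
  // callback on the calling thread, so every transport supports the callback API.
  void transceive(List<ByteBuffer> request, BufferCallback<List<ByteBuffer>> callback) {
    List<ByteBuffer> response;
    try {
      response = transceive(request);
    } catch (IOException e) {
      callback.handleError(e);
      return;
    }
    callback.handleResult(response);
  }
}

// Toy transport that returns the request unchanged.
class EchoTransceiver extends SketchTransceiver {
  List<ByteBuffer> transceive(List<ByteBuffer> request) {
    return request;
  }
}
```

A transport with real asynchronous I/O would override the two-argument method and invoke the callback from its I/O thread instead.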

        James Baldassari added a comment -

        Here's a new version of the patch that addresses the latest round of comments. The main changes are:

        1. Reverted changes to SpecificCompiler and the maven/ant plugins because we'll always generate the callback API.
        2. Moved the callback API into a nested interface inside the generated protocol interface. I named this sub-interface Callback. I was also considering other names like WithCallbacks, e.g. Calculator.WithCallbacks. Any preferences?
        Doug Cutting added a comment -

        Looks great. I applied this. Tests pass.

        I have only a few cosmetic concerns now before I am ready to commit this.

        • There are a number of whitespace-only changes and reordering of imports that are not required. Can you please revert these?
        • Requestor#request(Request, Callback) should be private or package-private, no?

        Thanks!

        James Baldassari added a comment -

        Thanks, Doug. I'll clean it up and submit a new patch soon.

        James Baldassari added a comment -

        Here's a new patch. I fixed a bunch of issues with whitespace and imports, and I changed the visibility of Requestor#request(Request, Callback). I also removed the TransceiverCallback class. I thought I had done that last time, but it was still there. Please let me know if I missed anything.

        Doug Cutting added a comment -

        I committed this. Thanks, James!

        I made a few formatting changes, mostly to wrap lines longer than 80 columns.

        I've only committed this to trunk so far, not (yet) to the 1.5 branch. Are we confident enough that this is back-compatible that we should merge it to the 1.5 branch, including it in the 1.5.2 release?

        James Baldassari added a comment -

        Thanks Doug! And thanks to everyone who helped me out on this one.

        I think it would be compatible with 1.5, although I haven't tested that or checked if the patch will apply cleanly to the 1.5 branch. Do you want me to try it out? Also, is there a load/longevity test that people usually run to really put this stuff through its paces? For example, start up an echo server and hammer it with RPCs for 24 hours.

        Doug Cutting added a comment -

        There isn't a load tester that I'm aware of. I think the patch will apply cleanly w/o any work. So maybe we should just go for it, since it passes all of the unit tests w/o any changes to them?

        James Baldassari added a comment -

        Regarding the 1.5 back-port, I think it would be fine to commit it.

        Is there documentation somewhere, like on the Avro website or wiki, that would need to be updated to demonstrate the new callback functionality?

        On the topic of load testing, I think it would be useful to have some sort of utility for testing RPCs that goes beyond what the unit tests do. For example, this test/utility would use a thread pool on the client side and would invoke RPCs as quickly as possible against a test server. This type of test might uncover problems like ConcurrentModificationException and other thread-safety issues, memory leaks, and anything else that might go wrong under load. A simple version of this test could probably be added to the existing ipc project, perhaps bound to the integration-test phase rather than the test phase. The default run time for the test could be short, but the test time and number of client threads could be overridden via system properties to run a longer test if desired. Should I create an enhancement issue for this?
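
The load-test utility proposed above could start from something like the following sketch. `RpcLoadSketch` and the property names `loadtest.threads` and `loadtest.millis` are invented for this example; the RPC itself is passed in as a plain `Runnable`, so wiring it to a test server is left to the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical load driver; class and property names are assumptions for this sketch.
class RpcLoadSketch {
  // Invokes rpc as fast as possible from `threads` workers for roughly
  // durationMillis, and returns how many calls completed.
  static long run(final Runnable rpc, int threads, long durationMillis)
      throws InterruptedException {
    final AtomicLong calls = new AtomicLong();
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        while (System.nanoTime() - deadline < 0) {
          rpc.run();
          calls.incrementAndGet();
        }
      });
    }
    pool.shutdown();
    // Allow some slack beyond the deadline for in-flight calls to finish.
    pool.awaitTermination(durationMillis + 10_000, TimeUnit.MILLISECONDS);
    return calls.get();
  }

  // Short defaults that can be overridden with -Dloadtest.threads / -Dloadtest.millis.
  static long runFromSystemProperties(Runnable rpc) throws InterruptedException {
    int threads = Integer.getInteger("loadtest.threads", 4);
    long millis = Long.getLong("loadtest.millis", 1000L);
    return run(rpc, threads, millis);
  }
}
```

A worker whose RPC throws simply stops in this sketch; a fuller version would presumably record the failure so that thread-safety errors show up in the report.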

        Doug Cutting added a comment -

        Okay, I merged this to the 1.5 branch for inclusion in 1.5.2. It depended on AVRO-815, so I merged that too, which we probably should have done before anyway, as it was a compatible bugfix.

        Scott Carey added a comment -

        Is there documentation somewhere, like on the Avro website or wiki, that would need to be updated to demonstrate the new callback functionality?

        I am not aware of any. It would be great to have some documentation on the wiki about RPC and callbacks.

        On the topic of load testing, I think it would be useful to have some sort of utility for testing RPCs that goes beyond what the unit tests do.

        Absolutely. It would be useful to have load testing facilities for RPC. We have a crude load test (Perf.java) for Java serialization/deserialization. It has helped us identify places to optimize and find regressions. It is run manually, not bound to any Maven phase or automatic test suite. We have discussed using Apache's resources to run a daily performance report or something similar.

        Likewise, we are lacking in cross-language performance tests.


          People

          • Assignee:
            James Baldassari
            Reporter:
            Jeff Hammerbacher