Uploaded image for project: 'CXF'
  1. CXF
  2. CXF-2982

Don't throw the SuspendedInvocationException when call the suspend() method of CXF continuation

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.3
    • Component/s: None
    • Labels:
      None

      Description

      Current CXF Continuation suspend implementation is based on throw SuspendedInvocationException, This implementation has a shortcoming which cannot call the other framework's async API after continuation suspend is called as Jetty7 does.
      So I introduce a flag of Message.CONTINUATION_SUSPENDED to break out current interceptor chain as the SuspendedInvocation does, but It will make CXF continuation API more easy to use and it supports the Jetty7 continuation or Servlet3 suspend resume semantics.

      1. cxf-2982.patch
        17 kB
        Willem Jiang

        Activity

        Hide
        njiang Willem Jiang added a comment -

        Applied patch with suggestion of DanKulp
        Here is the mail thread[1] about it.

        [1]http://cxf.547215.n5.nabble.com/CXF-continuation-enhancement-td2800561.html#a2800561

        Show
        njiang Willem Jiang added a comment - Applied patch with suggestion of DanKulp Here is the mail thread [1] about it. [1] http://cxf.547215.n5.nabble.com/CXF-continuation-enhancement-td2800561.html#a2800561
        Hide
        njiang Willem Jiang added a comment -

        Before apply the patch, my camel-cxf consumer is like this

                  private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
                        if (continuation.isNew()) {
                            final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
        
                            // use the asynchronous API to process the exchange
                            boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                                public void done(boolean doneSync) {
                                    if (LOG.isTraceEnabled()) {
                                        LOG.trace("Resuming continuation of exchangeId: "
                                                  + camelExchange.getExchangeId());
                                    }
                                    // resume processing after both, sync and async callbacks
                                    continuation.setObject(camelExchange);
                                    continuation.resume();
                                }
                            });
                            // just need to avoid the continuation.resume is called
                            // before the continuation.suspend is called
                            if (continuation.getObject() != camelExchange && !sync) {
                                // Now we don't set up the timeout value
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Suspending continuation of exchangeId: "
                                              + camelExchange.getExchangeId());
                                }
                                // The continuation could be called before the suspend
                                // is called
                                continuation.suspend(0);
                            } else {
                                // just set the response back, as the invoking thread is
                                // not changed
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
                                }
                                setResponseBack(cxfExchange, camelExchange);
                            }
        
                        }
                        if (continuation.isResumed()) {
                            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
                                .getObject();
                            setResponseBack(cxfExchange, camelExchange);
        
                        }
                        return null;
                    }
        

        After applying the patch , my camel-cxf consumer code is much clearer

                  private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
                        if (continuation.isNew()) {
                            final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
                            // Now we don't set up the timeout value
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Suspending continuation of exchangeId: "
                                          + camelExchange.getExchangeId());
                            }
                            // now we could call the suspend here
                            continuation.suspend(0);
                            
                            // use the asynchronous API to process the exchange
                            getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                                public void done(boolean doneSync) {
                                    if (LOG.isTraceEnabled()) {
                                        LOG.trace("Resuming continuation of exchangeId: "
                                                  + camelExchange.getExchangeId());
                                    }
                                    // resume processing after both, sync and async callbacks
                                    continuation.setObject(camelExchange);
                                    continuation.resume();
                                }
                            });
        
                        }
                        if (continuation.isResumed()) {
                            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
                                .getObject();
                            setResponseBack(cxfExchange, camelExchange);
        
                        }
                        return null;
                    }
        
        Show
        njiang Willem Jiang added a comment - Before apply the patch, my camel-cxf consumer is like this private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { if (continuation.isNew()) { final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); // use the asynchronous API to process the exchange boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() { public void done( boolean doneSync) { if (LOG.isTraceEnabled()) { LOG.trace( "Resuming continuation of exchangeId: " + camelExchange.getExchangeId()); } // resume processing after both, sync and async callbacks continuation.setObject(camelExchange); continuation.resume(); } }); // just need to avoid the continuation.resume is called // before the continuation.suspend is called if (continuation.getObject() != camelExchange && !sync) { // Now we don't set up the timeout value if (LOG.isTraceEnabled()) { LOG.trace( "Suspending continuation of exchangeId: " + camelExchange.getExchangeId()); } // The continuation could be called before the suspend // is called continuation.suspend(0); } else { // just set the response back, as the invoking thread is // not changed if (LOG.isTraceEnabled()) { LOG.trace( "Processed the Exchange : " + camelExchange.getExchangeId()); } setResponseBack(cxfExchange, camelExchange); } } if (continuation.isResumed()) { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation .getObject(); setResponseBack(cxfExchange, camelExchange); } return null ; } After applying the patch , my camel-cxf consumer code is much clearer private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) { if (continuation.isNew()) { final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange); // Now we don't set up the timeout value if (LOG.isTraceEnabled()) { LOG.trace( "Suspending continuation of exchangeId: " + camelExchange.getExchangeId()); } // now we could call the suspend here continuation.suspend(0); // use the asynchronous API to process the exchange getAsyncProcessor().process(camelExchange, new AsyncCallback() { public void done( boolean doneSync) { if (LOG.isTraceEnabled()) { LOG.trace( "Resuming continuation of exchangeId: " + camelExchange.getExchangeId()); } // resume processing after both, sync and async callbacks continuation.setObject(camelExchange); continuation.resume(); } }); } if (continuation.isResumed()) { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation .getObject(); setResponseBack(cxfExchange, camelExchange); } return null ; }
        Hide
        njiang Willem Jiang added a comment -

        Attache the patch with new implementation.

        Show
        njiang Willem Jiang added a comment - Attache the patch with new implementation.

          People

          • Assignee:
            njiang Willem Jiang
            Reporter:
            njiang Willem Jiang
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development