Details
Description
If there are multi-client send the dispatch request to server , the provider server will send a fault message to client randomly.
This issue is caused by SourceDataBinding doesn't create a thread safe DataReader.
The server code:
import javax.xml.transform.Source; import javax.xml.ws.Endpoint; import javax.xml.ws.Provider; import javax.xml.ws.Service; import javax.xml.ws.ServiceMode; import javax.xml.ws.WebServiceProvider; import javax.xml.ws.http.HTTPBinding; public class Starter { @WebServiceProvider @ServiceMode(value = Service.Mode.MESSAGE) static class SimplePOXEchoProvider implements Provider<Source> { public Source invoke(Source request) { return request; } } public static void main(String[] args) throws InterruptedException { Endpoint ep = Endpoint.create(HTTPBinding.HTTP_BINDING, new SimplePOXEchoProvider()); ep.publish("http://localhost:9001/test"); while (true) { Thread.sleep(100000); } } }
The client code
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.xml.namespace.QName; import javax.xml.transform.stream.StreamSource; import javax.xml.ws.Dispatch; import javax.xml.ws.Service; import javax.xml.ws.http.HTTPBinding; import org.apache.commons.io.IOUtils; import org.apache.cxf.jaxws.DispatchImpl; import org.apache.log4j.Logger; import org.junit.Test; public class ServiceTests { private final static Logger log = Logger.getLogger(ServiceTests.class); private boolean failed = false; public static final String POX_REQUEST = "<ns2:sendIt xmlns:ns2=\"http://service.sandbox.paxtests.example.com/\">" + "<arg0>Hello</arg0></ns2:sendIt>"; @Test public void concurrentPositiveTest() throws InterruptedException { int requestsCount = 20; final CountDownLatch latch = new CountDownLatch(requestsCount); Runnable task = new Runnable() { public void run() { try { testPoxPositive(); } catch (Throwable t) { log.error("FAILED", t); failed = true; } finally { latch.countDown(); } } }; ExecutorService threadPool = Executors.newFixedThreadPool(10); for(int i=0; i<requestsCount; i++) { threadPool.execute(task); } assertTrue(latch.await(100000, TimeUnit.SECONDS)); assertFalse(failed); } //@Test public void testPoxPositive() throws IOException { QName portName = new QName("dummy", "dummy"); Service service = Service.create(null); service.addPort(portName, HTTPBinding.HTTP_BINDING, "http://localhost:9001/test"); Dispatch<StreamSource> disp = service.createDispatch(portName, StreamSource.class, Service.Mode.MESSAGE); StreamSource response = disp.invoke(new StreamSource(IOUtils.toInputStream(POX_REQUEST))); String text = IOUtils.toString(response.getInputStream()); assertEquals(POX_REQUEST, text); ((DispatchImpl<StreamSource>)disp).getClient().destroy(); }
When send the request with thread pool size 20, you will get the below stack trace.
[pool-1-thread-2] 2011-02-10 15:25:39,089 ERROR [com.example.test.ServiceTests] - FAILED javax.xml.ws.http.HTTPException at org.apache.cxf.jaxws.DispatchImpl.mapException(DispatchImpl.java:248) at org.apache.cxf.jaxws.DispatchImpl.invoke(DispatchImpl.java:339) at org.apache.cxf.jaxws.DispatchImpl.invoke(DispatchImpl.java:218) at com.example.test.ServiceTests.testPoxPositive(ServiceTests.java:66) at com.example.test.ServiceTests$1.run(ServiceTests.java:40) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) Caused by: org.apache.cxf.binding.xml.XMLFault: Message part {http://service.sandbox.paxtests.gateway.sabre.com/}sendIt was not recognized. (Does it exist in service WSDL?) at org.apache.cxf.binding.xml.interceptor.XMLFaultInInterceptor.handleMessage(XMLFaultInInterceptor.java:63) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:255) at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:99) at org.apache.cxf.binding.xml.interceptor.XMLMessageInInterceptor.handleMessage(XMLMessageInInterceptor.java:81) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:255) at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:755) at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:2335) at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponse(HTTPConduit.java:2193) at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.close(HTTPConduit.java:2037) at org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47) at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188) at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:56) at org.apache.cxf.transport.http.HTTPConduit.close(HTTPConduit.java:697) at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:255) at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:516) at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:313) at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:265) at org.apache.cxf.endpoint.ClientImpl.invokeWrapped(ClientImpl.java:300) at org.apache.cxf.jaxws.DispatchImpl.invoke(DispatchImpl.java:332) ... 6 more