Index: src/test/java/org/apache/activemq/transport/stomp/StompTest.java
===================================================================
--- src/test/java/org/apache/activemq/transport/stomp/StompTest.java	(revision 583118)
+++ src/test/java/org/apache/activemq/transport/stomp/StompTest.java	(working copy)
@@ -35,49 +35,49 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.reliable.UnreliableUdpTransportTest;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.SimpleSecurityBrokerSystemTest;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class StompTest extends CombinationTestSupport {
     private static final Log LOG = LogFactory.getLog(StompTest.class);
 
-    protected String bindAddress = "stomp://localhost:0";
+    protected String bindAddress = "stomp://localhost:61613";
+    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
 
     private BrokerService broker;
-    private TransportConnector connector;
     private StompConnection stompConnection = new StompConnection();
     private Connection connection;
     private Session session;
     private ActiveMQQueue queue;
-
+    
     protected void setUp() throws Exception {
-        broker = new BrokerService();
-        broker.setPersistent(false);
-
-        connector = broker.addConnector(bindAddress);
+        broker = BrokerFactory.createBroker(new URI(confUri));
         broker.start();
 
         stompConnect();
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        connection = cf.createConnection();
+        connection = cf.createConnection("system", "manager");
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         queue = new ActiveMQQueue(getQueueName());
         connection.start();
     }
 
     private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
-        URI connectUri = connector.getConnectUri();
-        stompConnection.open("127.0.0.1", connectUri.getPort());
+        URI connectUri = new URI(bindAddress);
+        stompConnection.open(createSocket(connectUri));
     }
 
     protected Socket createSocket(URI connectUri) throws IOException {
-        return new Socket();
+        return new Socket("127.0.0.1", connectUri.getPort());
     }
 
     protected String getQueueName() {
@@ -117,7 +117,7 @@
 
     public void testConnect() throws Exception {
 
-        String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+        String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
         stompConnection.sendFrame(connectFrame);
 
         String f = stompConnection.receiveFrame();
@@ -130,7 +130,7 @@
 
         MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -155,7 +155,7 @@
 
         MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -174,7 +174,7 @@
 
         MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -195,7 +195,7 @@
 
         MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -222,7 +222,7 @@
 
     public void testSubscribeWithAutoAck() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -242,7 +242,7 @@
 
     public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -271,7 +271,7 @@
 
     public void testSubscribeWithMessageSentWithProperties() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -305,7 +305,7 @@
         int ctr = 10;
         String[] data = new String[ctr];
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -343,7 +343,7 @@
 
     public void testSubscribeWithAutoAckAndSelector() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -365,7 +365,7 @@
 
     public void testSubscribeWithClientAck() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
@@ -389,7 +389,7 @@
 
     public void testUnsubscribe() throws Exception {
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
@@ -426,7 +426,7 @@
     public void testTransactionCommit() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         String f = stompConnection.receiveFrame();
@@ -450,7 +450,7 @@
     public void testTransactionRollback() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         String f = stompConnection.receiveFrame();
@@ -486,7 +486,7 @@
 
     public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
         assertClients(1);
-        String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
 
         stompConnection.sendFrame(frame);
 
@@ -502,7 +502,61 @@
 
         assertClients(1);
     }
+    
+    public void testConnectNotAuthenticatedWrongUser() throws Exception {
+        String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
 
+        String f = stompConnection.receiveFrame();
+        
+        assertTrue(f.startsWith("ERROR"));  
+        assertClients(1);
+        
+    }
+    
+    public void testConnectNotAuthenticatedWrongPassword() throws Exception {
+        
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String f = stompConnection.receiveFrame();
+        
+        assertTrue(f.startsWith("ERROR"));  
+        assertClients(1);        
+    }    
+    
+    public void testSendNotAuthorized() throws Exception {
+
+        String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("ERROR"));
+
+    }
+    
+    public void testSubscribeNotAuthorized() throws Exception {
+
+        String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("ERROR"));
+
+    } 
+    
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;
Index: src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
===================================================================
--- src/test/java/org/apache/activemq/transport/stomp/StompConnection.java	(revision 583118)
+++ src/test/java/org/apache/activemq/transport/stomp/StompConnection.java	(working copy)
@@ -30,9 +30,9 @@
 
     private Socket stompSocket;
     private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
-
-    public void open(String host, int port) throws IOException, UnknownHostException {
-        stompSocket = new Socket(host, port);
+    
+    public void open(Socket socket) {
+    	stompSocket = socket;
     }
 
     public void close() throws IOException {
@@ -76,4 +76,12 @@
         }
     }
 
+	public Socket getStompSocket() {
+		return stompSocket;
+	}
+
+	public void setStompSocket(Socket stompSocket) {
+		this.stompSocket = stompSocket;
+	}
+
 }
Index: src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
===================================================================
--- src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java	(revision 604672)
+++ src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java	(working copy)
@@ -25,7 +25,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.Destination;
 import javax.jms.JMSException;
 
 import org.apache.activemq.command.ActiveMQDestination;
@@ -38,6 +37,7 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -96,19 +96,29 @@
 
     protected ResponseHandler createResponseHandler(StompFrame command) {
         final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-        // A response may not be needed.
-        if (receiptId != null) {
-            return new ResponseHandler() {
-                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-                    StompFrame sc = new StompFrame();
-                    sc.setAction(Stomp.Responses.RECEIPT);
-                    sc.setHeaders(new HashMap<String, String>(1));
-                    sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-                    transportFilter.sendToStomp(sc);
-                }
-            };
-        }
-        return null;
+		return new ResponseHandler() {
+			public void onResponse(ProtocolConverter converter,
+					Response response) throws IOException {
+				if (response.isException()) {
+					ExceptionResponse exception = (ExceptionResponse) response;
+					// apparently, other (non-fatal) exceptions are
+					// generated as well (see debug log)
+					if (exception.getException() instanceof SecurityException) {
+						handleException(new ProtocolException(exception
+								.getException().getMessage(), true), null);
+						return;
+					}
+				}
+				if (receiptId != null) {
+					StompFrame sc = new StompFrame();
+					sc.setAction(Stomp.Responses.RECEIPT);
+					sc.setHeaders(new HashMap<String, String>(1));
+					sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+							receiptId);
+					transportFilter.sendToStomp(sc);
+				}
+			}
+		};
     }
 
     protected void sendToActiveMQ(Command command, ResponseHandler handler) {
@@ -160,27 +170,32 @@
             }
 
         } catch (ProtocolException e) {
+        	handleException(e, command);
+        }
+    }
+    
+    protected void handleException(ProtocolException exception, StompFrame command) throws IOException {
+        // Let the stomp client know about any protocol errors.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+        exception.printStackTrace(stream);
+        stream.close();
 
-            // Let the stomp client know about any protocol errors.
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-            e.printStackTrace(stream);
-            stream.close();
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
 
-            HashMap<String, String> headers = new HashMap<String, String>();
-            headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+        if (command != null) {
+        	final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        	if (receiptId != null) {
+        		headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+        	}
+        }
 
-            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if (receiptId != null) {
-                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-            }
+        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+        sendToStomp(errorMessage);
 
-            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
-            sendToStomp(errorMessage);
-
-            if (e.isFatal()) {
-                getTransportFilter().onException(e);
-            }
+        if (exception.isFatal()) {
+            getTransportFilter().onException(exception);
         }
     }
 
@@ -430,7 +445,16 @@
                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
                     public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-
+						if (response.isException()) {
+							ExceptionResponse exception = (ExceptionResponse) response;
+							// apparently, other (non-fatal) exceptions are
+							// generated as well (see debug log)
+							if (exception.getException() instanceof SecurityException) {
+								handleException(new ProtocolException(exception
+										.getException().getMessage(), true), null);
+								return;
+							}
+						}
                         connected.set(true);
                         HashMap<String, String> responseHeaders = new HashMap<String, String>();
 
Index: src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml
===================================================================
--- src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml	(revision 0)
+++ src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml	(revision 0)
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: example -->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker useJmx="false" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
+
+	<transportConnectors>
+		<transportConnector name="stomp"   uri="stomp://localhost:61613"/>
+	</transportConnectors>
+
+    <plugins>
+		<simpleAuthenticationPlugin>
+			<users>
+				<authenticationUser username="system" password="manager"
+					groups="users,admins"/>
+				<authenticationUser username="user" password="password"
+					groups="users"/>
+				<authenticationUser username="guest" password="password" groups="guests"/>
+			</users>
+		</simpleAuthenticationPlugin>
+
+
+      <!--  lets configure a destination based authorization mechanism -->
+      <authorizationPlugin>
+        <map>
+          <authorizationMap>
+            <authorizationEntries>
+              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
+            </authorizationEntries>
+          </authorizationMap>
+        </map>
+      </authorizationPlugin>
+    </plugins>
+  </broker>
+
+</beans>
\ No newline at end of file
