Index: activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java	(revision 688723)
+++ activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java	(working copy)
@@ -49,6 +49,8 @@
         String RECEIPT_REQUESTED = "receipt";
         String TRANSACTION = "transaction";
         String CONTENT_LENGTH = "content-length";
+        String TRANSFORMATION = "transformation";
+        String TRANSFORMATION_ERROR = "transformation-error";
 
         public interface Response {
             String RECEIPT_ID = "receipt-id";
@@ -114,4 +116,16 @@
             String MESSAGE_ID = "message-id";
         }
     }
+    
+	public enum Transformations {
+		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
+		
+		public String toString() {
+			return name().replaceAll("_", "-").toLowerCase();
+		}
+		
+		public static Transformations getValue(String value) {
+			return valueOf(value.replaceAll("-", "_").toUpperCase());
+		}
+	}    
 }
Index: activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java	(revision 688723)
+++ activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java	(working copy)
@@ -24,7 +24,6 @@
 import java.util.Iterator;
 import java.util.Map;
 
-import javax.jms.Destination;
 import javax.jms.JMSException;
 
 import org.apache.activemq.command.ActiveMQBytesMessage;
@@ -32,10 +31,12 @@
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -49,6 +50,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -93,21 +95,27 @@
     	}
     }
 
-    protected ResponseHandler createResponseHandler(StompFrame command){
+    protected ResponseHandler createResponseHandler(final StompFrame command) {
         final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-        // A response may not be needed.
-        if( receiptId != null ) {
-	        return new ResponseHandler() {
-	    		public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-	                StompFrame sc = new StompFrame();
-	                sc.setAction(Stomp.Responses.RECEIPT);
-	                sc.setHeaders(new HashMap(1));
-	                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-	        		transportFilter.sendToStomp(sc);
-	    		}
-	        };
-	    }
-    	return null;
+        if (receiptId != null) {
+            return new ResponseHandler() {
+                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        // Generally a command can fail.. but that does not invalidate the connection.
+                        // We report back the failure but we don't close the connection.
+                        Throwable exception = ((ExceptionResponse)response).getException();
+                        handleException(exception, command);
+                    } else {
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.RECEIPT);
+                        sc.setHeaders(new HashMap<String, String>(1));
+                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                        transportFilter.sendToStomp(sc);
+                    }
+                }
+            };
+        }
+        return null;
     }
 
 	protected void sendToActiveMQ(Command command, ResponseHandler handler) {
@@ -123,63 +131,71 @@
 		transportFilter.sendToStomp(command);
 	}
 
-	/**
-     * Convert a stomp command
-     * @param command
-     */
-	public void onStompCommad( StompFrame command ) throws IOException, JMSException {
-		try {
+    /**
+	 * Convert a stomp command
+	 * 
+	 * @param command
+	 */
+    public void onStompCommad(StompFrame command) throws IOException, JMSException {
+        try {
 
-			if( command.getClass() == StompFrameError.class ) {
-				throw ((StompFrameError)command).getException();
-			}
+            if (command.getClass() == StompFrameError.class) {
+                throw ((StompFrameError)command).getException();
+            }
 
-			String action = command.getAction();
-	        if (action.startsWith(Stomp.Commands.SEND))
-	            onStompSend(command);
-	        else if (action.startsWith(Stomp.Commands.ACK))
-	            onStompAck(command);
-	        else if (action.startsWith(Stomp.Commands.BEGIN))
-	            onStompBegin(command);
-	        else if (action.startsWith(Stomp.Commands.COMMIT))
-	            onStompCommit(command);
-	        else if (action.startsWith(Stomp.Commands.ABORT))
-	            onStompAbort(command);
-	        else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
-	            onStompSubscribe(command);
-	        else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
-	            onStompUnsubscribe(command);
-			else if (action.startsWith(Stomp.Commands.CONNECT))
-	            onStompConnect(command);
-	        else if (action.startsWith(Stomp.Commands.DISCONNECT))
-	            onStompDisconnect(command);
-	        else
-	        	throw new ProtocolException("Unknown STOMP action: "+action);
+            String action = command.getAction();
+            if (action.startsWith(Stomp.Commands.SEND)) {
+                onStompSend(command);
+            } else if (action.startsWith(Stomp.Commands.ACK)) {
+                onStompAck(command);
+            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
+                onStompBegin(command);
+            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
+                onStompCommit(command);
+            } else if (action.startsWith(Stomp.Commands.ABORT)) {
+                onStompAbort(command);
+            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
+                onStompSubscribe(command);
+            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
+                onStompUnsubscribe(command);
+            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
+                onStompConnect(command);
+            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
+                onStompDisconnect(command);
+            } else {
+                throw new ProtocolException("Unknown STOMP action: " + action);
+            }
 
         } catch (ProtocolException e) {
-
-        	// Let the stomp client know about any protocol errors.
-        	ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        	PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
-        	e.printStackTrace(stream);
-        	stream.close();
-
-        	HashMap headers = new HashMap();
-        	headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
-            final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if( receiptId != null ) {
-            	headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            handleException(e, command);
+            // Some protocol errors can cause the connection to get closed.
+            if( e.isFatal() ) {
+               getTransportFilter().onException(e);
             }
+        }
+    }
+    
+    protected void handleException(Throwable exception, StompFrame command) throws IOException {
+        // Let the stomp client know about any protocol errors.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+        exception.printStackTrace(stream);
+        stream.close();
 
-        	StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
-			sendToStomp(errorMessage);
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
 
-			if( e.isFatal() )
-				getTransportFilter().onException(e);
+        if (command != null) {
+        	final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        	if (receiptId != null) {
+        		headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+        	}
         }
-	}
 
+        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+        sendToStomp(errorMessage);
+    }
+
 	protected void onStompSend(StompFrame command) throws IOException, JMSException {
 		checkConnected();
 
@@ -352,9 +368,9 @@
 
 	}
 
-	protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
-		checkConnected();
-    	Map headers = command.getHeaders();
+    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map<String, String> headers = command.getHeaders();
 
         ActiveMQDestination destination=null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
@@ -385,65 +401,85 @@
         throw new ProtocolException("No subscription matched.");
 	}
 
-	protected void onStompConnect(StompFrame command) throws ProtocolException {
+    protected void onStompConnect(final StompFrame command) throws ProtocolException {
 
-		if(connected.get()) {
-			throw new ProtocolException("Allready connected.");
-		}
+        if (connected.get()) {
+            throw new ProtocolException("Allready connected.");
+        }
 
-    	final Map headers = command.getHeaders();
+        final Map<String, String> headers = command.getHeaders();
 
         // allow anyone to login for now
-        String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
-        String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
-        String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
+        String login = headers.get(Stomp.Headers.Connect.LOGIN);
+        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
 
         final ConnectionInfo connectionInfo = new ConnectionInfo();
 
         IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
 
         connectionInfo.setConnectionId(connectionId);
-        if( clientId!=null )
+        if (clientId != null) {
             connectionInfo.setClientId(clientId);
-        else
-            connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
+        } else {
+            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+        }
 
         connectionInfo.setResponseRequired(true);
         connectionInfo.setUserName(login);
         connectionInfo.setPassword(passcode);
 
-		sendToActiveMQ(connectionInfo, new ResponseHandler(){
-			public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
 
-	            final SessionInfo sessionInfo = new SessionInfo(sessionId);
-	            sendToActiveMQ(sessionInfo,null);
+                if (response.isException()) {
+                    // If the connection attempt fails we close the socket.
+                    Throwable exception = ((ExceptionResponse)response).getException();
+                    handleException(exception, command);
+                    getTransportFilter().onException(IOExceptionSupport.create(exception));
+                    return;
+                }
 
+                final SessionInfo sessionInfo = new SessionInfo(sessionId);
+                sendToActiveMQ(sessionInfo, null);
 
-	            final ProducerInfo producerInfo = new ProducerInfo(producerId);
-	            sendToActiveMQ(producerInfo,new ResponseHandler(){
-					public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                final ProducerInfo producerInfo = new ProducerInfo(producerId);
+                sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                        
+                        if (response.isException()) {
+                            // If the connection attempt fails we close the socket.
+                            Throwable exception = ((ExceptionResponse)response).getException();
+                            handleException(exception, command);
+                            getTransportFilter().onException(IOExceptionSupport.create(exception));
+                        }
+                        
+                        connected.set(true);
+                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
 
-						connected.set(true);
-	                    HashMap responseHeaders = new HashMap();
+                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+                        if (requestId == null) {
+                            // TODO legacy
+                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
+                        }
+                        if (requestId != null) {
+                            // TODO legacy
+                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
+                        }
 
-	                    responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
-	                    String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
-	                    if( requestId !=null ){
-		                    responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
-	            		}
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.CONNECTED);
+                        sc.setHeaders(responseHeaders);
+                        sendToStomp(sc);
+                    }
+                });
 
-	                    StompFrame sc = new StompFrame();
-	                    sc.setAction(Stomp.Responses.CONNECTED);
-	                    sc.setHeaders(responseHeaders);
-	                    sendToStomp(sc);
-					}
-				});
+            }
+        });
+    }
 
-			}
-		});
-
-	}
-
 	protected void onStompDisconnect(StompFrame command) throws ProtocolException {
 		checkConnected();
 		sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
@@ -466,26 +502,33 @@
 
     	if ( command.isResponse() ) {
 
-			Response response = (Response) command;
-		    ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
-		    if( rh !=null ) {
-		    	rh.onResponse(this, response);
-		    }
+            Response response = (Response)command;
+            ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+            if (rh != null) {
+                rh.onResponse(this, response);
+            } else {
+                // Pass down any unexpected errors. Should this close the connection?
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse)response).getException();
+                    handleException(exception, null);
+                }
+            }
+        } else if (command.isMessageDispatch()) {
 
-		} else if( command.isMessageDispatch() ) {
-
-		    MessageDispatch md = (MessageDispatch)command;
-		    StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
-		    if (sub != null) {
-		        sub.onMessageDispatch(md);
+            MessageDispatch md = (MessageDispatch)command;
+            StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
+            if (sub != null) {
+                sub.onMessageDispatch(md);
             }
+        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+            // Pass down any unexpected async errors. Should this close the connection?
+            Throwable exception = ((ConnectionError)command).getException();
+            handleException(exception, null);
         }
-	}
+    }
 
-    public  ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
-
+    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
         ActiveMQMessage msg = frameTranslator.convertFrame(command);
-
         return msg;
     }
 
Index: activemq-core/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/util/IOExceptionSupport.java	(revision 688723)
+++ activemq-core/src/main/java/org/apache/activemq/util/IOExceptionSupport.java	(working copy)
@@ -19,8 +19,11 @@
 
 import java.io.IOException;
 
-final public class IOExceptionSupport {
+public final class IOExceptionSupport {
 
+    private IOExceptionSupport() {
+    }
+
     public static IOException create(String msg, Throwable cause) {
         IOException exception = new IOException(msg);
         exception.initCause(cause);
