package lt.jurna.test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Integration test that pushes {@link #MESSAGES_COUNT} text messages through an
 * embedded, persistent ActiveMQ broker and checks that a deliberately slow
 * consumer (it sleeps after every message) still receives all of them.
 */
public class SlowConsumerTest extends TestCase {
    private static final Log log = LogFactory.getLog(SlowConsumerTest.class);

    /** Number of messages the producer sends and the consumer must receive. */
    private static final int MESSAGES_COUNT = 10000;

    /** Longest silence (in ms) the consumer tolerates before it gives up. */
    private static final long RECEIVE_TIMEOUT_MS = 10000L;

    /** How long (in ms) to wait for the producer once the consumer has finished. */
    private static final long PRODUCER_JOIN_TIMEOUT_MS = 5000L;

    // NOTE(review): nothing in this class ever assigns stompSocket, so
    // sendFrame/receiveFrame cannot be used until it is wired up.
    private Socket stompSocket;

    /** Accumulates the bytes of the frame currently being read by {@link #receiveFrame(long)}. */
    private final ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();

    /** Messages consumed so far; written only by the consuming thread, read after it is joined. */
    private int messagesCount;

    /** First error raised inside the producing thread, or {@code null} if it ran cleanly. */
    private volatile Throwable producerFailure;

    /** First error raised inside the consuming thread, or {@code null} if it ran cleanly. */
    private volatile Throwable consumerFailure;

    /**
     * Starts a broker on tcp port 61616, sends {@link #MESSAGES_COUNT} messages to
     * queue "test" from one thread and consumes them slowly from another, then
     * asserts that every message arrived.
     *
     * @throws Exception if the broker or connection cannot be set up, or if either
     *                   worker thread failed (the original error is the cause)
     */
    public void testRemoveSubscriber() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setUseJmx(true);
        // The store is persistent: wipe it so leftovers of an aborted run cannot
        // show up on the queue and distort the count.
        broker.setDeleteAllMessagesOnStartup(true);

        broker.addConnector("tcp://0:61616").setName("Default");
        broker.start();
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            final Connection connection = factory.createConnection();
            try {
                connection.start();
                runProducerAndConsumer(connection);
            } finally {
                // Also unblocks a producer that is still stuck in send().
                connection.close();
            }
        } finally {
            broker.stop();
        }
    }

    /** Runs both worker threads to completion and verifies the outcome. */
    private void runProducerAndConsumer(Connection connection) throws Exception {
        Thread producingThread = createProducingThread(connection);
        producingThread.start();
        // Give the producer a head start so a backlog builds up on the queue.
        Thread.sleep(1000);

        Thread consumingThread = createConsumingThread(connection);
        consumingThread.start();
        consumingThread.join();
        // Bounded wait: if the consumer gave up early the producer may be blocked.
        producingThread.join(PRODUCER_JOIN_TIMEOUT_MS);

        rethrowThreadFailure("Producing thread", producerFailure);
        rethrowThreadFailure("Consuming thread", consumerFailure);
        assertFalse("Producing thread is still running", producingThread.isAlive());
        assertEquals(MESSAGES_COUNT, messagesCount);
    }

    /** Builds (but does not start) the max-priority thread that sends the numbered messages. */
    private Thread createProducingThread(final Connection connection) {
        Thread thread = new Thread("Producing thread") {
            @Override
            public void run() {
                try {
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(new ActiveMQQueue("test"));
                    for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
                        Message message = session.createTextMessage("" + idx);
                        producer.send(message);
                        log.debug("Sending: " + idx);
                    }
                    producer.close();
                    session.close();
                } catch (Throwable ex) {
                    log.error("Producing thread failed", ex);
                    producerFailure = ex;
                }
            }
        };
        thread.setPriority(Thread.MAX_PRIORITY);
        return thread;
    }

    /** Builds (but does not start) the thread that slowly consumes and acknowledges messages. */
    private Thread createConsumingThread(final Connection connection) {
        return new Thread("Consuming thread") {
            @Override
            public void run() {
                try {
                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("test"));
                    // Offset between the expected and the actual message index
                    // after a gap has been observed.
                    int diff = 0;
                    while (messagesCount != MESSAGES_COUNT) {
                        // A timeout keeps a lost message or a dead producer from
                        // hanging the test forever.
                        Message msg = consumer.receive(RECEIVE_TIMEOUT_MS);
                        if (msg == null) {
                            log.error("No message received within " + RECEIVE_TIMEOUT_MS
                                    + " ms after " + messagesCount + " messages. Giving up.");
                            break;
                        }
                        String text = ((TextMessage) msg).getText();
                        int currentMsgIdx = Integer.parseInt(text);
                        log.debug("Received: " + text + " messageCount: " + messagesCount);
                        msg.acknowledge();
                        if ((messagesCount + diff) != currentMsgIdx) {
                            log.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx);
                            diff = currentMsgIdx - messagesCount;
                        }
                        ++messagesCount;
                        // This pause is what makes the consumer "slow".
                        Thread.sleep(70);
                    }
                    consumer.close();
                    session.close();
                } catch (Throwable ex) {
                    log.error("Consuming thread failed", ex);
                    consumerFailure = ex;
                }
            }
        };
    }

    /** Surfaces an error captured in a worker thread as a test error, keeping it as the cause. */
    private static void rethrowThreadFailure(String threadName, Throwable failure) throws Exception {
        if (failure != null) {
            throw new Exception(threadName + " failed: " + failure, failure);
        }
    }

    /**
     * Writes {@code data}, encoded as UTF-8, to the socket and flushes it.
     *
     * @param data the text to send
     * @throws Exception if encoding or writing fails
     */
    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes("UTF-8");
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.flush();
    }

    /**
     * Reads bytes from the socket up to the next zero byte and returns them decoded
     * as UTF-8. Bytes read before a failure stay buffered for the next call.
     *
     * @param timeOut socket read timeout in milliseconds
     * @return the text that preceded the zero byte
     * @throws Exception if the stream ends ("socket closed.") or a read fails or times out
     */
    public String receiveFrame(long timeOut) throws Exception {
        stompSocket.setSoTimeout((int) timeOut);
        InputStream is = stompSocket.getInputStream();
        for (;;) {
            int c = is.read();
            if (c < 0) {
                throw new IOException("socket closed.");
            }
            if (c == 0) {
                // Consume the byte that follows the terminator; its value is ignored.
                // NOTE(review): presumably the trailing newline of the frame - confirm.
                is.read();
                byte[] ba = inputBuffer.toByteArray();
                inputBuffer.reset();
                return new String(ba, "UTF-8");
            }
            inputBuffer.write(c);
        }
    }
}
