Details
Description
I am trying to get a serverless setup using multicast or JRMP as transport.
I verified that Multicast works in my environment using this util: http://www.mikkle.dk/multicasttest/index.html
However, I am unable to get ActiveMQ to work between two JVMs, whether they run on the same box or on different boxes. Both the producer and the consumer run without any errors, but the consumer never receives any messages. Furthermore, no multicast traffic can be detected on the network.
I have tried activemq-1.0-M1.jar and activemq-1.0-SNAPSHOT.jar as listed on http://dist.codehaus.org/activemq/jars and also several interim builds.
Attached are my two simple test programs. Note that I am able to get the tcp protocol to work with these two programs if I start a server.
::::::::::::::::::::::::::::::::::::
Producer tester
::::::::::::::::::::::::::::::::::::
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Reference;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.jndi.JNDIReferenceFactory;
/**
- Class to test ActiveMQ Publisher
* - @author $author$
- @version $Revision$
*/
public class TestActiveMQProducer
{
protected ConnectionFactory connFactory;
protected Connection sndConn;
protected Session sndSess;
protected MessageProducer producer;
protected Destination dest;
/**
- main logic
* - @param args .
* - @throws Exception !
*/
public static void main(String[] args) throws Exception
{
TestActiveMQProducer tester = new TestActiveMQProducer();
tester.setUp();
for (int i = 0; i < 100; i++)
{ Message msg = tester.sndSess.createTextMessage("Hello World!"); tester.producer.send(tester.dest, msg); System.out.println("Sent msg: " + msg); Thread.sleep(1000); } tester.tearDown();
}
protected void setUp() throws Exception
{ connFactory = new ActiveMQConnectionFactory("multicast://224.0.0.1:8000"); //connFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); System.out.println("Created conn factory: " + connFactory); sndConn = connFactory.createConnection(); System.out.println("Created conn: " + sndConn); sndSess = sndConn.createSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println("Created receiveSession: " + sndSess); dest = sndSess.createTopic("FOO"); System.out.println( "Created destination: " + dest + " of type: " + dest.getClass() ); producer = sndSess.createProducer(dest); sndConn.start(); System.out.println("Started connections"); }protected void tearDown() throws Exception
{ sndSess.close(); sndConn.close(); }}
::::::::::::::::::::::::::::::::::::
Consumer tester
::::::::::::::::::::::::::::::::::::
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Reference;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.jndi.JNDIReferenceFactory;
/**
- Class to test ActiveMQ Subscriber
* - @author $author$
- @version $Revision$
*/
public class TestActiveMQConsumer implements MessageListener
{
protected ConnectionFactory connFactory;
protected Connection rcvConn;
protected Session rcvSess;
protected MessageConsumer consumer;
protected Destination dest;
/**
- main logic
* - @param args .
* - @throws Exception !
*/
public static void main(String[] args) throws Exception { TestActiveMQConsumer tester = new TestActiveMQConsumer(); tester.setUp(); // tester.tearDown(); }
protected void setUp() throws Exception
{ connFactory = new ActiveMQConnectionFactory("multicast://224.0.0.1:8000"); //connFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); System.out.println("Created conn factory: " + connFactory); rcvConn = connFactory.createConnection(); System.out.println("Created conn: " + rcvConn); rcvSess = rcvConn.createSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println("Created receiveSession: " + rcvSess); dest = rcvSess.createTopic("FOO"); System.out.println( "Created destination: " + dest + " of type: " + dest.getClass() ); consumer = rcvSess.createConsumer(dest); consumer.setMessageListener(this); rcvConn.start(); System.out.println("Started connections"); }protected void tearDown() throws Exception
{ rcvSess.close(); rcvConn.close(); }/**
- Got a message!
* - @param message .
*/
public void onMessage(Message message) { System.out.println("Received message: " + message); }}