/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
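/**
 * Extends {@link PahoMQTTTest} to run against the NIO MQTT transport
 * (scheme "mqtt+nio", no SSL) and adds a multi-threaded QoS 1 publish
 * test that sends a large payload from many threads.
 */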
public class PahoMQTTNIOTest extends PahoMQTTTest implements MqttCallback {
// Counts messages delivered to PahoCallback.messageArrived; the blast test compares it with numberOfThreads.
AtomicInteger m_receiveCounter = new AtomicInteger();
String BigMessage = "...........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
.....................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
// Declared but neither assigned nor read anywhere in the visible code.
static ArrayList<MqttClient> mqttClients = null;
// Monitor that serialises the lazy creation of the shared publisher client in the
// pubThreadBitMsg constructor. new Integer(1) always yields a fresh object, so the
// monitor is not a JVM-wide cached Integer shared with unrelated code.
static final Integer staticSyncObj = new Integer(1);
// Text of the most recent non-empty payload seen by messageArrived.
String messagePayload = null;
// Number of publisher threads the blast test creates, and the message count it expects.
public final int numberOfThreads = 500;
// Every publisher thread created by the blast test; static, so entries survive across instances.
static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
// Completion flags set by the MqttCallback methods of this class and polled by the
// wait helpers (pause_til_done_or_time, waitForItAck). They are volatile because the
// writer (presumably Paho's callback thread) and the polling thread differ, and a
// plain field gives the poller no guarantee of ever seeing the update.
volatile boolean f_messageReceived = false;
volatile boolean f_ackReceived = false;
volatile boolean f_lost = false;
/**
 * Publishes one message, creating and connecting a client first when none is supplied.
 *
 * Failures are not propagated as-is: they are reported on System.err, the client
 * (including one supplied by the caller) is disconnected and closed, and
 * {@code null} is returned.
 *
 * @param client     client to publish with, or {@code null} to create and connect a new one
 * @param location   broker URI used when a new client is created
 * @param accountId  first half of the user name, sent as {@code accountId:userId}
 * @param userId     second half of the user name
 * @param clientId   MQTT client id used when a new client is created
 * @param token      password used when a new client is connected
 * @param nameSpace  topic the message is published to
 * @param message    payload text, encoded with the platform default charset
 * @param qos        quality of service for the published message
 * @param f_retained whether the message is flagged as retained
 * @param f_keepOpen when {@code false} the client is disconnected and closed after publishing
 * @return the client when {@code f_keepOpen} is true and publishing succeeded, otherwise {@code null}
 * @throws MqttException if releasing the client after a failure itself fails
 */
private MqttClient pubNameSpace(MqttClient client,
                                String location,
                                String accountId,
                                String userId,
                                String clientId,
                                String token,
                                String nameSpace,
                                String message,
                                int qos,
                                boolean f_retained,
                                boolean f_keepOpen) throws MqttException
{
    try
    {
        if (client == null)
        {
            client = new MqttClient(location, clientId);
            // Register the listener before connecting (as the other overload does) so
            // that no event can arrive while the client has no callback installed.
            client.setCallback(this);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setKeepAliveInterval(60);
            options.setConnectionTimeout(120);
            options.setPassword(token.toCharArray());
            options.setUserName(accountId+":"+userId);
            client.connect(options);
        }
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(message.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(f_retained);
        client.publish(nameSpace, mqttMessage);
        if (!f_keepOpen)
        {
            releaseMqttClient(client);
            client = null;
        }
        return client;
    }
    catch (MqttPersistenceException e)
    {
        System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion - "+e.getMessage());
    }
    catch (Exception e)
    {
        e.printStackTrace();
        System.err.println("Exception e = "+e.getMessage());
    }
    // Only reached after a failure: give back whatever client is still held.
    if (client != null)
    {
        releaseMqttClient(client);
    }
    return null;
}

/**
 * Disconnects the client if it is connected, then closes it. The connected check
 * keeps a client whose connect attempt failed from throwing in disconnect() and
 * thereby skipping close().
 */
private void releaseMqttClient(MqttClient client) throws MqttException
{
    try
    {
        if (client.isConnected())
        {
            client.disconnect();
        }
    }
    finally
    {
        client.close();
    }
}
/**
 * Creates a client, connects it and publishes one message.
 *
 * Any failure is reported on System.err, the client is released, and an
 * {@link AssertionError} carrying the original exception as its cause is thrown
 * so the calling test fails.
 *
 * @param location   broker URI
 * @param accountId  first half of the user name, sent as {@code accountId:userId}
 * @param userId     second half of the user name
 * @param clientId   MQTT client id
 * @param token      password
 * @param nameSpace  topic the message is published to
 * @param message    payload text, encoded with the platform default charset
 * @param qos        quality of service for the published message
 * @param f_retained whether the message is flagged as retained
 * @param f_keepOpen when {@code false} the client is disconnected and closed after publishing
 * @return the client that was used; it is already closed when {@code f_keepOpen} is false
 * @throws MqttException declared for callers; failures inside this method surface as AssertionError
 */
private MqttClient pubNameSpace(String location,
                                String accountId,
                                String userId,
                                String clientId,
                                String token,
                                String nameSpace,
                                String message,
                                int qos,
                                boolean f_retained,
                                boolean f_keepOpen) throws MqttException
{
    MqttClient client = null;
    Exception failure;
    try
    {
        client = new MqttClient(location, clientId);
        client.setCallback(this);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(60);
        options.setConnectionTimeout(120);
        options.setPassword(token.toCharArray());
        options.setUserName(accountId+":"+userId);
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(message.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(f_retained);
        client.connect(options);
        client.publish(nameSpace, mqttMessage);
        if (!f_keepOpen)
        {
            client.disconnect();
            client.close();
        }
        return client;
    }
    catch (MqttPersistenceException e)
    {
        System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
        failure = e;
    }
    catch (Exception e)
    {
        e.printStackTrace();
        System.err.println("Exceptoin e = "+e.getMessage());
        failure = e;
    }
    // Failure path only: the caller never receives the client, so release it here
    // instead of leaking the connection.
    if (client != null)
    {
        try
        {
            if (client.isConnected())
            {
                client.disconnect();
            }
            client.close();
        }
        catch (MqttException cleanupFailure)
        {
            System.err.println("pubNameSpace : could not release client after failure - "+cleanupFailure.getMessage());
        }
    }
    // Same error type as the former assertTrue(false), but with a message and the cause attached.
    AssertionError error = new AssertionError("pubNameSpace failed for client id "+clientId+" : "+failure);
    error.initCause(failure);
    throw error;
}
/**
 * Copies everything readable from {@code input} to {@code output} through a
 * 4096-byte buffer. Neither stream is flushed or closed here.
 *
 * @param input  stream to read until end of stream
 * @param output stream that receives the bytes
 * @return the total number of bytes copied
 * @throws IOException if reading or writing fails
 */
public static long copyLarge(InputStream input, OutputStream output) throws IOException
{
    final byte[] chunk = new byte[4096];
    long total = 0L;
    for (int read = input.read(chunk); read != -1; read = input.read(chunk))
    {
        output.write(chunk, 0, read);
        total += read;
    }
    return total;
}
/**
 * Echoes a process's standard output, then its standard error, to System.out
 * line by line. Both process streams are closed before returning, including
 * when reading fails.
 *
 * Standard output is drained to end of stream before standard error is read.
 *
 * @param p process whose output streams are read
 * @throws IOException if reading or closing either stream fails
 */
public static void outputResults (Process p) throws IOException
{
    BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
    try
    {
        BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
        try
        {
            String s;
            while ((s = stdInput.readLine()) != null)
            {
                System.out.println(s);
            }
            while ((s = stdError.readLine()) != null)
            {
                System.out.println(s);
            }
        }
        finally
        {
            stdError.close();
        }
    }
    finally
    {
        stdInput.close();
    }
}
/**
 * Blocks in 100 ms steps until a message has arrived, the connection has been
 * reported lost, or the time budget is used up.
 *
 * @param msec maximum time to wait, in milliseconds
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
void pause_til_done_or_time(int msec) throws InterruptedException
{
    final int pollIntervalMs = 100;
    for (int remaining = msec; remaining > 0; remaining -= pollIntervalMs)
    {
        if (f_messageReceived || f_lost)
        {
            return;
        }
        Thread.sleep(pollIntervalMs);
    }
}
// Running total of messages seen by messageArrived; static, so shared by all instances.
static Integer numberOfMessages = Integer.valueOf(0);

/**
 * Resets the shared message total to zero.
 */
public void clearMessageCount() {
    numberOfMessages = Integer.valueOf(0);
}

/**
 * Returns the shared message total.
 *
 * @return the current value of the counter
 */
public Integer getMessageCount() {
    return numberOfMessages;
}
/**
 * Waits in one-second steps until a delivery acknowledgement has been flagged
 * or the time budget is exhausted.
 *
 * The budget is checked after each sleep and only a negative remainder ends
 * the wait, so the method can sleep up to one step longer than {@code msec}.
 * If the thread is interrupted, the interrupt flag is restored and the wait
 * ends immediately.
 *
 * @param msec time budget in milliseconds
 */
private void waitForItAck(int msec)
{
    while (!f_ackReceived)
    {
        try
        {
            Thread.sleep(1000);
        }
        catch (InterruptedException e)
        {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return;
        }
        msec = msec - 1000;
        if (msec < 0)
        {
            break;
        }
    }
}
/**
 * Returns the transport scheme "mqtt+nio".
 * NOTE(review): presumably read by PahoMQTTTest when it sets up the broker
 * connector - confirm against the parent class.
 */
@Override
public String getProtocolScheme() {
return "mqtt+nio";
}
/**
 * Always returns false, i.e. this test variant does not ask for SSL.
 */
@Override
public boolean isUseSSL() {
return false;
}
/**
 * Callback installed on the subscriber client of the blast test. It only counts
 * arriving messages in the enclosing test's m_receiveCounter.
 */
public class PahoCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// Intentionally empty: a lost connection is not recorded by this callback.
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
// Topic and payload are ignored; only the number of arrivals matters.
m_receiveCounter.incrementAndGet();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Intentionally empty: delivery notifications are not used by this callback.
}
}
// Publisher client shared by all pubThreadBitMsg threads; created by the first pubThreadBitMsg constructor to run.
static MqttClient BalstTestClient = null;
// Broker URI the publisher threads connect to (hard-coded; not derived from getProtocolScheme()).
String loc = "tcp://localhost:1883";
/**
 * Publisher thread used by the blast test. Every instance publishes BigMessage
 * once through the shared BalstTestClient.
 */
public class pubThreadBitMsg extends Thread {

    /**
     * Creates the shared publisher client, guarded by staticSyncObj, if no earlier
     * instance has done so. Creating it also publishes a "Starting Client" message.
     */
    public pubThreadBitMsg()
    {
        synchronized (staticSyncObj)
        {
            try
            {
                System.out.println("---- pubTheadBitMsg - constructor");
                if (BalstTestClient != null)
                {
                    return;
                }
                BalstTestClient = pubNameSpace(loc,
                                               "cjutzi",
                                               "someone",
                                               "myclientid_cjutzi_pub",
                                               "hello",
                                               "/accounts/cjutzi/users/curt/test",
                                               "Starting Client",
                                               1,
                                               false,
                                               true);
                System.out.println("---- pubTheadBitMsg - init");
            }
            catch (MqttException e)
            {
                e.printStackTrace();
            }
        }
    }

    /**
     * Publishes BigMessage at QoS 1 through the shared client and keeps it open.
     * This runs without holding staticSyncObj.
     */
    @Override
    public void run()
    {
        System.out.println("---- pubTheadBitMsg - send");
        try
        {
            pubNameSpace(BalstTestClient,
                         loc,
                         "cjutzi",
                         "someone",
                         "myclientid_cjutzi_pub",
                         "hello",
                         "/accounts/cjutzi/users/curt/test",
                         BigMessage,
                         1,
                         false,
                         true);
        }
        catch (MqttException e)
        {
            e.printStackTrace();
        }
    }
}
/**
 * Subscribes one client, starts numberOfThreads publisher threads that each send
 * BigMessage at QoS 1, waits ten seconds and expects the subscriber to have
 * counted exactly numberOfThreads messages.
 *
 * The broker address is hard-coded to tcp://localhost:1883 rather than derived
 * from getProtocolScheme().
 *
 * @throws MqttException if the subscriber client cannot be created, connected or subscribed
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
{
    MqttClient subClient = new MqttClient("tcp://localhost:1883",
                                          "niosubclient",
                                          new MemoryPersistence());
    try
    {
        MqttConnectOptions cOpts = new MqttConnectOptions();
        cOpts.setCleanSession(true);
        cOpts.setUserName("curt");
        cOpts.setPassword("hello".toCharArray());
        subClient.setCallback(new PahoCallback());
        subClient.connect(cOpts);
        // NOTE(review): the publisher threads publish to
        // "/accounts/cjutzi/users/curt/test", which this filter does not appear to
        // match - confirm which topic is intended.
        subClient.subscribe("nio/test");

        // arrThreads is static and may already hold threads from an earlier run;
        // start only the ones added here, because a Thread cannot be started twice.
        final int firstNewThread = arrThreads.size();
        for (int i = 0; i < numberOfThreads; i++)
        {
            arrThreads.add(new pubThreadBitMsg());
        }
        System.out.println("--started");
        for (int i = firstNewThread; i < arrThreads.size(); i++)
        {
            arrThreads.get(i).start();
        }
        System.out.println("-- waiting");
        Thread.sleep(10000);
        // assertEquals reports both counts on failure, unlike assertTrue(a == b).
        assertEquals("messages received by the subscriber", numberOfThreads, m_receiveCounter.get());
    }
    finally
    {
        // Release the subscriber without masking an assertion failure from above.
        try
        {
            if (subClient.isConnected())
            {
                subClient.disconnect();
            }
            subClient.close();
        }
        catch (MqttException e)
        {
            System.err.println("could not release subscriber client - "+e.getMessage());
        }
    }
}
/**
 * Clears the message-received and ack-received flags. The connection-lost
 * flag is left as it is.
 */
private void resetFlag()
{
    f_messageReceived = f_ackReceived = false;
}
/***************************************************************/
/** CALL BACKS FOR MQTT */
/***************************************************************/
/**
 * MqttCallback hook: records that the connection was lost and logs the event.
 * The cause passed in is not inspected.
 */
public void connectionLost(Throwable arg0)
{
    f_lost = true;
    System.out.println("MQTT - Connection Lost");
}
/**
 * MqttCallback hook: logs the completion state of the delivery token and then
 * flags that an acknowledgement was received.
 */
public void deliveryComplete(IMqttDeliveryToken arg0)
{
    final boolean complete = arg0.isComplete();
    System.out.println("MQTT - delivery complete: Delivery Tokeh = "+complete);
    f_ackReceived = true;
}
/**
 * MqttCallback hook: increments the shared message counter, logs the message,
 * keeps the text of a non-empty payload and flags that a message was received.
 *
 * @param arg0 topic the message arrived on
 * @param arg1 the message that arrived
 * @throws Exception declared by the callback interface; not thrown deliberately here
 */
public void messageArrived(String arg0, MqttMessage arg1) throws Exception
{
    // numberOfMessages is a boxed Integer that is replaced by every increment, so it
    // cannot act as its own monitor: each caller could lock a different object.
    // The counter is static, so the class object is the stable lock that matches it.
    synchronized (PahoMQTTNIOTest.class)
    {
        numberOfMessages++;
        System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
        byte[] payloadBytes = arg1.getPayload();
        if (payloadBytes.length > 0)
        {
            // Decoded with the platform default charset, matching how pubNameSpace encodes it.
            messagePayload = new String(payloadBytes);
        }
        System.out.println("Message Recieved...");
        f_messageReceived = true;
    }
}
}