Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-5387

MQTT Codec - buffer mis-alignment on NIO when Back-2-Back packets are received

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.11.0
    • 5.11.0
    • MQTT
    • None
    • Paho as client

    • Patch Available

    Description

      **
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements.  See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License.  You may obtain a copy of the License at
       *
       *      http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package org.apache.activemq.transport.mqtt;
      
      import static org.junit.Assert.*;
      
      import java.io.BufferedReader;
      import java.io.IOException;
      import java.io.InputStream;
      import java.io.InputStreamReader;
      import java.io.OutputStream;
      import java.util.ArrayList;
      import java.util.concurrent.atomic.AtomicInteger;
      
      import org.apache.commons.net.util.Base64;
      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
      import org.eclipse.paho.client.mqttv3.MqttSecurityException;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.junit.Test;
      
      
      
      /**
       * Test the NIO transport with this Test group
       */
      public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {
      
          AtomicInteger m_receiveCounter = new AtomicInteger();
          String BigMessage
          static ArrayList<MqttClient> mqttClients = null;
          static final Integer staticSyncObj = new Integer(1); 
          String messagePayload = null;
          
          public final int numberOfThreads = 500;
          static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
          
          boolean f_messageReceived = false;
          boolean f_ackReceived = false;
          boolean f_lost = false;
      
          /**
           * 
           * 
           * @param client
           * @param location
           * @param accountId
           * @param userId
           * @param clientId
           * @param token
           * @param nameSpace
           * @param message
           * @param qos
           * @param f_retained
           * @param f_keepOpen
           * @return
           * @throws MqttException
           */
          private MqttClient pubNameSpace(MqttClient client,  
                                          String    location, 
                                          String    accountId, 
                                          String    userId, 
                                          String    clientId,
                                          String    token, 
                                          String    nameSpace, 
                                          String    message, 
                                          int       qos, 
                                          boolean   f_retained,  
                                          boolean   f_keepOpen) throws MqttException
          {
      
              try
              {        
                  boolean f_wasConnected = true; 
                  
                  if (client == null)
                  {
                      f_wasConnected = false; 
                      client = new MqttClient(location, clientId/*, persistence*/);
                  }
                  
                  if (!f_wasConnected)
                  {
                      MqttConnectOptions options = new MqttConnectOptions();
                      options.setKeepAliveInterval(60);
                      options.setConnectionTimeout(120);
                      options.setPassword(token.toCharArray());
                      options.setUserName(accountId+":"+userId);
                      client.connect(options);
                      client.setCallback(this);
                  }
          
                  MqttMessage mqttMessage = new MqttMessage();
                  mqttMessage.setPayload(message.getBytes());
                  mqttMessage.setQos(qos);
                  mqttMessage.setRetained(f_retained);
          
                  
                  client.publish(nameSpace, mqttMessage);
                  
                  if (!f_keepOpen)
                  {
                      client.disconnect();
                      client.close();
                      client = null; 
                  }
                  return client; 
              }
              catch (MqttPersistenceException e)
              {
                  System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
                  if (client != null)
                  {
                      client.disconnect(); 
                      client.close();
                      client = null;
                  }
              }
              catch (Exception e)
              {
                  e.printStackTrace();
                  System.err.println("Exception e = "+e.getMessage());
                  if (client != null)
                  {
                      client.disconnect(); 
                      client.close();
                      client = null;
                  }
              }
              return null;
          }
          /**
           * 
           * @param location
           * @param accountId
           * @param userId
           * @param clientId
           * @param token
           * @param nameSpace
           * @param message
           * @param qos
           * @param f_retained
           * @param f_keepOpen
           * @return
           * @throws MqttException
           */
          private MqttClient pubNameSpace(String    location, 
                                          String    accountId, 
                                          String    userId, 
                                          String    clientId,
                                          String    token, 
                                          String    nameSpace, 
                                          String    message, 
                                          int       qos, 
                                          boolean   f_retained,  
                                          boolean   f_keepOpen) throws MqttException
          {
      
              try
              {        
                  MqttClient client = new MqttClient(location, clientId/*, persistence*/);
                  client.setCallback(this);
                  MqttConnectOptions options = new MqttConnectOptions();
                  options.setKeepAliveInterval(60);
                  options.setConnectionTimeout(120);
                  options.setPassword(token.toCharArray());
                  options.setUserName(accountId+":"+userId);
          
                  MqttMessage mqttMessage = new MqttMessage();
                  mqttMessage.setPayload(message.getBytes());
                  mqttMessage.setQos(qos);
                  mqttMessage.setRetained(f_retained);
          
                  client.connect(options);
                  client.publish(nameSpace, mqttMessage);
                  
                  if (!f_keepOpen)
                  {
                      client.disconnect();
                      client.close();
                  }
                  return client; 
              }
              catch (MqttPersistenceException e)
              {
                  System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
                  assertTrue(false);
              }
              catch (Exception e)
              {
                  e.printStackTrace();
                  System.err.println("Exceptoin e = "+e.getMessage()); 
                  assertTrue(false);
              }
              return null;
          }
         
          
          /**
           * 
           * @param input
           * @param output
           * @return
           * @throws IOException
           */
          public static long copyLarge(InputStream input, OutputStream output) throws IOException 
          {
            byte[] buffer = new byte[4096];
            long count = 0L;
            int n = 0;
            while (-1 != (n = input.read(buffer))) {
             output.write(buffer, 0, n);
             count += n;
            }
            return count;
          }
          
          /**
           * 
           * @param p
           * @throws IOException
           */
          public static void outputResults (Process p) throws IOException
          {
              BufferedReader stdInput = new BufferedReader(new
                                               InputStreamReader(p.getInputStream()));
      
              BufferedReader stdError = new BufferedReader(new
                              InputStreamReader(p.getErrorStream()));
      
              String s; 
              while ((s = stdInput.readLine()) != null) 
              {
                  System.out.println(s);
              }
              while ((s = stdError.readLine()) != null) 
              {
                  System.out.println(s);
              }
          }
          
          /**
           * HELPER
           * @param msec
           * @throws InterruptedException
           */
          void pause_til_done_or_time(int msec) throws InterruptedException
          {
              int pauseTime = 100; 
              while (!f_messageReceived && msec > 0 && !f_lost)
              {
                  Thread.sleep(pauseTime);
                  msec -= pauseTime;
              }
              
          }
      
          static Integer numberOfMessages = 0; 
          public void clearMessageCount()
          {
              numberOfMessages = 0; 
          }
          public Integer getMessageCount()
          {
              return numberOfMessages; 
          }
          /**
           * 
           * @param msec
           */
          private void waitForItAck(int msec)
          {
              while (!f_ackReceived)
              {
                  
                  try { Thread.sleep(1000); } catch (Exception e){}
                  
                  msec= msec-1000;
                                  
                  if (msec < 0)
                  {
                      break;
                  }
              }
          }
          
          @Override
          public String getProtocolScheme() {
              return "mqtt+nio";
          }
      
          @Override
          public boolean isUseSSL() {
              return false;
          }
      
          public class PahoCallback implements MqttCallback {
      
              @Override
              public void connectionLost(Throwable cause) {
                  // TODO Auto-generated method stub
                  
              }
      
              @Override
              public void messageArrived(String topic, MqttMessage message)
                      throws Exception {
                  m_receiveCounter.incrementAndGet();
              }
      
              @Override
              public void deliveryComplete(IMqttDeliveryToken token) {
                  // TODO Auto-generated method stub
                  
              }
              
          }
       
          static   MqttClient BalstTestClient = null;
          String   loc = "tcp://localhost:1883";
      
          public class pubThreadBitMsg extends Thread {
              
              public pubThreadBitMsg()
              {
                  synchronized (staticSyncObj)
                  {
                      try
                      {
                          System.out.println("---- pubTheadBitMsg - constructor"); 
                          if (BalstTestClient == null)
                          {
                              BalstTestClient = pubNameSpace(loc, "cjutzi", 
                                              "someone", 
                                               "myclientid_cjutzi_pub", 
                                               "hello",
                                               "/accounts/cjutzi/users/curt/test", 
                                               "Starting Client", 1, false, true);
                              System.out.println("---- pubTheadBitMsg - init"); 
                          }
                      } 
                      catch (MqttException e)
                      {
                          // TODO Auto-generated catch block
                          e.printStackTrace();
                      }
                  }
              }
      
              public void run()
              {
      //            synchronized (staticSyncObj) 
                  {
                  try
                  {
                      System.out.println("---- pubTheadBitMsg - send"); 
                      pubNameSpace(BalstTestClient, loc, "cjutzi", 
                                    "someone", 
                                     "myclientid_cjutzi_pub", 
                                     "hello",
                                     "/accounts/cjutzi/users/curt/test", 
                                     BigMessage, 1, false, true);
                      } catch (MqttException e)
                      {
                          // TODO Auto-generated catch block
                          e.printStackTrace();
                      }
                  }
              }
          }
          
          @Test
          public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
          {
              MqttClient subClient = new MqttClient("tcp://localhost:1883",
                      "niosubclient",
                      new MemoryPersistence());
              MqttConnectOptions cOpts = new MqttConnectOptions();
              cOpts.setCleanSession(true);
              cOpts.setUserName("curt");
              cOpts.setPassword("hello".toCharArray());
              
             
              subClient.setCallback(new PahoCallback());
              subClient.connect(cOpts);
              subClient.subscribe("nio/test");
             
              
              for (int i = 0; i < numberOfThreads; i++) 
              {
                  arrThreads.add(new pubThreadBitMsg());
              }
              System.out.println("--started"); 
              for (int i = 0; i < numberOfThreads; i++) 
              {
                  arrThreads.get(i).start(); 
              }
              System.out.println("-- waiting"); 
              Thread.sleep(10000);
              assertTrue(numberOfThreads == m_receiveCounter.get());
          }
          
          
          /**
           * 
           */
          private void resetFlag()
          {
              f_messageReceived = false; 
              f_ackReceived = false; 
          }
          /***************************************************************/
          /**              CALL BACKS FOR MQTT                           */
          /***************************************************************/
         
          
      
          
          /**
           * 
           */
            public void connectionLost(Throwable arg0)
            {
                System.out.println("MQTT - Connection Lost");
      //          f_terminate = true;
                f_lost = true;
            }
      
            /**
             * 
             */
            public void deliveryComplete(IMqttDeliveryToken arg0)
            {
                System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
                f_ackReceived = true;
            }
            
            /**
             * 
             */
            public void messageArrived(String arg0, MqttMessage arg1) throws Exception
            {
                synchronized (numberOfMessages)
                {
                    numberOfMessages++;   
                    System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
      //            System.out.print(arg1.isDuplicate()?"*":".");
                    byte[] payloadBytes = arg1.getPayload(); 
                    if (payloadBytes.length >0 )
                    {
                        messagePayload = new String(payloadBytes); 
                    }
                    System.out.println("Message Recieved..."); 
                    f_messageReceived = true;
                } 
            }
            
      }
      
      
      

      Attachments

        1. MQTTCode.java.patch
          0.8 kB
          Curt Jutzi

        Activity

          People

            tabish Timothy A. Bish
            cjutzi Curt Jutzi
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: