Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-1146

Excel RTD Server

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Closed
    • Major
    • Resolution: Fixed
    • M2
    • None
    • .NET Client
    • None
    • Windows .NET with Excel

    Description

      -QUOTED FROM AN EMAIL SENT TO QPID'S MAILING LIST-
      Hi All,

      The following email contains an Excel RTD server which is able to subscribe to information from an M2 Qpid server.
      This is just a proof of concept: there are almost no optimizations, no checks for leaks, etc. I'm not really a .NET programmer, and I have never done any COM programming (and no C++ since college).

      In any case, try it out. It works for me. I haven't figured out how to run it outside Visual Studio 2008 yet (no idea how to create install packages, register assemblies, etc.).

      QPID C# folks are welcome to use it as they wish. I probably won't maintain this, hopefully someone else will.

      In Excel, you just have to type the following formula:
      =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","<topic>","<field>")
      This will subscribe to a topic; each time an update is received, it will retrieve a field and display it in Excel.
      Example:

      =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price")

      This will just display the price for all incoming bidsoffers (so MSFT/ORCL/EBAY will be mixed in)

      You can also add 'filters' to the RTD function. Just append two parameters to the end of the previous RTD function: the first parameter names the field you wish to compare, and the second parameter is the value that field must have.

      Example:

      =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price","symbol","MSFT")

      This will subscribe to the same information as the last RTD function, but will only display prices for MSFT.

      You can add as many filters as you like, just make sure you append the RTD function with a filter field and a filter value.
      -8<----------------------------RTDTest.cs---------------------------


      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using Apache.Qpid.Messaging;
      using Apache.Qpid.Client.Qms;
      using Apache.Qpid.Client;
      using System.Runtime.InteropServices;
      using Microsoft.Office.Interop.Excel;

      //Shahbaz Chaudhary
      namespace RTDTest
      {
      // Excel RTD server that subscribes to Qpid topics and pushes one message
      // header per RTD topic id into Excel.
      //
      // RTD strings are: url, topic, field, then zero or more (filterField,
      // filterValue) pairs. A message updates a cell only when every filter
      // field of that cell equals its filter value.
      [ComVisible(true), ProgId("RTD.Test")]
      public class RTDTest : IRtdServer
      {
          //QPID CACHE
          // url -> connection; kept so the connection can be closed with its channel.
          Dictionary<string, Apache.Qpid.Messaging.IConnection> connectionCache;
          Dictionary<string, IChannel> channelCache;//url -> channel
          Dictionary<string, int> channelCacheCount;//url -> number of registered topic ids
          Dictionary<string, IMessageConsumer> topicCache;//"url|topic" -> consumer
          Dictionary<string, int> topicCacheCount;//"url|topic" -> number of registered topic ids
          Dictionary<int, Tuple2<string, string>> topicIDCache;//topic id -> (url, topic)
          Dictionary<string, IList<Tuple2<int, string>>> topicTopicIDFieldCache;//"url|topic" -> (topic id, field)
          Dictionary<int, Tuple2<string[], string[]>> filtersCache;//topic id -> (filter fields, filter values)
          //END QPID CACHE

          //IRTDServer Globals
          IRTDUpdateEvent updateEvent;
          Queue<Tuple2<int, object>> refreshQ;//pending (topic id, value) updates for Excel
          //END IRTDServer Globals

          // Guards refreshQ, filtersCache and topicTopicIDFieldCache, which are
          // used both by the consumer callback and by the calls Excel makes.
          // NOTE(review): the callback presumably runs on a Qpid client thread
          // rather than Excel's thread - confirm. No external call (COM or Qpid
          // Dispose) is made while this lock is held.
          readonly object stateLock = new object();

          public RTDTest()
          {
              connectionCache = new Dictionary<string, Apache.Qpid.Messaging.IConnection>();
              channelCache = new Dictionary<string, IChannel>();
              channelCacheCount = new Dictionary<string, int>();
              topicCache = new Dictionary<string, IMessageConsumer>();
              topicCacheCount = new Dictionary<string, int>();
              topicIDCache = new Dictionary<int, Tuple2<string, string>>();
              topicTopicIDFieldCache = new Dictionary<string, IList<Tuple2<int, string>>>();
              filtersCache = new Dictionary<int, Tuple2<string[], string[]>>();
              refreshQ = new Queue<Tuple2<int, object>>();
          }

          //QPID METHODS

          // Single place that builds the "url|topic" key shared by topicCache,
          // topicCacheCount and topicTopicIDFieldCache, so they cannot drift apart.
          private static string topicKey(string url, string topic)
          {
              return url + "|" + topic;
          }

          // Adds one to the reference count stored under key (starting from zero).
          private static void increment(Dictionary<string, int> counts, string key)
          {
              int current;
              counts.TryGetValue(key, out current);
              counts[key] = current + 1;
          }

          // Subtracts one from the reference count stored under key and returns
          // what is left. The entry is removed once it reaches zero; an unknown
          // key is reported as zero.
          private static int decrement(Dictionary<string, int> counts, string key)
          {
              int current;
              if (!counts.TryGetValue(key, out current))
              {
                  return 0;
              }

              current--;
              if (current <= 0)
              {
                  counts.Remove(key);
                  return 0;
              }

              counts[key] = current;
              return current;
          }

          // Returns the cached channel for url, opening a connection and a
          // channel on first use.
          private IChannel getChannel(string url)
          {
              IChannel chan;
              if (channelCache.TryGetValue(url, out chan))
              {
                  return chan;
              }

              IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
              Apache.Qpid.Messaging.IConnection connection = new AMQConnection(connectionInfo);
              try
              {
                  chan = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
                  connection.Start();
              }
              catch
              {
                  // Do not leave a half-opened connection behind.
                  connection.Dispose();
                  throw;
              }

              connectionCache[url] = connection;
              channelCache[url] = chan;
              return chan;
          }

          // Returns the cached consumer for (url, topic), creating a uniquely
          // named queue bound to the topic exchange on first use.
          private IMessageConsumer getTopicConsumer(string url, string topic)
          {
              string key = topicKey(url, topic);
              IMessageConsumer cons;
              if (!topicCache.TryGetValue(key, out cons))
              {
                  IChannel channel = getChannel(url);
                  string tempQ = channel.GenerateUniqueName();
                  // NOTE(review): the flags are presumably durable=false,
                  // exclusive=true, autoDelete=true - confirm against IChannel.
                  channel.DeclareQueue(tempQ, false, true, true);
                  cons = channel.CreateConsumerBuilder(tempQ).Create();
                  channel.Bind(tempQ, ExchangeNameDefaults.TOPIC, topic);
                  topicCache[key] = cons;
              }

              return cons;
          }

          // Returns the (topic id, field) list stored under a "url|topic" key,
          // creating an empty list if there is none. Caller must hold stateLock.
          private IList<Tuple2<int, string>> getFields(string key)
          {
              IList<Tuple2<int, string>> fields;
              if (!topicTopicIDFieldCache.TryGetValue(key, out fields))
              {
                  fields = new List<Tuple2<int, string>>();
                  topicTopicIDFieldCache[key] = fields;
              }

              return fields;
          }

          // Consumer callback: queues one update for every topic id registered
          // on (url, topic) whose filters accept msg, then asks Excel to refresh.
          // The field and topicid arguments are not used; every registered field
          // of the topic is served from the per-topic list instead.
          private void onMessage(IMessage msg, string url, string topic, string field, int topicid)
          {
              bool pending;
              lock (stateLock)
              {
                  IList<Tuple2<int, string>> fields;
                  if (topicTopicIDFieldCache.TryGetValue(topicKey(url, topic), out fields))
                  {
                      foreach (Tuple2<int, string> f in fields)
                      {
                          Tuple2<string[], string[]> filters;
                          if (!filtersCache.TryGetValue(f.a, out filters))
                          {
                              // Topic id was disconnected in the meantime.
                              continue;
                          }

                          if (allFiltersTrue(filters.a, filters.b, msg))
                          {
                              refreshQ.Enqueue(new Tuple2<int, object>(f.a, msg.Headers[f.b]));
                          }
                      }
                  }

                  // Also true when an earlier notification failed and left data queued.
                  pending = refreshQ.Count > 0;
              }

              IRTDUpdateEvent callback = updateEvent;
              if (!pending || callback == null)
              {
                  return;
              }

              try
              {
                  callback.UpdateNotify();
              }
              catch (COMException)
              {
                  // Deliberately best effort: the update stays in refreshQ and
                  // is delivered after the next successful notification.
              }
          }

          // Records topicid as a subscriber of field on (url, topic) and makes
          // sure a consumer is listening on that topic.
          void registerTopicID(string url, string topic, string field, int topicid)
          {
              string key = topicKey(url, topic);
              bool firstForTopic = !topicCache.ContainsKey(key);

              // May throw (e.g. broker unreachable); nothing is recorded before it succeeds.
              IMessageConsumer consumer = getTopicConsumer(url, topic);

              topicIDCache[topicid] = new Tuple2<string, string>(url, topic);
              lock (stateLock)
              {
                  getFields(key).Add(new Tuple2<int, string>(topicid, field));
              }

              increment(channelCacheCount, url);
              increment(topicCacheCount, key);

              if (firstForTopic)
              {
                  // One handler per consumer is enough because onMessage walks
                  // every field registered for the topic; adding a handler per
                  // topic id would queue every update once per registration.
                  consumer.OnMessage += msg =>
                  {
                      onMessage(msg, url, topic, field, topicid);
                  };
              }
          }

          // True when, for every i, header filterKeys[i] of msg has the string
          // form filterVals[i]. A header whose value is null never matches.
          private bool allFiltersTrue(string[] filterKeys, string[] filterVals, IMessage msg)
          {
              for (int i = 0; i < filterKeys.Length; i++)
              {
                  object actual = msg.Headers[filterKeys[i]];
                  if (actual == null || !actual.ToString().Equals(filterVals[i]))
                  {
                      return false;
                  }
              }

              return true;
          }

          // Forgets topicid. When it was the last subscriber of its topic the
          // consumer is disposed; when it was the last subscriber of its url the
          // channel and the connection are disposed as well. Unknown ids are ignored.
          public void removeRegisteredTopic(int topicid)
          {
              Tuple2<string, string> registration;
              if (!topicIDCache.TryGetValue(topicid, out registration))
              {
                  lock (stateLock)
                  {
                      filtersCache.Remove(topicid);
                  }
                  return;
              }

              topicIDCache.Remove(topicid);
              string url = registration.a;
              string key = topicKey(url, registration.b);

              lock (stateLock)
              {
                  filtersCache.Remove(topicid);

                  IList<Tuple2<int, string>> fields;
                  if (topicTopicIDFieldCache.TryGetValue(key, out fields))
                  {
                      for (int i = fields.Count - 1; i >= 0; i--)
                      {
                          if (fields[i].a == topicid)
                          {
                              fields.RemoveAt(i);
                          }
                      }

                      if (fields.Count == 0)
                      {
                          topicTopicIDFieldCache.Remove(key);
                      }
                  }
              }

              // Consumer first, then the channel it was created on, then the connection.
              if (decrement(topicCacheCount, key) <= 0)
              {
                  IMessageConsumer consumer;
                  if (topicCache.TryGetValue(key, out consumer))
                  {
                      topicCache.Remove(key);
                      consumer.Dispose();
                  }
              }

              if (decrement(channelCacheCount, url) <= 0)
              {
                  IChannel channel;
                  if (channelCache.TryGetValue(url, out channel))
                  {
                      channelCache.Remove(url);
                      channel.Dispose();
                  }

                  Apache.Qpid.Messaging.IConnection connection;
                  if (connectionCache.TryGetValue(url, out connection))
                  {
                      connectionCache.Remove(url);
                      connection.Dispose();
                  }
              }
          }

          //END QPID METHODS

          //---------------------------------------------------------------------------------------------------------
          //IRTDServer METHODS
          #region IRtdServer Members

          // Stores Excel's callback object and reports 1.
          // NOTE(review): 1 is presumably the "started" result of the
          // IRtdServer contract - confirm.
          public int ServerStart(IRTDUpdateEvent CallbackObject)
          {
              updateEvent = CallbackObject;
              return 1;
          }

          // Registers TopicID. Strings holds url, topic, field and then any
          // number of (filterField, filterValue) pairs; a trailing unpaired
          // string is ignored. Returns the text shown until the first update.
          public object ConnectData(int TopicID, ref Array Strings, ref bool GetNewValues)
          {
              int size = Strings.Length;
              if (size < 3)
              {
                  return "RTD.Test needs url, topic and field";
              }

              string url = (string)Strings.GetValue(0);
              string topic = (string)Strings.GetValue(1);
              string field = (string)Strings.GetValue(2);

              int conditions = (size - 3) / 2;
              string[] filterKeys = new string[conditions];
              string[] filterVals = new string[conditions];
              for (int i = 0; i < conditions; i++)
              {
                  // Pair i occupies Strings[3 + 2i] (field) and Strings[4 + 2i] (value).
                  filterKeys[i] = (string)Strings.GetValue(3 + 2 * i);
                  filterVals[i] = (string)Strings.GetValue(4 + 2 * i);
              }

              if (topicIDCache.ContainsKey(TopicID))
              {
                  // Replace a registration that was never disconnected.
                  removeRegisteredTopic(TopicID);
              }

              // Filters must be in place before the consumer can deliver a message.
              lock (stateLock)
              {
                  filtersCache[TopicID] = new Tuple2<string[], string[]>(filterKeys, filterVals);
              }

              try
              {
                  registerTopicID(url, topic, field, TopicID);
              }
              catch
              {
                  lock (stateLock)
                  {
                      filtersCache.Remove(TopicID);
                  }
                  throw;
              }

              return "Getting data...";
          }

          // Drops the registration made by ConnectData for TopicID.
          public void DisconnectData(int TopicID)
          {
              removeRegisteredTopic(TopicID);
          }

          // Always reports 1.
          // NOTE(review): presumably the "alive" value Excel expects - confirm.
          public int Heartbeat()
          {
              return 1;
          }

          // Hands every queued update to Excel as a 2 x TopicCount array:
          // row 0 holds topic ids, row 1 the matching values.
          public Array RefreshData(ref int TopicCount)
          {
              lock (stateLock)
              {
                  // Snapshot the count: Dequeue shrinks refreshQ.Count while looping.
                  int count = refreshQ.Count;
                  object[,] result = new object[2, count];
                  for (int i = 0; i < count; i++)
                  {
                      Tuple2<int, object> data = refreshQ.Dequeue();
                      result[0, i] = data.a;
                      result[1, i] = data.b;
                  }

                  TopicCount = count;
                  return result;
              }
          }

          // Releases every consumer, channel and connection and clears all state.
          public void ServerTerminate()
          {
              updateEvent = null;

              foreach (IMessageConsumer consumer in topicCache.Values)
              {
                  consumer.Dispose();
              }

              foreach (IChannel c in channelCache.Values)
              {
                  c.Dispose();
              }

              foreach (Apache.Qpid.Messaging.IConnection connection in connectionCache.Values)
              {
                  connection.Dispose();
              }

              topicCache.Clear();
              topicCacheCount.Clear();
              channelCache.Clear();
              channelCacheCount.Clear();
              connectionCache.Clear();
              topicIDCache.Clear();

              lock (stateLock)
              {
                  topicTopicIDFieldCache.Clear();
                  filtersCache.Clear();
                  refreshQ.Clear();
              }
          }

          #endregion
          //END IRTDServer METHODS
      }

      // Minimal mutable pair; members are named a and b.
      class Tuple2<T, U>
      {
          private T first;
          private U second;

          // Builds a pair with a = t and b = u.
          public Tuple2(T t, U u)
          {
              first = t;
              second = u;
          }

          // First component.
          public T a
          {
              get { return first; }
              set { first = value; }
          }

          // Second component.
          public U b
          {
              get { return second; }
              set { second = value; }
          }
      }

      // Minimal mutable triple; members are named a, b and c.
      class Tuple3<T, U, V>
      {
          private T first;
          private U second;
          private V third;

          // Builds a triple with a = t, b = u and c = v.
          public Tuple3(T t, U u, V v)
          {
              first = t;
              second = u;
              third = v;
          }

          // First component.
          public T a
          {
              get { return first; }
              set { first = value; }
          }

          // Second component.
          public U b
          {
              get { return second; }
              set { second = value; }
          }

          // Third component.
          public V c
          {
              get { return third; }
              set { third = value; }
          }
      }
      }

      Attachments

        1. RTDTest.cs
          10 kB
          Shahbaz Chaudhary
        2. RTDTest.cs
          9 kB
          Shahbaz Chaudhary

        Activity

          People

            cctrieloff Carl C. Trieloff
            shahbazc Shahbaz Chaudhary
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - 48h
                48h
                Remaining:
                Remaining Estimate - 48h
                48h
                Logged:
                Time Spent - Not Specified
                Not Specified