Beam / BEAM-7819

PubsubMessage message parsing is lacking non-attribute fields

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: io-py-gcp
    • Labels:
      None

      Description

      User reported issue: https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E

      """
      Looking at the source code, with my untrained Python eyes, I think if the intention is to include the message id and the publish time in the attributes attribute of the PubsubMessage type, then the protobuf mapping is missing something:-

      @staticmethod
      def _from_proto_str(proto_msg):
        """Construct from serialized form of ``PubsubMessage``.

        Args:
          proto_msg: String containing a serialized protobuf of type
          https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage

        Returns:
          A new PubsubMessage object.
        """
        msg = pubsub.types.pubsub_pb2.PubsubMessage()
        msg.ParseFromString(proto_msg)
        # Convert ScalarMapContainer to dict.
        attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
        return PubsubMessage(msg.data, attributes)

      The protobuf definition is here:-

      https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage

      and so it looks as if the message_id and publish_time are not being parsed, as they are separate from the attributes. Perhaps the PubsubMessage class needs expanding to include these as attributes, or they would need adding to the dictionary for attributes. This would only need doing in _from_proto_str, as they would not need to be populated when transmitting a message to PubSub.

      My Python is not great; I'm assuming the latter option would need to look something like this?

      attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
      attributes.update(
          {'message_id': msg.message_id, 'publish_time': msg.publish_time})
      return PubsubMessage(msg.data, attributes)
      """
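
For comparison, the first option in the quoted report (expanding the class rather than merging into the attributes dict) might look roughly like the sketch below. This is an illustration only, not Beam's implementation: `PubsubMessageSketch`, `timestamp_to_datetime`, and `from_parsed_proto` are hypothetical names, and the parsed protobuf is replaced by a duck-typed stand-in so the example runs without the Pub/Sub client library. One point worth noting is that `publish_time` in the protobuf definition is a `google.protobuf.Timestamp` (with `seconds` and `nanos` fields) rather than a string, so placing it directly into the string-valued attributes map would need a conversion step first.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Dict, Optional


@dataclass
class PubsubMessageSketch:
    """Hypothetical stand-in for PubsubMessage with two extra read-only fields."""
    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)
    # Only populated when reading; a message being published has neither yet.
    message_id: Optional[str] = None
    publish_time: Optional[datetime] = None


def timestamp_to_datetime(ts) -> datetime:
    """Convert a Timestamp-like object (seconds, nanos) to an aware UTC datetime."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # datetime has microsecond resolution, so sub-microsecond nanos are dropped.
    return epoch + timedelta(seconds=ts.seconds, microseconds=ts.nanos // 1000)


def from_parsed_proto(msg) -> PubsubMessageSketch:
    """Build the sketch class from an already-parsed protobuf-like message."""
    # Convert the map container to a plain dict, as the quoted code does.
    attributes = {key: msg.attributes[key] for key in msg.attributes}
    return PubsubMessageSketch(
        data=msg.data,
        attributes=attributes,
        message_id=msg.message_id,
        publish_time=timestamp_to_datetime(msg.publish_time),
    )


# Assumption: this namespace imitates the fields of a parsed
# google.pubsub.v1.PubsubMessage; real code would obtain the object
# via msg.ParseFromString(proto_msg) as in the quoted method.
fake_proto = SimpleNamespace(
    data=b"payload",
    attributes={"origin": "test"},
    message_id="12345",
    publish_time=SimpleNamespace(seconds=1563350400, nanos=500000000),
)
parsed = from_parsed_proto(fake_proto)
```

Keeping the two values as separate optional fields avoids any clash with a user-supplied attribute that happens to be named `message_id` or `publish_time`, at the cost of changing the class's constructor.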

    People

    • Assignee: Matthew Darwin (Matt-Darwin)
    • Reporter: Ahmet Altay (altay)
    • Votes: 1
    • Watchers: 4

    Time Tracking

    • Estimated: Not Specified
    • Remaining: 0h
    • Logged: 9h 40m