Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-10382

Python SDK should set merge status to "ALREADY_MERGED"

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Open
    • P3
    • Resolution: Unresolved
    • 2.20.0, 2.22.0
    • None
    • sdk-py-core
    • Direct Runner and Dataflow

    Description

      I tried this with Apache Beam 2.20 and 2.22

      1) What I want to achieve:

      I have a pipeline that is reading from Google Pub/Sub. The messages have user and product information. In the end, I need to analyse the data so I can know, for each user's session, how many products of each type there are.

      2) What I did:

      The first thing I do in my pipeline is a "Group by Key", using the user as a key and using "beam.WindowInto(beam.window.Sessions(15))" as windows. Then, as I need to aggregate over products for each user/session, I do another "Group by Key", this time with the product as key.

      3) What I expect to happen:

      With the first "Group by key", the pipeline creates a different window for each user/session combination. So, for the second "Group by key", I expect that it doesn't
      mix elements that come from different windows.

      4) What actually happens:

      If the messages are at least 1 second apart from each other, the pipeline works as I expect.
      However, if I publish all the messages at the same time, all the sessions and users get mixed.

      Here https://github.com/Oscar-Rod/apache-beam-testing you have a complete working example.

      To publish the messages I do the following:

       

      def generate_message(user, products):
       return json.dumps({"user": user, "products": products,}).encode("utf-8")
      
      messages = [
       generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
       generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
       generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
       generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
       generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
       generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
       generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
       generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
       generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
      ]
      for message in messages:
       publisher.publish(topic_path, data=message)
       # time.sleep(1)
      

       

      This will publish 9 messages. As the sessions are configured with a length of 15 seconds, it should create one session for each user. In the end, the user 1 should have 3 "prod_1", the user 2 should have 6 "prod_1" and the user 3 should have 9 "prod_1".

      The first step in the pipeline is reading from Pub/Sub:

       

      messages = (
       pipeline
       | "read messages" >> beam.io.ReadFromPubSub(topic=options.input_topic)
       | "parse to messages" >> beam.ParDo(ParseMessage())
       )

       

      It will parse the messages to the following Class:

       

      class Product(BaseModel):
       id: str
       quantity: str
      
      class Message(BaseModel):
       user: str
       products: List[Product]
       timestamp: datetime
      

       

      Then, I apply the sessions and the Group By Key:

       

      sessions = (
       messages
       | "window" >> beam.WindowInto(beam.window.Sessions(15))
       | "add key" >> beam.Map(lambda element: (element.user, element.products))
       | "group by user" >> beam.GroupByKey()
       )

      After this, I am getting the following elements:

       

      ('user_1', [[Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')]])
      ('user_2', [[Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')]])
      ('user_3', [[Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')]])
      

       

      To aggregate for each product, I need the product as a key, so I modified the previous step to flatten the elements:

       

      def flat_function(key, elements):
       for element in elements:
       yield (key, element)
      sessions = (
       messages
       | "window" >> beam.WindowInto(beam.window.Sessions(15))
       | "add key" >> beam.Map(lambda element: (element.user, element.products))
       | "group by user" >> beam.GroupByKey()
       | "first flatten" >> beam.FlatMapTuple(flat_function)
       | "second flatten" >> beam.FlatMapTuple(flat_function)
       )
      

       

      And I am getting the following:

       

      ('user_1', Product(id='prod_1', quantity='1'))
      ('user_1', Product(id='prod_1', quantity='1'))
      ('user_1', Product(id='prod_1', quantity='1'))
      ('user_2', Product(id='prod_1', quantity='2'))
      ('user_2', Product(id='prod_1', quantity='2'))
      ('user_2', Product(id='prod_1', quantity='2'))
      ('user_3', Product(id='prod_1', quantity='3'))
      ('user_3', Product(id='prod_1', quantity='3'))
      ('user_3', Product(id='prod_1', quantity='3'))
      

       

      Now, the last step:

       

      products = (
       sessions
       | "add new key" >> beam.Map(lambda session: (session[1].id, (session[1], session[0])))
       | "group by product" >> beam.GroupByKey()
       )

      And here is where the issue happens. If the messages are published at least 1 second apart, this is what I get:

       

      ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1')])
      ('prod_1', [(Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2')])
      ('prod_1', [(Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
      

       

      The result is what I expect, 3 elements, one per each user's session. And looking at the "quantity" we can confirm that the result is correct. All elements with "quantity=3" are in the same element, as they come from the same user/session. The same applies to the elements with "quantity=2" and "quantity=1".

      However, if I publish the messages all at the same time, this is what I get:

       

      ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])

      Only 1 element, with all the messages in it. So clearly, when the timestamp of the messages is too close, Apache Beam can't put them in different sessions.

      The fact the the behaviour of the pipeline changes when the timestamp of the messages changes, makes me think that this is a bug in Apache Beam. What do you think? Is it possible? Does anyone have an explanation as to why this happens? Can this somehow be expected behaviour?

      Attachments

        Activity

          People

            Unassigned Unassigned
            Oscar-Rod Oscar Rodriguez
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: