Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: beam-model
    • Labels:
      None

      Description

      For some applications, it's useful to declare a pane/window to be emitted (or finished) based on its contents. The simplest of these is the AfterCount trigger, but more sophisticated predicates could be constructed.

      The requirements for consistent trigger firing are essentially that the state of the trigger form a lattice and that the "should fire?" question is a monotonic predicate on the lattice. Basically it asks "are we high enough up the lattice?"
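      As a minimal sketch of that requirement, assume the trigger state is simply the set of element ids seen so far (a hypothetical choice for illustration, not how Beam stores trigger state). Sets ordered by inclusion form a lattice with union as the join, and an AfterCount-style predicate is monotonic on it: adding elements can never turn a firing state back into a non-firing one.

```python
from typing import FrozenSet

# Hypothetical trigger state: the set of element ids observed so far.
# Sets ordered by inclusion form a lattice whose join is set union.
State = FrozenSet[str]


def join(a: State, b: State) -> State:
    """Least upper bound of two states (e.g. when panes or windows merge)."""
    return a | b


def observe(state: State, element_id: str) -> State:
    """Observing an element can only move the state up the lattice."""
    return join(state, frozenset([element_id]))


def should_fire(state: State, threshold: int = 3) -> bool:
    """AfterCount-style predicate: 'are we high enough up the lattice?'"""
    return len(state) >= threshold


state: State = frozenset()
history = []
for element_id in ["a", "b", "c", "d"]:
    state = observe(state, element_id)
    history.append(should_fire(state))
# history == [False, False, True, True]: once true, the predicate stays true.
```

      Because the predicate never flips back to false as the state grows, the order in which elements or merged states arrive does not change whether the trigger eventually fires.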

      Because the element types may change between the application of Windowing and the actuation of the trigger, one idea is to extract the relevant data from the element at Windowing and pass it along implicitly where it can be combined and inspected in a type safe way later (similar to how timestamps and windows are implicitly passed with elements).

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user katsiapis opened a pull request:

          https://github.com/apache/incubator-beam/pull/1392

          BEAM-101 A few improvements to Apache Beam Python's FileIO.

          • Ensuring that AUTO compression works properly for FileSinks.
          • Introducing __enter__ and __exit__ in _CompressedFile to allow use
            of "with", and updating textio accordingly.
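
          For readers unfamiliar with the "with" protocol, here is a minimal sketch of what adding these two methods enables. The class CompressedFileSketch and the helper roundtrip are hypothetical stand-ins built on the standard gzip module; they are not Beam's _CompressedFile.

```python
import gzip
import io


class CompressedFileSketch:
    """Hypothetical stand-in for a compressed-file wrapper; not Beam's class."""

    def __init__(self, fileobj):
        # Wrap an already-open binary file object in a gzip writer.
        self._gz = gzip.GzipFile(fileobj=fileobj, mode="wb")

    def write(self, data):
        self._gz.write(data)

    def close(self):
        # Flushes the gzip trailer; the underlying fileobj stays open.
        self._gz.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions raised inside the block


def roundtrip(payload):
    """Compress payload through the wrapper, then decompress it again."""
    buffer = io.BytesIO()
    with CompressedFileSketch(buffer) as f:
        f.write(payload)
    return gzip.decompress(buffer.getvalue())
```

          With these methods in place, the wrapper is closed when the block exits, even if the body raises.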

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/katsiapis/incubator-beam fileio

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/incubator-beam/pull/1392.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1392


          commit 5748647aa0d409828f9769dbc472bdb50f92a343
          Author: Gus Katsiapis <katsiapis@katsiapis-linux.mtv.corp.google.com>
          Date: 2016-11-19T02:31:20Z

          A few improvements to Apache Beam Python's FileIO.

          • Ensuring that AUTO compression works properly for FileSinks.
          • Introducing __enter__ and __exit__ in _CompressedFile to allow use
            of "with", and updating textio accordingly.

          peihe0@gmail.com Pei He added a comment -

          Background:
          We need a way to define a custom trigger, such as AfterPaneTriggerWithMaxInterval (trigger after a certain element count or when the time gap between two elements exceeds maxInterval).

          Currently, there is a fixed set of triggers, and the only way to create a custom trigger is to modify an existing one in runners/core:
          https://github.com/apache/beam/compare/master...peihe:custom-after-pane?expand=1

          1. Could we make the AfterPane trigger customizable based on more information, such as the timestamp?
          2. Could we define the data-driven trigger based on a CombineFn<ElementT, AccumT, Boolean> (or an equivalent, such as the following TriggerFn)?

          abstract class TriggerFn<ElementT, AccumT> implements Serializable {
            public abstract AccumT createAccumulator();
            public abstract AccumT onElement(ElementT element, BoundedWindow window, Instant timestamp);
            public abstract boolean shouldFire(AccumT accum);
          }
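
          To make the proposal concrete, here is a rough plain-Python analogue of such a TriggerFn for the AfterPaneTriggerWithMaxInterval case. All names (Accum, AfterCountOrGapTriggerFn, first_firing_index) are hypothetical and nothing here is a Beam API. The sketch assumes elements arrive in timestamp order, represents timestamps as seconds, and passes the accumulator into on_element explicitly, which the Java signature above does not show.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Accum:
    count: int = 0
    last_timestamp: Optional[float] = None  # None before the first element
    max_gap: float = 0.0  # largest gap seen between consecutive elements


class AfterCountOrGapTriggerFn:
    """Hypothetical analogue of the proposed TriggerFn; not a Beam API."""

    def __init__(self, max_count: int, max_interval: float):
        self.max_count = max_count
        self.max_interval = max_interval

    def create_accumulator(self) -> Accum:
        return Accum()

    def on_element(self, accum: Accum, timestamp: float) -> Accum:
        # Assumes elements arrive in timestamp order.
        if accum.last_timestamp is None:
            gap = 0.0
        else:
            gap = timestamp - accum.last_timestamp
        return Accum(accum.count + 1, timestamp, max(accum.max_gap, gap))

    def should_fire(self, accum: Accum) -> bool:
        # Both count and max_gap only grow, so this predicate is monotonic.
        return (accum.count >= self.max_count
                or accum.max_gap > self.max_interval)


def first_firing_index(trigger, timestamps):
    """Index of the element at which the trigger first fires, or None."""
    accum = trigger.create_accumulator()
    for i, ts in enumerate(timestamps):
        accum = trigger.on_element(accum, ts)
        if trigger.should_fire(accum):
            return i
    return None
```

          Tracking the largest gap rather than only the latest one keeps the accumulator moving in one direction, which matches the monotonicity requirement in the issue description.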
          kenn Kenneth Knowles added a comment - - edited

          I believe you could address the use case in a couple of ways:

          1. A DoFn that uses state and timers to implement this behavior. You can do essentially any custom triggering with this. The only issue is that your runner needs to support it.
          2. The approach of a CombineFn does not work as described: you cannot apply it right at the GroupByKey (GBK) because the element type may not match. You cannot apply it right at the Window.into because one element may lead to many output elements, and there's not really a good story around propagating metadata in that case. A CombineFn<Instant, AccumT, Boolean> could work.
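
          The first option can be illustrated with a plain-Python simulation of the logic a stateful, timer-driven DoFn would carry. This is only a sketch under stated assumptions: the class BufferUntilCountOrTimeout is hypothetical, the list stands in for per-key state, the deadline stands in for a single timer, and none of it uses the actual Beam state or timer API.

```python
class BufferUntilCountOrTimeout:
    """Plain-Python simulation of buffer-and-flush logic; not the Beam API."""

    def __init__(self, max_count, timeout):
        self.max_count = max_count
        self.timeout = timeout
        self.buffer = []      # stands in for per-key bag state
        self.deadline = None  # stands in for a single timer

    def process(self, element, timestamp):
        """Buffer an element; return a pane (list) if the count is reached."""
        self.buffer.append(element)
        # Reset the timer on every element so it fires only after a quiet gap.
        self.deadline = timestamp + self.timeout
        if len(self.buffer) >= self.max_count:
            return self._flush()
        return None

    def advance_time(self, now):
        """Fire the timer if its deadline has passed; return a pane or None."""
        if self.deadline is not None and now >= self.deadline and self.buffer:
            return self._flush()
        return None

    def _flush(self):
        # Emit the buffered elements and clear both state and timer.
        pane, self.buffer, self.deadline = self.buffer, [], None
        return pane
```

          For example, with max_count=3 and timeout=10, two elements at times 0 and 1 are emitted together once time advances to 11, while three elements arriving close together are emitted as soon as the third one is processed.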

          The other trouble is that including a CombineFn in a trigger is not as portable; it needs a different execution strategy that calls a UDF, possibly over the Fn API. Today, triggers are just syntax, so they can be executed easily and efficiently within a runner via any means.

          The most coherent approach I know of for custom triggers (which is not really fleshed out) is to do the combine on a PCollection explicitly and then have a trigger that just references that PCollection. The runner then just needs to be able to decode a bool, not run a CombineFn.

          In my mind, data-driven trigger means a trigger that is aware of the details of the data type. A timestamp-driven trigger would not really be data-driven in this way. But until we have some clear design for custom triggers, we could definitely consider adding new syntax to triggers for particular common uses. If the existing solutions don't work for you, please open a JIRA for your specific use case.

          peihe0@gmail.com Pei He added a comment - - edited

          I opened https://issues.apache.org/jira/browse/BEAM-2402 for timestamp-driven trigger.


            People

            • Assignee: Unassigned
            • Reporter: Robert Bradshaw (robertwb)
            • Votes: 0
            • Watchers: 7
