Apache S4
  S4-57

Document and improve the windowing PE

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.5.0
    • Fix Version/s: 0.5.0
    • Labels:
      None

      Description

      org.apache.s4.core.WindowingPE provides a generic facility for handling sliding stream windows.

      We should add examples and regression tests, and improve usability. In particular, it is crucial to ease the management of concurrent access to the window's data.

      In addition, we may also want to allow sizing the window by the number of events, not only by time intervals.

        Activity

        Matthieu Morel made changes -
        Status: Open [ 1 ] -> Resolved [ 5 ]
        Assignee: Matthieu Morel [ mmorel ]
        Fix Version/s: 0.5 [ 12318653 ]
        Resolution: Fixed [ 1 ]
        Matthieu Morel added a comment -

        merged with piper branch commit 704404f6a0850711ae1a0f1b687ae48e91238e69

        New improvements will be handled through new tickets

        Leo Neumeyer added a comment -

        +1

        This is a great start. I suggest replacing the old implementation and improving it over the next few releases as we start using the API.

        Matthieu Morel added a comment -

        I just uploaded an API update based on Leo's comments. It is available in branch S4-57 (changes rebased on top of the piper branch and force-pushed to the S4-57 remote branch).

        • Slot has a method for closing it
        • Slot instances are provided by a SlotFactory
        • Slot management is automatic: when a boundary is reached, the platform closes the current slot and adds a new one, created through the factory
        • Count-based slots are created through a trigger (we could use the same mechanism for time-based slots)
        • AbstractSlidingWindowPE has a method for computing an aggregation over the whole window. It could potentially be used for automating the output of the PE.
        • The update method is called on the PE (which delegates to the current slot)
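
The slot lifecycle described in this update could look roughly like the sketch below. It is only an illustration under assumptions: the type names (Slot, SlotFactory, SumSlot, CountBasedWindow) and their signatures are hypothetical and are not taken from the S4-57 branch, and the boundary handling is done inline rather than through an S4 trigger.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// All names below are hypothetical; they are not the S4 API.
interface Slot<T> {
    void update(T value); // fold one input value into the slot
    void close();         // called once, when the slot boundary is reached
}

interface SlotFactory<T, S extends Slot<T>> {
    S createSlot();
}

/** Example slot that keeps a running sum and rejects updates once closed. */
final class SumSlot implements Slot<Long> {
    private long sum;
    private boolean closed;

    @Override
    public void update(Long value) {
        if (closed) {
            throw new IllegalStateException("slot is closed");
        }
        sum += value;
    }

    @Override
    public void close() {
        closed = true;
    }

    long getSum() { return sum; }
    boolean isClosed() { return closed; }
}

/** Count-based sliding window keeping at most maxSlots slots, open slot included. */
final class CountBasedWindow<T, S extends Slot<T>> {
    private final SlotFactory<T, S> factory;
    private final int eventsPerSlot;
    private final int maxSlots;
    private final Deque<S> slots = new ArrayDeque<>();
    private int eventsInOpenSlot;

    CountBasedWindow(SlotFactory<T, S> factory, int eventsPerSlot, int maxSlots) {
        if (eventsPerSlot < 1 || maxSlots < 1) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        this.factory = factory;
        this.eventsPerSlot = eventsPerSlot;
        this.maxSlots = maxSlots;
        slots.addLast(factory.createSlot());
    }

    /** Delegates to the open slot, then rolls the window if the boundary is reached. */
    synchronized void update(T value) {
        slots.peekLast().update(value);
        eventsInOpenSlot++;
        if (eventsInOpenSlot == eventsPerSlot) {
            slots.peekLast().close();
            slots.addLast(factory.createSlot());
            if (slots.size() > maxSlots) {
                slots.removeFirst(); // evict the oldest slot
            }
            eventsInOpenSlot = 0;
        }
    }

    /** Snapshot of the slots, oldest first; the last one is the open slot. */
    synchronized List<S> getSlots() {
        return new ArrayList<>(slots);
    }
}
```

With `new CountBasedWindow<Long, SumSlot>(SumSlot::new, 3, 2)`, feeding the values 1 to 7 leaves one closed slot holding 4 + 5 + 6 and one open slot holding 7; the slot for 1 to 3 has already been evicted.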
        Tony Stevenson made changes -
        Field: Workflow
        Original Value: jira [ 12672898 ]
        New Value: no-reopen-closed, patch-avail [ 12711324 ]
        Leo Neumeyer added a comment -

        Let's refresh what we want to accomplish:

        • We want to save a finite number of historical values in slots.
        • A slot can be defined using periodic time boundaries t(Start) to t(Start + Period).
        • A slot can be defined based on number of events received.
        • Every time we reach a slot boundary, we need to aggregate the values and put them in the slot.
        • A function will be applied to the list of slots to extract a value that is a function of historical values.
        • The function can be applied synchronously when a new slot is created or asynchronously.
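
For the time-based case, the periodic boundaries t(Start) to t(Start + Period) can be mapped to slot indices with integer arithmetic. The class below is a minimal sketch under assumptions: TimeSlotBoundaries and its methods are hypothetical names, and timestamps are assumed to be milliseconds.

```java
// Hypothetical helper, not part of S4: maps timestamps to periodic slots.
final class TimeSlotBoundaries {
    private final long startMillis;
    private final long periodMillis;

    TimeSlotBoundaries(long startMillis, long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        this.startMillis = startMillis;
        this.periodMillis = periodMillis;
    }

    /** Index of the slot [start + i * period, start + (i + 1) * period) containing t. */
    long slotIndex(long timestampMillis) {
        // floorDiv keeps timestamps before startMillis in negative slots
        return Math.floorDiv(timestampMillis - startMillis, periodMillis);
    }

    /** Inclusive lower boundary of the slot with the given index. */
    long slotStart(long index) {
        return startMillis + index * periodMillis;
    }

    /** True if the two timestamps fall in different slots, i.e. a boundary was reached. */
    boolean crossesBoundary(long previousMillis, long currentMillis) {
        return slotIndex(previousMillis) != slotIndex(currentMillis);
    }
}
```

A windowing PE could call crossesBoundary on each event (or from a timer) to decide when to close the open slot and add a new one.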

        Thoughts:

        • We need an API to implement this in an intuitive manner.
        • The app developer needs to customize the implementation:
          . the types of the input values derived from the input event
          . a method to incrementally aggregate the input values and store them in the slot
          . functions that are applied to the slots

        With this in mind, I propose the following:

        • The WindowingPE class should handle the slot boundaries and abstract them from the app developer. The same code should be able to run with time- or event count-based boundaries.
        • So I think we can change the API and hide addSlot() from the app developer. The slot will be added when a boundary is reached (time or event count).
        • For each input Event call an update() method in the open Slot object. Perhaps we can implement this as the default (or only) behavior so the app developer doesn't have to do anything except implement the Slot class.
        • The open slot object is a slot whose boundary is still open. (There is only one).
        • To get the open slot, use method getOpenSlot()
        • The idea is to aggregate the values we store in a slot (this is the reason we use slots; otherwise we could just save all the events in a large circular buffer). In many modeling applications, the number of events per slot may be large. The slotting architecture makes it easier to trade off computational complexity against accuracy. For example: we aggregate 10 million numbers per hour, use 1-hour slots, and save 24 slots to cover a full day. We then save one number for every 10 million numbers. For more precision, reduce the slot size to 10 minutes.
        • A typical use case is the Open, High, Low, Close (OHLC) statistics used in financial markets. Each slot could be encoded as an OHLC. Once the slot boundary is reached only 4 numbers are saved.
        • The app developer has to provide a Slot class and a function applied to the slots, and configure the PE.
        • This looks like another trigger to me. Can we extend the concept of a trigger (by time or by event count) to also manage the slots? In this way, we simply configure the trigger using a Slot class and the PE has access to the slots using the getSlots() method.
        • So I think I agree with adding the Slot interface with methods update() and custom methods to get the values. Not sure we want the generic getAggregatedData() method because we may have various statistics each with a type that we may want to retrieve from a slot. In the example, app dev may implement slot.getHigh(), slot.getClose(), etc.
        • Note that, logically, closed slots are immutable, so the PE can use them by reference with no need to copy.
        • It would be nice to use a more meaningful example for the regression test, like computing the OHLC for each slot and applying an OHLC function to all the slots at every slot boundary. The result will be a smoothed OHLC data stream that can be run at periodic intervals or at count intervals.
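
The OHLC use case from the list above could be sketched as follows. This is an illustration only: OhlcSlot and its merge function are hypothetical names, prices are assumed to be plain doubles, and the merge shown is one possible "function applied to the slots" (the OHLC of the whole window), not necessarily the smoothing intended here.

```java
import java.util.List;

// Hypothetical slot type, not part of S4: stores only four numbers per slot.
final class OhlcSlot {
    private double open;
    private double high;
    private double low;
    private double close;
    private boolean empty = true;

    /** Incrementally aggregates one price into the slot. */
    void update(double price) {
        if (empty) {
            open = price;
            high = price;
            low = price;
            empty = false;
        } else {
            high = Math.max(high, price);
            low = Math.min(low, price);
        }
        close = price;
    }

    double getOpen() { return open; }
    double getHigh() { return high; }
    double getLow() { return low; }
    double getClose() { return close; }
    boolean isEmpty() { return empty; }

    /** OHLC over a list of slots ordered oldest first; empty slots are skipped. */
    static OhlcSlot merge(List<OhlcSlot> slots) {
        OhlcSlot merged = new OhlcSlot();
        for (OhlcSlot s : slots) {
            if (s.empty) {
                continue;
            }
            if (merged.empty) {
                merged.open = s.open;
                merged.high = s.high;
                merged.low = s.low;
                merged.empty = false;
            } else {
                merged.high = Math.max(merged.high, s.high);
                merged.low = Math.min(merged.low, s.low);
            }
            merged.close = s.close;
        }
        return merged;
    }
}
```

Custom getters such as getHigh() and getClose() take the place of a generic getAggregatedData() method, which matches the concern raised above about slots exposing several statistics of different types.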

        Does this help clarify the design intent? I'd rather rethink the implementation than modify my original one, which is not that great. With your Slot class idea we can make the whole thing much easier to use.

        Matthieu Morel added a comment -

        I have uploaded a few modifications to the windowing PE to branch S4-57, complementing the ones from S4-22.

        1. I added a basic regression test
        2. I tried to provide a generic slot definition and implementation (Slot interface, DefaultAggregatingSlot class)

        The good news is that S4-63 should solve the concurrency issues, since access to the PE instance is now correctly synchronized.

        Matthieu Morel created issue -

          People

          • Assignee:
            Matthieu Morel
            Reporter:
            Matthieu Morel
          • Votes:
            0
            Watchers:
            2

            Dates

            • Created:
              Updated:
              Resolved:
