Details

    • Type: New Feature
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Issue Links

        Activity

        Siyuan Hua added a comment -

        Construct a dag with streams instead of operators /ports

        Thomas Weise added a comment -

        Please add more detail.

        Thomas Weise added a comment -

        https://docs.google.com/document/d/163LmQjX860b61NDe3ZzR0hRTPtE-4GF0iHaVhmHQssY/edit#heading=h.aytn6rz7u1e4

        Siyuan Hua, this JIRA needs more information.
        Siyuan Hua added a comment -

        First iteration of Java Stream API.

        The Java Stream API follows the popular functional programming paradigm for constructing an Apex application.
        The goals of this API are:

        • Easy to construct a DAG
        • Easy to migrate other streaming applications to Apex
        • Fully compatible with the existing DAG API
        • Provide useful built-in transformations with abstracted, pluggable components in one place

        To achieve these goals and split the work, we divide the transformations into 2 categories:

        • 1 input, 1+ output (map, filter, flatmap);
        • Multiple input, 1 output (Aggregations, Joins, Unions)

        This first iteration covers only the first category: 1 input, 1+ outputs. A transformation of this kind behaves like a distributed function call, so we abstract out function types instead of operators. Internally, pre-built function operators wrap the functions and are connected together.

        The core interface is ApexStream. It is designed for method chaining: every transformation method returns a new ApexStream object with a new output type.

        Here are some examples. To apply a filter and then a map, you can write:

          stream.filter(new FilterFunction())
            .map(new MapFunction()) 
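
        To make the chaining idea concrete, here is a minimal, self-contained sketch. It is not the Apex implementation: SketchStream is a hypothetical in-memory stand-in for ApexStream, and the FilterFunction and MapFunction interfaces below are assumed shapes that only borrow the names used above. It shows how each transformation can return a new stream object whose element type follows the function's output type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainSketch {
    // Hypothetical function types; only the names come from the comment above.
    interface FilterFunction<T> { boolean filter(T input); }
    interface MapFunction<I, O> { O map(I input); }

    // Hypothetical in-memory stand-in for ApexStream.
    static final class SketchStream<T> {
        private final List<T> items;

        SketchStream(List<T> items) { this.items = items; }

        // Keeps the element type and returns a new stream.
        SketchStream<T> filter(FilterFunction<T> f) {
            List<T> out = new ArrayList<>();
            for (T item : items) {
                if (f.filter(item)) {
                    out.add(item);
                }
            }
            return new SketchStream<>(out);
        }

        // Changes the element type from T to O and returns a new stream.
        <O> SketchStream<O> map(MapFunction<T, O> f) {
            List<O> out = new ArrayList<>();
            for (T item : items) {
                out.add(f.map(item));
            }
            return new SketchStream<>(out);
        }

        List<T> collect() { return items; }
    }

    // Filters out short words, then maps each remaining word to its length.
    static List<Integer> demo() {
        return new SketchStream<>(Arrays.asList("apex", "dag", "stream"))
            .filter(s -> s.length() > 3)
            .map(String::length)
            .collect();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4, 6]
    }
}
```

        Unlike this sketch, which evaluates eagerly on a list, the API described here only records transformations until the DAG is built.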
        

        You can also mix this with the existing operator API. For example, to add an operator after map, you can write:

          stream.filter(..)
            .map(..)
            .addOperator(opt, opt.input, opt.output)
          // opt.input connects to the output of the previous stream; opt.output connects to the next transformation
        

        To set the locality or attributes for an operator, port, or the DAG, use the with clause. For example, to make filter and map container local and set the checkpoint window count for the operator you just added, you can write something like this:

          stream.filter(..)
            .map(..).with(Locality.CONTAINER_LOCAL)
            .addOperator(..).with(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5)
            .with(someProp, someVal)
          // Note: the engine figures out which operator, port, or DAG each attribute applies to
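
        One way such a with clause could resolve its target is sketched below. This is an assumption for illustration, not the engine's actual logic: the Builder, Node, and Locality types are hypothetical, and the rule used here (locality goes on the link into the most recently added transformation, any other attribute goes on that transformation itself) is only one plausible reading of the comment above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WithClauseSketch {
    // Hypothetical locality values, named after the ones in the comment above.
    enum Locality { CONTAINER_LOCAL, THREAD_LOCAL }

    // One recorded transformation plus the settings attached to it.
    static final class Node {
        final String name;
        Locality inputLocality; // locality of the stream feeding this node, null if unset
        final Map<String, Object> attributes = new LinkedHashMap<>();

        Node(String name) { this.name = name; }
    }

    static final class Builder {
        final List<Node> nodes = new ArrayList<>();

        Builder add(String name) {
            nodes.add(new Node(name));
            return this;
        }

        // Assumed rule: a locality describes a connection, so it is stored
        // on the link into the most recently added node.
        Builder with(Locality locality) {
            last().inputLocality = locality;
            return this;
        }

        // Assumed rule: any other attribute belongs to the most recently added node.
        Builder with(String key, Object value) {
            last().attributes.put(key, value);
            return this;
        }

        private Node last() {
            if (nodes.isEmpty()) {
                throw new IllegalStateException("no transformation added yet");
            }
            return nodes.get(nodes.size() - 1);
        }
    }

    // Mirrors the shape of the chained example above with placeholder names.
    static Builder demo() {
        return new Builder()
            .add("filter")
            .add("map").with(Locality.CONTAINER_LOCAL)
            .add("myOperator").with("CHECKPOINT_WINDOW_COUNT", 5);
    }

    public static void main(String[] args) {
        for (Node n : demo().nodes) {
            System.out.println(n.name + " locality=" + n.inputLocality + " attributes=" + n.attributes);
        }
    }
}
```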
        

        As with the DAG API, you can run the stream in distributed mode or local mode. For example:

        stream...populateDag(dag) //distributed mode
        stream...runEmbedded(...) //local mode
        

        The stream is built lazily: until you call populateDag or runEmbedded, all transformations and their order are kept in memory in a graph data structure (DagMeta). This will let us solve technical difficulties such as logical plan optimization.
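
        The lazy-build idea can be sketched in a few lines. The LazyStream class and its source, map, plannedSteps, and runLocally methods are hypothetical names for this illustration; they stand in for the recorded plan and the terminal call, and a plain list of steps replaces the graph structure. Nothing runs while the chain is being declared; the recorded steps are applied only when the terminal method is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class LazyBuildSketch {
    // Hypothetical stand-in for a stream that only records its transformations.
    static final class LazyStream<T> {
        private final List<Function<Object, Object>> plan;

        private LazyStream(List<Function<Object, Object>> plan) { this.plan = plan; }

        static <T> LazyStream<T> source() {
            return new LazyStream<>(new ArrayList<>());
        }

        // Records the step and returns a new stream; the function is not called here.
        @SuppressWarnings("unchecked")
        <O> LazyStream<O> map(Function<T, O> f) {
            List<Function<Object, Object>> next = new ArrayList<>(plan);
            next.add(in -> f.apply((T) in));
            return new LazyStream<>(next);
        }

        int plannedSteps() { return plan.size(); }

        // Terminal call: applies the recorded steps, in order, to every input item.
        List<Object> runLocally(List<?> input) {
            List<Object> out = new ArrayList<>();
            for (Object item : input) {
                Object value = item;
                for (Function<Object, Object> step : plan) {
                    value = step.apply(value);
                }
                out.add(value);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        LazyStream<Integer> stream = LazyStream.<String>source()
            .map(s -> { calls.incrementAndGet(); return s.length(); })
            .map(n -> { calls.incrementAndGet(); return n * 2; });

        System.out.println(stream.plannedSteps() + " steps planned, " + calls.get() + " calls so far");
        System.out.println(stream.runLocally(Arrays.asList("ab", "cde")));
        System.out.println(calls.get() + " calls after running");
    }
}
```

        Because the whole plan exists before anything executes, an optimizer could inspect or rewrite it first, which is the benefit the comment points to.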

        The stream is also easy to extend to fit your organization's needs. For example, suppose you want to provide a filter and map transformation in one operator. Instead of repeatedly connecting the filter and map operators together in thread_local mode, you can add a first-order function to the ApexStream interface by simply extending the default implementation, ApexStreamImpl:

        MyStream.java
        public class MyStream<T> extends ApexStreamImpl<T>
        {
          public MyStream(ApexStream<T> apexStream)
          {
            super(apexStream);
          }
        
          <O> MyStream<O> myFilterAndMap(Function.MapFunction<T, O> map, Function.FilterFunction<T> filterFunction)
          {
            return filter(filterFunction).map(map).with(DAG.Locality.THREAD_LOCAL);
          }
        
        }
        
        

        Then you can use your new stream like this:

        new MyStream(stream).<..,MyStream>flatMap(...)   // existing built-in transformation
          .myFilterAndMap(...)   // your organization's transformation
          .print()
        
        Siyuan Hua added a comment -

        Roadmap for next phase

        In the next phase, we will focus on windowed transformations with the following features:

        • Watermark - Ingestion-time watermark / watermark from tuple
        • Early triggers - How frequently to emit real-time partial results
        • Late triggers - When to emit updated results for tuples that arrive after the watermark
        • Customizable trigger action - What to do when a trigger fires
        • Spool window state - Large in-memory state can be spooled to disk
        • 3 accumulation models: ignore, accumulation, accumulation + delta
        • Window support: non-mergeable windows (fixed window, sliding window) and mergeable windows (session window)
        • 3 notions of tuple time: event time, system time, ingestion time
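
        As a rough illustration of the window types in the list above, the sketch below assigns timestamps to fixed and sliding windows and groups timestamps into session windows. It is a simplified batch version under stated assumptions, not the planned design: the class and method names are hypothetical, windows are half-open intervals [start, start + size), and sessions are computed over a complete, sorted list rather than merged incrementally as tuples arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WindowSketch {
    // Start of the fixed window of the given size that contains the timestamp.
    static long fixedWindowStart(long timestamp, long size) {
        return Math.floorDiv(timestamp, size) * size;
    }

    // Starts of all sliding windows (length size, advancing by slide)
    // that contain the timestamp, latest window first.
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = Math.floorDiv(timestamp, slide) * slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Session windows as {first, last} timestamp pairs: a timestamp joins the
    // current session when it is within gap of that session's last timestamp.
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t - last[1] <= gap) {
                last[1] = t; // extend the current session
            } else {
                result.add(new long[] {t, t}); // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fixedWindowStart(25, 10));        // prints 20
        System.out.println(slidingWindowStarts(25, 20, 10)); // prints [20, 10]
        for (long[] s : sessions(Arrays.asList(1L, 2L, 10L, 11L, 30L), 5)) {
            System.out.println(s[0] + ".." + s[1]);
        }
    }
}
```

        A tuple belongs to exactly one fixed window but to several sliding windows, while session boundaries depend on the data itself, which is why session windows need to be mergeable.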
        Siyuan Hua added a comment -

        Some write-ups about the high-level API


          People

          • Assignee:
            Unassigned
            Reporter:
            Siyuan Hua
          • Votes:
            0
            Watchers:
            4
