Eagle (Retired) / EAGLE-66

Eagle TypeSafe Stream Processing DSL

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v0.3.0
    • Fix Version/s: v0.3.0
    • Component/s: None
    • Labels: None

    Description

      Main Features

      1. Typesafe API: Currently the stream processing API is not type-safe (type information is erased by the stream framework), so every programming interface exposed to developers works with Object/AnyRef, which is neither friendly nor extensible for framework users (application developers):

      public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
      

      So I propose the following interface. All type information is transparent to the developer and requires no additional parameters; it is carried by Scala's implicit TypeTag.

      class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
      • StreamInfo: contains core stream information, including streamId, processing element id/name, and entity type info (class/TypeTag)
      • StreamProtocol extends JavaStreamProtocol: contains the basic Stream DSL API and a Java-compatible API
      import scala.reflect.runtime.{universe => ru}

      class StreamInfo extends Serializable {
        val id: Int = UniqueId.incrementAndGetId()
        var name: String = null
        var streamId: String = null
        var parallelismNum: Int = 1
        var inKeyed: Boolean = false
        var outKeyed: Boolean = false
        var keySelector: KeySelector = null

        // Runtime type information for the stream's element type
        var typeClass: Class[_] = classOf[AnyRef]
        @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
      }
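
      As a rough illustration of how an implicit TypeTag can carry element types without extra parameters, here is a minimal sketch. It assumes Scala 2 with scala-reflect on the classpath. TypedStream and TypedStreamDemo are hypothetical names used only for this example; they are not part of Eagle.

```scala
import scala.reflect.runtime.{universe => ru}

// Hypothetical stand-in for the proposed StreamProducer; not Eagle's actual class.
class TypedStream[T](val elements: Seq[T])(implicit val tag: ru.TypeTag[T]) {
  // U is inferred by the compiler, which also supplies its TypeTag implicitly,
  // so the caller never passes type information by hand.
  def map[U](f: T => U)(implicit utag: ru.TypeTag[U]): TypedStream[U] =
    new TypedStream[U](elements.map(f))

  // The element type captured at construction, still available at runtime.
  def elementType: String = tag.tpe.toString
}

object TypedStreamDemo {
  val words = new TypedStream(Seq("a", "bb", "ccc"))
  val lengths = words.map(_.length)          // TypedStream[Int]
  val pairs = words.map(w => (w, w.length))  // TypedStream[(String, Int)]
}
```

      Each stage records its own element type, which is the kind of information a DAG printer could use to label nodes such as MapProducer[Tuple2].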
      

      The StreamInfo can also be shared at runtime as an implicit context for the execution layer:

      abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt
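
      The pattern of passing stream metadata through an implicit parameter list can be sketched without any Storm dependency. NodeInfo, Processor, and PrintProcessor below are simplified, hypothetical stand-ins for StreamInfo and the bolt classes; the real classes carry more state.

```scala
// Hypothetical, simplified metadata record standing in for StreamInfo.
final case class NodeInfo(id: Int, name: String, typeName: String)

// A runtime component receives its metadata through an implicit parameter list,
// mirroring the shape of the AbstractStreamBolt constructor.
abstract class Processor[T](val ack: Boolean = true)(implicit val info: NodeInfo) {
  def process(value: T): String
}

// The subclass forwards whatever implicit NodeInfo is in scope to its parent.
class PrintProcessor[T](implicit nodeInfo: NodeInfo) extends Processor[T]() {
  def process(value: T): String =
    s"[${info.name}#${info.id}:${info.typeName}] $value"
}

object ImplicitContextDemo {
  implicit val info: NodeInfo = NodeInfo(3, "MapProducer", "Entity")
  val processor = new PrintProcessor[String] // picks up `info` implicitly
  val line: String = processor.process("hello")
}
```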
      

      2. KeyValue Based Structure: Currently framework users (developers) have to handle field declarations again and again, and framework and business logic are tightly coupled. According to the StreamProtocol description, users should not have to care about framework-level details such as Storm's internal data structure, List<Object> with Fields<String>, which is not developer friendly. We should make sure users can focus on business logic only, for example:

      env.from(tuples)
      	.groupByKey(_.name)
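
      To make the idea concrete, the sketch below shows a key selector working directly on a typed element, with no field names or positions declared. LocalStream is a hypothetical in-memory stand-in, not the proposed DSL, and it returns a plain Map rather than a distributed stream.

```scala
object GroupByKeySketch {
  final case class Entity(name: String, value: Double)

  // Hypothetical in-memory stand-in for a typed stream.
  class LocalStream[T](val elements: Seq[T]) {
    // The key is computed from the typed element itself.
    def groupByKey[K](keyOf: T => K): Map[K, Seq[T]] = elements.groupBy(keyOf)
  }

  val tuples = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2))

  // Single-field key and composite key use the same call.
  val byName = new LocalStream(tuples).groupByKey(_.name)
  val byNameAndValue = new LocalStream(tuples).groupByKey(e => (e.name, e.value))
}
```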
      

      3. Spout grouping instead of overriding Schema: Currently, especially in use cases like HdfsAuditLog Monitoring, a developer who wants to group by a certain key is forced to override the Schema (specific to Storm), which is awkward and not reusable.

      4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and environment (execution) are coupled with Storm's internal implementation, which stands in the way of becoming a metadata-driven pipeline framework in the future, so we should decouple them.
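
      One way to picture this decoupling is a logical plan kept as plain data, interpreted by interchangeable environments. The sketch below is an assumption about the general shape, not Eagle's design: Plan, Step, Environment, and LocalEnvironment are hypothetical names, and the element type is fixed to Int for brevity.

```scala
object DecoupledEnvironmentSketch {
  // Logical plan: a description of steps with no engine types involved.
  sealed trait Step
  final case class MapStep(f: Int => Int) extends Step
  final case class FilterStep(p: Int => Boolean) extends Step

  final case class Plan(source: Seq[Int], steps: Vector[Step] = Vector.empty) {
    def map(f: Int => Int): Plan = copy(steps = steps :+ MapStep(f))
    def filter(p: Int => Boolean): Plan = copy(steps = steps :+ FilterStep(p))
  }

  // Execution: each backend interprets the same plan in its own way.
  trait Environment {
    def execute(plan: Plan): Seq[Int]
  }

  // A local interpreter; a Storm-backed one would build a topology instead.
  object LocalEnvironment extends Environment {
    def execute(plan: Plan): Seq[Int] =
      plan.steps.foldLeft(plan.source) {
        case (data, MapStep(f))    => data.map(f)
        case (data, FilterStep(p)) => data.filter(p)
      }
  }

  val plan = Plan(Seq(1, 2, 3, 4)).map(_ * 10).filter(_ > 10)
  val result = LocalEnvironment.execute(plan)
}
```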

      5. Compatible with the field-based structure used in the old framework and applications.

      6. Configuration: an enhanced config wrapper on top of typesafe-config that supports get/set/default and is integrated with ExecutionEnvironment

      val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
      val streamName = env.config.get[String]("eagle.stream.name", "eventStream")
      val streamExecutorId = env.config.get[String]("eagle.stream.executor", s"${streamName}Executor")
      env.config.set("dataSourceConfig.deserializerClass", classOf[JsonMessageDeserializer].getCanonicalName)
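
      The get/set/default behaviour can be sketched with a small wrapper. To stay self-contained, this version is backed by a mutable map rather than typesafe-config, so it only illustrates the call shape. SimpleConfig, the "auditStream" value, and the "eagle.stream.parallelism" key are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in backed by a mutable map; the proposal wraps typesafe-config instead.
class SimpleConfig(initial: Map[String, Any] = Map.empty) {
  private val values: mutable.Map[String, Any] = mutable.Map.empty
  values ++= initial

  def set(path: String, value: Any): Unit = values.update(path, value)

  // Returns the stored value when present, otherwise the supplied default.
  // The cast is unchecked because of erasure: a mismatched type fails only on use.
  def get[T](path: String, default: T): T =
    values.get(path).map(_.asInstanceOf[T]).getOrElse(default)
}

object SimpleConfigDemo {
  val config = new SimpleConfig(Map("eagle.stream.name" -> "auditStream"))

  // Present key: the stored value wins over the default.
  val streamName = config.get[String]("eagle.stream.name", "eventStream")
  // Missing key: the default is returned, here derived from another setting.
  val executorId = config.get[String]("eagle.stream.executor", s"${streamName}Executor")

  config.set("eagle.stream.parallelism", 4)
  val parallelism = config.get[Int]("eagle.stream.parallelism", 1)
}
```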
      

      Sample Application

      case class Entity(name:String,value:Double,var inc:Int=0)
      
      val tuples = Seq(
      	Entity("a", 1),
      	Entity("a", 2),
      	Entity("a", 3),
      	Entity("b", 2),
      	Entity("c", 3),
      	Entity("d", 3)
      )
      val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
      // The DAG tracks element types automatically: Entity -> Tuple2 -> Tuple3
      env.from(tuples)
      	.groupByKey(_.name)
      	.map(o => {o.inc += 2;o})
      	.filter(_.name != "b")
      	.filter(_.name != "c")
      	.groupByKey(o=>(o.name,o.value))
      	.map(o => (o.name,o))
      	.map(o => (o._1,o._2.value,o._2.inc))
      	.foreach(println)
      env.execute()
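
      For reference, the same transformations can be run over a plain Scala collection to see which records survive. This is only a local approximation: groupByKey is left out on the assumption that it affects routing between tasks rather than the records themselves, and a distributed run gives no ordering guarantee. SamplePipelineLocally is a hypothetical name.

```scala
object SamplePipelineLocally {
  final case class Entity(name: String, value: Double, var inc: Int = 0)

  val tuples = Seq(
    Entity("a", 1),
    Entity("a", 2),
    Entity("a", 3),
    Entity("b", 2),
    Entity("c", 3),
    Entity("d", 3)
  )

  // Same map/filter chain as the sample application, minus the grouping steps.
  val result: Seq[(String, Double, Int)] = tuples
    .map(o => { o.inc += 2; o })
    .filter(_.name != "b")
    .filter(_.name != "c")
    .map(o => (o.name, o))
    .map(o => (o._1, o._2.value, o._2.inc))
}
```

      Only the "a" and "d" entities remain, each with inc raised to 2.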
      

      Types are transparent to the developer during both the DAG compiling (programming) and runtime (metadata) phases:

      2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG 
      { 
      	IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
      	GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
      	MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
      	FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
      	FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
      	GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
      	MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
      	MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
      }
      2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG 
      { 
      	IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
      	MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
      	FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
      	FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
      	MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
      	MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
      }
      2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
      {
       	Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
      	Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
      	Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
      	Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
      	Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
      	Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup 
      }
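
      Comparing the "Before expanded DAG" and "After expanded DAG" logs above, the GroupByKeyProducer nodes disappear and their grouping moves onto the edge between the neighbouring nodes. A minimal sketch of such a rewrite follows. It is one plausible reading of the logs, not Eagle's actual expansion code, and it assumes a linear chain in which each groupBy node has exactly one incoming edge and no two groupBy nodes are adjacent. Edge, expand, and the short node names are hypothetical.

```scala
object DagExpansionSketch {
  final case class Edge(from: String, to: String, grouping: String = "shuffleGroup")

  // Removes every node for which isGroupBy is true and places its grouping
  // on a new edge from its upstream node to its downstream node.
  def expand(edges: List[Edge], isGroupBy: String => Boolean): List[Edge] =
    edges.flatMap {
      // Edge leading into a groupBy node: dropped, it is re-created below.
      case Edge(_, to, _) if isGroupBy(to) => Nil
      // Edge leaving a groupBy node: reconnect it to the node's upstream.
      case Edge(from, to, _) if isGroupBy(from) =>
        edges.find(_.to == from)
          .map(in => Edge(in.from, to, s"groupByKey($from)"))
          .toList
      case other => List(other)
    }

  val before = List(
    Edge("Source_1", "GroupBy_2"),
    Edge("GroupBy_2", "Map_3"),
    Edge("Map_3", "Filter_4"),
    Edge("Filter_4", "Filter_5"),
    Edge("Filter_5", "GroupBy_6"),
    Edge("GroupBy_6", "Map_7"),
    Edge("Map_7", "Map_8"),
    Edge("Map_8", "Foreach_9")
  )

  val after = expand(before, _.startsWith("GroupBy"))
}
```

      With the eight edges of the sample DAG as input, the result has six edges, matching the shape of the expanded DAG in the log.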
      

          People

            Assignee: Hao Chen (haoch)
            Reporter: Hao Chen (haoch)