Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Fixed
Version: v0.3.0
Description
Main Features
1. Typesafe API: The stream processing API is currently not type-safe (type information is erased by the stream framework). All programming interfaces are exposed to the developer as Object/AnyRef, which is neither friendly nor extensible for framework users (application developers):
public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
So I propose the following interface. All type information is transparent to the developer and requires no additional parameters; it is supported by Scala's implicit TypeTag:
class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
- StreamInfo: contains Stream core information including streamId, processing element id/name, entity type info (class/TypeTag)
- StreamProtocol extends JavaStreamProtocol: contains basic Stream DSL API and java-compatible API
class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null
  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
StreamInfo can also be shared at runtime as an implicit context for the execution layer:
abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt
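The idea of capturing element types implicitly can be sketched in a few lines. This is a minimal illustration, not the proposed implementation: `TypedInfo`, `TypedProducer`, and `Event` are hypothetical names, and it uses the standard library's `ClassTag` rather than `TypeTag` so that it runs without the scala-reflect dependency.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical, simplified stand-in for StreamInfo: it only records the element type.
final case class TypedInfo(id: Int, typeClass: Class[_])

// Hypothetical producer. The ClassTag context bound captures T at the call site,
// so user functions work with T instead of Object/AnyRef.
final class TypedProducer[T: ClassTag](val id: Int, val elements: Seq[T]) {
  val info: TypedInfo = TypedInfo(id, classTag[T].runtimeClass)

  // The result type U is captured implicitly as well; no Class argument is passed.
  def map[U: ClassTag](f: T => U): TypedProducer[U] =
    new TypedProducer[U](id + 1, elements.map(f))

  def filter(p: T => Boolean): TypedProducer[T] =
    new TypedProducer[T](id + 1, elements.filter(p))
}

final case class Event(name: String, value: Double)

object TypedProducerDemo {
  val source = new TypedProducer(1, Seq(Event("a", 1.0), Event("b", 2.0)))
  // The compiler checks the lambdas against Event; no casts are needed.
  val names: TypedProducer[String] = source.filter(_.value > 1.0).map(_.name)
}
```

Each producer in the chain carries its own runtime class, which is the kind of metadata an execution layer could read from an implicit context.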
2. KeyValue Based Structure: Currently framework users (developers) have to handle field declarations again and again, and framework and business logic are highly coupled. According to the StreamProtocol description, users should not need to care about framework-level details such as the internal data structure Storm uses (List<Object> with Fields<String>), which is not developer-friendly. We should make sure users can focus on business logic only, for example:
env.from(tuples).groupByKey(_.name)
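To illustrate why a key selector function is enough, here is a minimal sketch of key-based routing without any field declarations. `KeyedRouting`, `route`, and `Reading` are hypothetical names, and hash-modulo partitioning is an assumption made for illustration, not necessarily what the framework or Storm does internally.

```scala
final case class Reading(name: String, value: Double)

// Hypothetical helper: routes each element to one of `parallelism` partitions
// based on the hash of the key extracted by `keySelector`.
object KeyedRouting {
  def route[T, K](elements: Seq[T], parallelism: Int)(keySelector: T => K): Map[Int, Seq[T]] =
    // floorMod keeps the partition index non-negative even for negative hashes.
    elements.groupBy(e => Math.floorMod(keySelector(e).##, parallelism))
}

object KeyedRoutingDemo {
  val readings = Seq(Reading("a", 1.0), Reading("a", 2.0), Reading("b", 2.0))
  // The user only supplies the business-level key; no Fields are declared.
  val routed: Map[Int, Seq[Reading]] = KeyedRouting.route(readings, 4)(_.name)
}
```

Elements with the same key always land in the same partition, which is the guarantee a keyed grouping needs.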
3. Spout grouping instead of overriding Schema: Currently, especially in use cases like HdfsAuditLog Monitoring, a developer who wants to group by a certain key is forced to override the Schema (which is specific to Storm). This is undesirable and not reusable.
4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and environment (execution) are coupled with Storm's internal implementation, which stands in the way of becoming a metadata-driven pipeline framework in the future, so we should decouple them.
5. Compatibility with the field-based structure of the old framework and existing applications.
6. Configuration: an enhanced config wrapper on top of typesafe-config that supports get/set/default and is integrated with ExecutionEnvironment:
val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
val streamName = env.config.get[String]("eagle.stream.name", "eventStream")
val streamExecutorId = env.config.get[String]("eagle.stream.executor", s"${streamName}Executor")
env.config.set("dataSourceConfig.deserializerClass", classOf[JsonMessageDeserializer].getCanonicalName)
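The get/set/default behaviour described in item 6 could look roughly like the following. This is a sketch under stated assumptions: `SimpleConfig` is a hypothetical class backed by a plain mutable map instead of typesafe-config, so that it runs without dependencies, and the deserializer class name is a made-up string.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical wrapper; the real one would delegate to typesafe-config.
final class SimpleConfig(initial: Map[String, Any] = Map.empty) {
  private val values = mutable.Map.empty[String, Any] ++= initial

  def set(key: String, value: Any): Unit = values.update(key, value)

  // Returns the stored value when it is present and has type T, otherwise the default.
  def get[T: ClassTag](key: String, default: T): T =
    values.get(key) match {
      case Some(v: T) => v
      case _          => default
    }
}

object ConfigDemo {
  val config = new SimpleConfig(Map("eagle.stream.name" -> "auditStream"))
  val streamName: String = config.get[String]("eagle.stream.name", "eventStream")
  // Key is absent, so the default derived from streamName is used.
  val executorId: String = config.get[String]("eagle.stream.executor", s"${streamName}Executor")
  config.set("dataSourceConfig.deserializerClass", "example.JsonMessageDeserializer")
}
```

Passing the default at the call site keeps the fallback next to the code that depends on it, at the cost of repeating it wherever the same key is read.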
Sample Application
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1),
  Entity("a", 2),
  Entity("a", 3),
  Entity("b", 2),
  Entity("c", 3),
  Entity("d", 3)
)

val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)

// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3
env.from(tuples)
  .groupByKey(_.name)
  .map(o => {o.inc += 2; o})
  .filter(_.name != "b")
  .filter(_.name != "c")
  .groupByKey(o => (o.name, o.value))
  .map(o => (o.name, o))
  .map(o => (o._1, o._2.value, o._2.inc))
  .foreach(println)

env.execute()
Types are transparent to the developer during both the DAG compiling (programming) and runtime (metadata) phases:
2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
{
  IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
  GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
  MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
  FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
  FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
  GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
  MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
  MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
  IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
  MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
  FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
  FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
  MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
  MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
{
  Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
  Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
  Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
  Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
  Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
  Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
}
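The step from the "Before expanded" to the "After expanded" DAG in the log above removes each GroupByKey node and turns it into a grouping on the connecting edge. A minimal sketch of that rewrite, assuming a purely linear chain, might look like this; `Node`, `Edge`, `Grouping`, and `DagExpansion` are hypothetical names and not the framework's actual classes.

```scala
// Hypothetical, simplified model of the DAG printed in the log.
sealed trait Grouping
case object Shuffle extends Grouping
case object ByKey extends Grouping

final case class Node(id: Int, kind: String)
final case class Edge(from: Node, to: Node, grouping: Grouping)

object DagExpansion {
  // Drops every GroupByKey node and moves its grouping onto the edge that
  // connects its predecessor to its successor. Assumes a linear chain of edges.
  def expand(edges: List[Edge]): List[Edge] = edges match {
    case Edge(a, g, _) :: Edge(g2, b, _) :: rest if g.kind == "GroupByKey" && g == g2 =>
      Edge(a, b, ByKey) :: expand(rest)
    case e :: rest => e :: expand(rest)
    case Nil       => Nil
  }
}

object DagDemo {
  // Same node sequence as the sample application: ids 1 through 9.
  val kinds = List("Iterable", "GroupByKey", "Map", "Filter", "Filter",
                   "GroupByKey", "Map", "Map", "Foreach")
  val nodes: List[Node] = kinds.zipWithIndex.map { case (k, i) => Node(i + 1, k) }
  val before: List[Edge] = nodes.zip(nodes.tail).map { case (a, b) => Edge(a, b, Shuffle) }
  val after: List[Edge] = DagExpansion.expand(before)
}
```

With the nine nodes of the sample application, the eight shuffle edges collapse to six, with key groupings on the 1 -> 3 and 5 -> 7 edges, which matches the shape of the expanded DAG in the log.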