Description
GearPump Application and Library Manifest
Requirements
This design addresses a number of issues related to application jar submission and to DAG creation and manipulation in the UI. The goals are to make it easier, or possible at all, to:
- Submit applications and have them run with zero configuration. Everything required for them to run, including environment-specific configuration properties, is in the jar. Submission may be browser-based or command-line-based.
- Decrease the likelihood of DAG failure after submission due to misconfiguration, etc.
- Manipulate or create a DAG easily in a browser without requiring manual input of Tasks.
- Query a task repository for specific tasks or for a listing of task types that can be imported / exposed within a DAG Editor.
- Enable an extensibility mechanism where libraries of tasks may be defined, categorized and made available to tools like DAG builders.
- Provide a way to reduce jar sizes and possibly share jar dependencies across tasks.
- Allow categorization of tasks as appropriate for types of flow: source, sink, bidirectional, multicast, etc. This information may be used by DAG editors to embellish task icons or filter task selection, etc.
- Allow a task to be optionally typed with related message types for input and output. This may allow programmatic generation of flows or even programmatic generation of message types.
- Allow task grouping, which would enable sub-graph imports and labeling. A DAG editor should allow the user to define and package a subgraph. With dynamic DAG usage, a user may want to attach a subgraph to a running DAG, which raises the question of how this subgraph is developed, defined and packaged. For example, a data scientist can develop a new model (subgraph) that reads from a Kafka source and performs scoring. When packaging, the Kafka source processor must be kept out of the package. Otherwise the model will not run correctly, as it would use the training Kafka instead of the production Kafka.
- Allow a task to be distributed and run with a minimal set of dependencies.
Use Cases
Currently GearPump applications can be submitted as a jar via the browser. To the browser this jar is opaque. Validation of the application must be done server side, and error details may be needlessly complex, misleading or uninformative to the end user. Additionally, surfacing DAG information prior to submitting the application for possible editing is hopelessly convoluted and would involve:
- submitting the jar
- querying the server to retrieve application details
- killing the application
Given that the jar may be quite large, having the user wait possibly minutes for an upload just to expose the DAG is impractical and limits functionality required to move forward with critical features like DAG creation and editing. Instead, we require a mechanism that surfaces application information, such as its DAG, within seconds of selecting the jar on the user's machine. We also want to minimize the size of GearPump applications even when these applications have a massive set of dependencies. Upcoming use cases may require dependencies that are impractical to resolve using jar inclusion or fat jars. Additionally, the current application jar does not provide information that could allow individual task distribution with specific jar allocations. In other words, a 100 MB application jar defining 5 Tasks would have to be distributed to each GearPump worker even if one of the tasks did nothing but sum 2 fields and send the result downstream.
Design
Manifest details
Manifest structure
Central to this design is a manifest specification that describes a GearPump jar. This specification is a JSON file that can be bundled with all GearPump jars. The manifest will either be generated programmatically or built manually. Programmatic generation will be done by a simple tool or sbt plugin that builds this file as part of application packaging. Similar to Node.js's package.json file, this file will provide meta-data describing a GearPump application or library including:
> - manifest version
> - manifest type
> - application main
> - application name
> - application version
> - application configuration
> - processors
> - graph
> - dependencies
> - repos
> - user name
> - keywords
Manifest name
Within the jar file, a top-level entry named gearpump.json will indicate that this jar is a GearPump-related jar. This file will hold the contents noted above and will be validated against the JSON schema included in this document.
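As a minimal sketch of how JVM-side tooling might detect a GearPump jar, the snippet below looks for the top-level gearpump.json entry using only the JDK's `java.util.jar.JarFile`. The object and method names (`ManifestLocator`, `readManifest`) are hypothetical and not part of GearPump. The sketch returns the raw manifest text and leaves JSON parsing and schema validation to whichever JSON library is chosen.

```scala
import java.io.File
import java.util.jar.JarFile
import scala.io.Source

object ManifestLocator {
  // Entry name taken from this document; everything else here is illustrative.
  val ManifestEntry = "gearpump.json"

  /** Returns the manifest text if the jar has a top-level gearpump.json entry, else None. */
  def readManifest(jar: File): Option[String] = {
    val jarFile = new JarFile(jar)
    try {
      // getEntry returns null when the entry is absent, hence the Option wrapper.
      Option(jarFile.getEntry(ManifestEntry)).map { entry =>
        val in = jarFile.getInputStream(entry)
        try Source.fromInputStream(in, "UTF-8").mkString
        finally in.close()
      }
    } finally jarFile.close()
  }
}
```

A jar without the entry yields `None`, which a submission tool could treat as "not a GearPump jar" before any upload takes place.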
Entries and definitions
The JSON schema describing the above entries will provide relevant typing, which may be structural or categorical. For example, the manifest type entry above would be (APPLICATION|LIBRARY). It is anticipated that additional information not specified here will be required, for example a user's organization or perhaps a user's role capability. There could be other security attributes as well. Updates to the JSON schema that add new information will result in a bump in the manifest version to reflect the schema change. The manifest version will adhere to the semantic versioning specification. It is also anticipated that this manifest will enable DAG creation tooling to expose the possible types of tasks within general categories (source, sink, bidirectional, multicast, etc.).
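The two categorical checks described above (the manifest type and a semantically versioned manifest version) could be sketched as follows. The names `ManifestHeader`, `SemVer`, `parseVersion` and `canRead` are hypothetical, and the compatibility policy (same major version, minor version no newer than the reader's) is an assumption made for illustration; this document does not define one.

```scala
object ManifestHeader {
  // Allowed values for the manifest type entry, as stated in this document.
  val ManifestTypes: Set[String] = Set("APPLICATION", "LIBRARY")

  final case class SemVer(major: Int, minor: Int, patch: Int)

  // MAJOR.MINOR.PATCH only; pre-release and build metadata are ignored in this sketch.
  private val Version = """(\d{1,9})\.(\d{1,9})\.(\d{1,9})""".r

  def parseVersion(s: String): Option[SemVer] = s match {
    case Version(major, minor, patch) => Some(SemVer(major.toInt, minor.toInt, patch.toInt))
    case _                            => None
  }

  def isKnownType(manifestType: String): Boolean = ManifestTypes(manifestType)

  /** Assumed policy: same major version, and a minor version the reader already knows. */
  def canRead(supported: SemVer, manifest: SemVer): Boolean =
    manifest.major == supported.major && manifest.minor <= supported.minor
}
```

Under this assumed policy, a tool built against manifest version 1.2.0 would accept a 1.1.5 manifest and reject a 2.0.0 one.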
Dynamic creation of applications
One impact of providing a manifest is that the application main entry noted above in the manifest structure is now optional. Once a manifest is generated as part of the build, we could safely remove all application mains, like those included under the gearpump examples directory. We *will* require the jar submission REST endpoint to do the equivalent of:
- parse the json file
- create a userConfig instance based on the application configuration values
- create Processor[Task] instances using the tasks information
- connect Processor[Task] instances based on the graph information and create a Graph instance
- create a StreamApplication instance using the application name, application config and Graph instance
- submit the StreamingApplication instance to a ClientContext.
In other words we require the REST endpoint receiving the jar file to *programmatically* do something similar to what the WordCount application does below based on the application manifest.
    object WordCount extends AkkaApp with ArgumentsParser {
      private val LOG: Logger = LogUtil.getLogger(getClass)
      val RUN_FOR_EVER = -1

      override val options: Array[(String, CLIOption[Any])] = Array(
        "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
        "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
      )

      def application(config: ParseResult): StreamApplication = {
        val splitNum = config.getInt("split")
        val sumNum = config.getInt("sum")
        val split = Processor[Split](splitNum)
        val sum = Processor[Sum](sumNum)
        val partitioner = new HashPartitioner
        val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
        app
      }

      override def main(akkaConf: Config, args: Array[String]): Unit = {
        val config = parse(args)
        val context = ClientContext(akkaConf)
        val appId = context.submit(application(config))
        context.close()
      }
    }
Note that something like this already exists in GearPump, associated with the REST endpoint */api/v1.0/submitapp*. This endpoint accepts a POST with a JSON structure that is reified into the Scala case class shown below:
    case class SubmitApplicationRequest(
      appName: String,
      processors: Map[ProcessorId, ProcessorDescription],
      dag: Graph[Int, String],
      userconfig: UserConfig = UserConfig.empty)
This is used to submit the application to the master. The manifest is merely a way of storing the SubmitApplicationRequest with the jar as a JSON file.
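The endpoint steps listed earlier in this section start from a parsed manifest. The following is a minimal sketch of the checks that could run between parsing and submission, under stated assumptions: `ProcessorEntry`, `DagEntry` and `ManifestApp` are hypothetical stand-ins for the manifest's processors and dag entries, edges are reduced to (from, to) pairs of processor ids, and no GearPump API is called. Constructing the real Processor, Graph and StreamApplication instances and submitting them is left out.

```scala
object ManifestAssembly {
  // Hypothetical, simplified views of the manifest entries.
  final case class ProcessorEntry(id: Int, taskClass: String, parallelism: Int)
  final case class DagEntry(vertexList: List[Int], edgeList: List[(Int, Int)])
  final case class ManifestApp(applicationName: String,
                               processors: List[ProcessorEntry],
                               dag: DagEntry)

  /** Kahn's algorithm: repeatedly take a vertex with no incoming edge. None means a cycle. */
  def topologicalOrder(vertices: Set[Int], edges: List[(Int, Int)]): Option[List[Int]] = {
    @annotation.tailrec
    def loop(remaining: Set[Int], live: List[(Int, Int)], acc: List[Int]): Option[List[Int]] =
      if (remaining.isEmpty) Some(acc.reverse)
      else {
        val targets = live.map(_._2).toSet
        val ready = remaining.filterNot(targets)
        if (ready.isEmpty) None // every remaining vertex still has an incoming edge
        else {
          val next = ready.min // smallest id first, for a deterministic order
          loop(remaining - next, live.filterNot(_._1 == next), next :: acc)
        }
      }
    // Edges that touch unknown vertices are reported separately, so ignore them here.
    loop(vertices, edges.filter { case (a, b) => vertices(a) && vertices(b) }, Nil)
  }

  /** Returns a list of problems; an empty list means the manifest is ready to assemble. */
  def validate(app: ManifestApp): List[String] = {
    val ids = app.processors.map(_.id)
    val known = ids.toSet
    val duplicates = ids.diff(ids.distinct).distinct.map(id => s"duplicate processor id $id")
    val missing = app.dag.vertexList.filterNot(known).map(v => s"vertex $v has no processor entry")
    val dangling = app.dag.edgeList.collect {
      case (from, to) if !known(from) || !known(to) =>
        s"edge $from -> $to references an unknown processor"
    }
    val badParallelism = app.processors.collect {
      case p if p.parallelism < 1 => s"processor ${p.id} has parallelism ${p.parallelism}"
    }
    val cyclic =
      if (topologicalOrder(known, app.dag.edgeList).isEmpty) List("graph contains a cycle") else Nil
    duplicates ++ missing ++ dangling ++ badParallelism ++ cyclic
  }
}
```

Because these checks need only the manifest, a browser or command-line tool could run the same checks before uploading the jar, which supports the stated goal of reducing failures after submission.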
UI DAG Editor functionality
Use case: Creating a new DAG
DAG editors that are browser based will likely address a number of use cases. One key use case is the ability to select a local application jar and create or edit a DAG based on its contents.
Parsing jar files
An application jar may be parsed immediately within the browser prior to upload by using JsZip. JsZip can be used within any grade A browser or from the command line, allowing tooling to be created in either area. JsZip is also quite fast: in a Chrome browser on Mac OS X, it took 2 seconds to return all entries of a 61 MB jar. JsZip has a nice set of features:
- JsZip can read local jars that are selected using the native file dialog
- JsZip can read remote jars, allowing Tasks to be imported from external repos other than the origin server.
- JsZip can create jar files, opening up the possibility of saving the results of building an application locally to the user's machine. An example of parsing a jar file that had been dropped within an HTML drop zone can be found here.
Security risks
Parsing a jar that has been selected by a user using a browser's native dialog is safe for several reasons:
- The jar is already resident on the user's computer.
- Parsing a jar to read its entries using JsZip typically uses a modern browser's ArrayBuffer or Uint8Array. Both are intended to deal with binary data and there is no increased risk of buffer overflows.
- Jars parsed from a remote URL rather than locally should come only from sanctioned repos.
Manifest Definition
JSON schema (incomplete)
see generator
    {
      "$schema": "http://json-schema.org/draft-04/schema#",
      "id": "http://jsonschema.net",
      "type": "object",
      "properties": {
        "manifestVersion": { "id": "http://jsonschema.net/manifestVersion", "type": "string" },
        "manifestType": { "enum": ["APPLICATION", "LIBRARY"] },
        "applicationMain": { "id": "http://jsonschema.net/applicationMain", "type": "string" },
        "applicationName": { "id": "http://jsonschema.net/applicationName", "type": "string" },
        "applicationConf": {
          "id": "http://jsonschema.net/processors/11/1/applicationConf",
          "type": "object",
          "properties": {
            "_config": { "id": "http://jsonschema.net/applicationConf/_config", "type": "object", "properties": {} }
          }
        },
        "processors": {
          "id": "http://jsonschema.net/processors",
          "type": "array",
          "items": {
            "id": "http://jsonschema.net/processors/11",
            "type": "array",
            "items": {
              "id": "http://jsonschema.net/processors/11/1",
              "type": "object",
              "properties": {
                "id": { "id": "http://jsonschema.net/processors/11/1/id", "type": "integer" },
                "taskClass": { "id": "http://jsonschema.net/processors/11/1/taskClass", "type": "string" },
                "parallelism": { "id": "http://jsonschema.net/processors/11/1/parallelism", "type": "integer" },
                "description": { "id": "http://jsonschema.net/processors/11/1/description", "type": "string" },
                "taskConf": {
                  "id": "http://jsonschema.net/processors/11/1/taskConf",
                  "type": "object",
                  "properties": {
                    "_config": { "id": "http://jsonschema.net/processors/11/1/taskConf/_config", "type": "object", "properties": {} }
                  }
                },
                "life": {
                  "id": "http://jsonschema.net/processors/11/1/life",
                  "type": "object",
                  "properties": {
                    "birth": { "id": "http://jsonschema.net/processors/11/1/life/birth", "type": "string" },
                    "death": { "id": "http://jsonschema.net/processors/11/1/life/death", "type": "string" }
                  }
                },
                "executors": {
                  "id": "http://jsonschema.net/processors/11/1/executors",
                  "type": "array",
                  "items": { "id": "http://jsonschema.net/processors/11/1/executors/0", "type": "integer" }
                },
                "taskCount": {
                  "id": "http://jsonschema.net/processors/11/1/taskCount",
                  "type": "array",
                  "items": {
                    "id": "http://jsonschema.net/processors/11/1/taskCount/0",
                    "type": "array",
                    "items": {
                      "id": "http://jsonschema.net/processors/11/1/taskCount/0/1",
                      "type": "object",
                      "properties": {
                        "count": { "id": "http://jsonschema.net/processors/11/1/taskCount/0/1/count", "type": "integer" }
                      }
                    }
                  }
                }
              }
            }
          }
        },
        "dag": {
          "id": "http://jsonschema.net/dag",
          "type": "object",
          "properties": {
            "vertexList": {
              "id": "http://jsonschema.net/dag/vertexList",
              "type": "array",
              "items": { "id": "http://jsonschema.net/dag/vertexList/11", "type": "integer" }
            },
            "edgeList": {
              "id": "http://jsonschema.net/dag/edgeList",
              "type": "array",
              "items": {
                "id": "http://jsonschema.net/dag/edgeList/14",
                "type": "array",
                "items": { "id": "http://jsonschema.net/dag/edgeList/14/2", "type": "integer" }
              }
            }
          }
        },
        "user": { "id": "http://jsonschema.net/user", "type": "string" }
      },
      "required": [ "manifestVersion", "manifestType", "applicationName", "processors", "dag", "user" ]
    }
TODO
- describe each schema entry
- provide sample output
- scrub schema and validate
- detail how the DAG editor needs to generate or update the gearpump.json file.
- describe how dependencies are included and how this can reduce jar size
ISSUES
- how to attach to an existing DAG using this manifest - maybe define an optional applicationId
- the scope here is quite large and may need to be broken into a number of design tasks or reference them
Relevance
> - #1450
> - #1437
Related
- application specific types like TAP/ATK and types of tasks within an application category
- akka-streams RunnableGraph which has no concept of serde and cannot be persisted or distributed